qless: Lua-Bindung für Qless (Warteschlangen- / Pipeline-Management) für nginx-module-lua / Redis
Installation
Wenn Sie noch kein RPM-Repository-Abonnement eingerichtet haben, melden Sie sich an. Dann können Sie mit den folgenden Schritten fortfahren.
CentOS/RHEL 7 oder Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless
Um diese Lua-Bibliothek mit NGINX zu verwenden, stellen Sie sicher, dass nginx-module-lua installiert ist.
Dieses Dokument beschreibt lua-resty-qless v0.12, das am 08. Juli 2022 veröffentlicht wurde.
lua-resty-qless ist eine Bindung zu qless-core von Moz - einem leistungsstarken, auf Redis basierenden Job-Warteschlangensystem, inspiriert von resque, jedoch als Sammlung von Lua-Skripten für Redis implementiert.
Diese Bindung bietet eine vollständige Implementierung von Qless über ein Lua-Skript, das in OpenResty / lua-nginx-module ausgeführt wird, einschließlich Worker, die während der init_worker_by_lua-Phase gestartet werden können.
Im Wesentlichen können Sie mit diesem Modul und einer modernen Redis-Instanz Ihren OpenResty-Server in ein recht anspruchsvolles, aber leichtgewichtiges Job-Warteschlangensystem verwandeln, das auch mit der Referenzimplementierung in Ruby, Qless, kompatibel ist.
Hinweis: Dieses Modul ist nicht dafür ausgelegt, in einer reinen Lua-Umgebung zu arbeiten.
Philosophie und Nomenklatur
Ein job ist eine Arbeitseinheit, die durch eine Job-ID oder jid identifiziert wird. Eine queue kann mehrere Jobs enthalten, die zu einem bestimmten Zeitpunkt ausgeführt werden sollen, mehrere Jobs, die auf die Ausführung warten, und Jobs, die derzeit ausgeführt werden. Ein worker ist ein Prozess auf einem Host, der eindeutig identifiziert wird, der Jobs aus der Warteschlange anfordert, einen mit diesem Job verbundenen Prozess ausführt und ihn dann als abgeschlossen markiert. Wenn er abgeschlossen ist, kann er in eine andere Warteschlange gelegt werden.
Jobs können sich nur in einer Warteschlange gleichzeitig befinden. Diese Warteschlange ist die, in die sie zuletzt gelegt wurden. Wenn ein Worker also an einem Job arbeitet und Sie ihn verschieben, wird die Anfrage des Workers zur Fertigstellung des Jobs ignoriert.
Ein Job kann canceled werden, was bedeutet, dass er im Nichts verschwindet und wir ihm nie wieder Beachtung schenken werden. Ein Job kann dropped werden, was passiert, wenn ein Worker es versäumt, rechtzeitig ein Heartbeat zu senden oder den Job abzuschließen, oder ein Job kann failed werden, was passiert, wenn ein Host einen systematisch problematischen Zustand des Jobs erkennt. Ein Worker sollte einen Job nur dann als fehlgeschlagen markieren, wenn der Fehler wahrscheinlich nicht vorübergehend ist; andernfalls sollte dieser Worker ihn einfach fallen lassen und das System ihn zurückfordern lassen.
Funktionen
- Jobs fallen nicht einfach weg Manchmal fallen Jobs von den Workern. Qless nimmt sie automatisch wieder auf und gibt sie einem anderen Worker.
- Tagging / Tracking Einige Jobs sind interessanter als andere. Verfolgen Sie diese Jobs, um Updates über ihren Fortschritt zu erhalten.
- Job-Abhängigkeiten Ein Job muss möglicherweise warten, bis ein anderer Job abgeschlossen ist.
- Statistiken Qless führt automatisch Statistiken darüber, wie lange Jobs warten, um verarbeitet zu werden, und wie lange sie zur Verarbeitung benötigen. Derzeit verfolgen wir die Anzahl, den Mittelwert, die Standardabweichung und ein Histogramm dieser Zeiten.
- Jobdaten werden vorübergehend gespeichert Jobinformationen bleiben für eine konfigurierbare Zeit erhalten, sodass Sie auf die Historie, Daten usw. eines Jobs zurückblicken können.
- Priorität Jobs mit derselben Priorität werden in der Reihenfolge, in der sie eingefügt wurden, bearbeitet; eine höhere Priorität bedeutet, dass der Job schneller bearbeitet wird.
- Wiederholungslogik Jeder Job hat eine Anzahl von Wiederholungen, die mit ihm verbunden sind, die erneuert werden, wenn er in eine neue Warteschlange gelegt oder abgeschlossen wird. Wenn ein Job wiederholt fallen gelassen wird, wird davon ausgegangen, dass er problematisch ist, und wird automatisch als fehlgeschlagen markiert.
- Web-App lua-resty-qless-web gibt Ihnen Einblick und Kontrolle über bestimmte betriebliche Probleme.
- Geplante Arbeiten Bis ein Job eine festgelegte Verzögerung (Standardwert 0) abwartet, können Jobs von Workern nicht bearbeitet werden.
- Wiederkehrende Jobs Planung ist gut und schön, aber wir unterstützen auch Jobs, die regelmäßig wiederholt werden müssen.
- Benachrichtigungen Verfolgte Jobs geben Ereignisse auf Pub/Sub-Kanälen aus, während sie abgeschlossen, fehlgeschlagen, gesetzt, bearbeitet usw. werden. Verwenden Sie diese Ereignisse, um über den Fortschritt von Jobs, die Sie interessieren, benachrichtigt zu werden.
Verbindung
Zuerst müssen Sie resty.qless einbinden und einen Client erstellen, indem Sie Ihre Redis-Verbindungsdetails angeben.
local qless = require("resty.qless").new({
host = "127.0.0.1",
port = 6379,
})
Die an new übergebenen Parameter werden an lua-resty-redis-connector weitergeleitet. Bitte überprüfen Sie die dortige Dokumentation für Verbindungsoptionen, einschließlich der Verwendung von Redis Sentinel usw.
Darüber hinaus, wenn Ihre Anwendung eine Redis-Verbindung hat, die Sie wiederverwenden möchten, gibt es zwei Möglichkeiten, dies zu integrieren:
1) Verwendung einer bereits etablierten Verbindung direkt
local qless = require("resty.qless").new({
redis_client = my_redis,
})
2) Bereitstellung von Rückruffunktionen zum Herstellen und Schließen der Verbindung
local qless = require("resty.qless").new({
get_redis_client = my_connection_callback,
close_redis_client = my_close_callback,
})
Wenn Sie mit Qless fertig sind, sollten Sie qless:set_keepalive() aufrufen, was versuchen wird, Redis wieder in den Keepalive-Pool zu setzen, entweder unter Verwendung der von Ihnen direkt bereitgestellten Einstellungen oder über Parameter, die an lua-resty-redis-connector gesendet werden, oder indem Sie Ihren close_redis_client-Rückruf aufrufen.
Jobs in die Warteschlange stellen
Jobs selbst sind Module, die über require ladbar sein müssen und eine perform-Funktion bereitstellen, die ein einzelnes job-Argument akzeptiert.
-- my/test/job.lua (die "klass" des Jobs wird zu "my.test.job")
local _M = {}
function _M.perform(job)
-- job ist eine Instanz von Qless_Job und bietet Zugriff auf
-- job.data (das eine Lua-Tabelle ist), eine Möglichkeit, den
-- Job abzubrechen (job:cancel()), und mehr.
-- geben Sie "nil, err_type, err_msg" zurück, um einen unerwarteten Fehler anzuzeigen
if not job.data then
return nil, "job-error", "Daten fehlen"
end
-- Arbeit erledigen
end
return _M
Jetzt können Sie auf eine Warteschlange zugreifen und einen Job in diese Warteschlange einfügen.
-- Dies verweist auf eine neue oder vorhandene Warteschlange 'testing'
local queue = qless.queues['testing']
-- Lassen Sie uns einen Job mit einigen Daten hinzufügen. Gibt die Job-ID zurück
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"
-- Jetzt können wir nach einem Job fragen
local job = queue:pop()
-- Und wir können die damit verbundene Arbeit erledigen!
job:perform()
Die Jobdaten müssen eine Tabelle sein (die intern in JSON serialisiert wird).
Der von queue:put() zurückgegebene Wert ist die Job-ID oder jid. Jeder Qless-Job hat eine eindeutige jid, und sie bietet eine Möglichkeit, mit einem bestehenden Job zu interagieren:
-- Finden Sie einen vorhandenen Job anhand seiner jid
local job = qless.jobs:get(jid)
-- Abfragen, um Details darüber herauszufinden:
job.klass -- die Klasse des Jobs
job.queue -- die Warteschlange, in der sich der Job befindet
job.data -- die Daten für den Job
job.history -- die Historie dessen, was bisher mit dem Job passiert ist
job.dependencies -- die jids anderer Jobs, die abgeschlossen sein müssen, bevor dieser hier
job.dependents -- die jids anderer Jobs, die von diesem abhängen
job.priority -- die Priorität dieses Jobs
job.tags -- Tabelle der Tags für diesen Job
job.original_retries -- die Anzahl der Male, die der Job wiederholt werden darf
job.retries_left -- die Anzahl der verbleibenden Wiederholungen
-- Sie können den Job auch auf verschiedene Arten ändern:
job:requeue("some_other_queue") -- in eine neue Warteschlange verschieben
job:cancel() -- den Job abbrechen
job:tag("foo") -- ein Tag hinzufügen
job:untag("foo") -- ein Tag entfernen
Worker ausführen
Traditionell bot Qless ein Forking-Ruby-Worker-Skript, inspiriert von Resque.
In lua-resty-qless nutzen wir die Phase init_lua_by_worker und die ngx.timer.at-API, um Worker in unabhängigen "Light Threads" auszuführen, die über Ihre Worker-Prozesse skalierbar sind.
Sie können viele Light Threads gleichzeitig pro Worker-Prozess ausführen, die Nginx für Sie plant.
init_worker_by_lua '
local resty_qless_worker = require "resty.qless.worker"
local worker = resty_qless_worker.new(redis_params)
worker:start({
interval = 1,
concurrency = 4,
reserver = "ordered",
queues = { "my_queue", "my_other_queue" },
})
';
Worker unterstützen drei Strategien (Reserver), um in welcher Reihenfolge Jobs aus den Warteschlangen entnommen werden: ordered, round-robin und shuffled round-robin.
Der geordnete Reserver wird weiterhin Jobs von der ersten Warteschlange entnehmen, bis sie leer ist, bevor er versucht, Jobs von der zweiten Warteschlange zu entnehmen. Der Round-Robin-Reserver wird einen Job von der ersten Warteschlange entnehmen, dann von der zweiten Warteschlange und so weiter. Shuffled stellt einfach sicher, dass die Auswahl im Round-Robin unvorhersehbar ist.
Sie könnten auch leicht Ihre eigene implementieren. Folgen Sie den anderen Reserven als Leitfaden und stellen Sie sicher, dass Ihre "requireable" ist mit require "resty.qless.reserver.myreserver".
Middleware
Worker unterstützen auch Middleware, die verwendet werden kann, um Logik rund um die Verarbeitung eines einzelnen Jobs einzufügen. Dies kann nützlich sein, wenn Sie beispielsweise eine Datenbankverbindung wiederherstellen müssen.
Um dies zu tun, setzen Sie die middleware des Workers auf eine Funktion und rufen coroutine.yield dort auf, wo Sie möchten, dass der Job ausgeführt wird.
local worker = resty_qless_worker.new(redis_params)
worker.middleware = function(job)
-- Vor der Job-Arbeit erledigen
coroutine.yield()
-- Nach der Job-Arbeit erledigen
end
worker:start({ queues = "my_queue" })
Job-Abhängigkeiten
Angenommen, Sie haben einen Job, der von einem anderen abhängt, aber die Aufgabenbeschreibungen sind grundlegend unterschiedlich. Sie müssen einen Truthahn braten, und Sie müssen Füllung machen, aber Sie können den Truthahn nicht machen, bis die Füllung gemacht ist:
local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing",
{ lots = "of butter" }
)
local turkey_jid = queue:put("jobs.make.turkey",
{ with = "stuffing" },
{ depends = stuffing_jid }
)
Wenn der Füllungsjob abgeschlossen ist, wird der Truthahnjob entsperrt und kann bearbeitet werden.
Priorität
Einige Jobs müssen schneller bearbeitet werden als andere. Ob es sich um ein Trouble Ticket oder Debugging handelt, Sie können dies ziemlich einfach tun, wenn Sie einen Job in eine Warteschlange einfügen:
queue:put("jobs.test", { foo = "bar" }, { priority = 10 })
Was passiert, wenn Sie die Priorität eines Jobs anpassen möchten, während er noch in einer Warteschlange wartet?
local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- Jetzt wird dieser Job vor jedem Job mit niedrigerer Priorität bearbeitet
Hinweis: Das Setzen des Prioritätsfeldes oben ist alles, was Sie tun müssen, dank der Lua-Metamethoden, die aufgerufen werden, um Redis zu aktualisieren. Dies mag ein wenig "automagisch" aussehen, aber die Absicht ist es, die API-Designkompatibilität mit dem Ruby-Client so weit wie möglich zu erhalten.
Geplante Jobs
Wenn Sie nicht möchten, dass ein Job sofort, sondern zu einem späteren Zeitpunkt ausgeführt wird, können Sie eine Verzögerung angeben:
-- Führen Sie den Job mindestens 10 Minuten ab jetzt aus
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })
Dies garantiert nicht, dass der Job genau nach 10 Minuten ausgeführt wird. Sie können dies erreichen, indem Sie die Priorität des Jobs ändern, sodass er, sobald 10 Minuten vergangen sind, vor Jobs mit geringerer Priorität gesetzt wird:
-- Führen Sie den Job in 10 Minuten aus
queue:put("jobs.test",
{ foo = "bar" },
{ delay = 600, priority = 100 }
)
Wiederkehrende Jobs
Manchmal reicht es nicht aus, einfach einen Job zu planen, sondern Sie möchten Jobs regelmäßig ausführen. Insbesondere haben Sie möglicherweise eine Batch-Operation, die einmal pro Stunde ausgeführt werden muss, und es ist Ihnen egal, welcher Worker sie ausführt. Wiederkehrende Jobs werden ähnlich wie andere Jobs spezifiziert:
-- Führen Sie den Job jede Stunde aus
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b
Sie können sogar auf sie zugreifen, ähnlich wie Sie es mit normalen Jobs tun würden:
local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")
Die Änderung des Intervalls, in dem er nachträglich ausgeführt wird, ist trivial:
-- Ich denke, ich brauche ihn nur alle zwei Stunden auszuführen
job.interval = 7200
Wenn Sie möchten, dass er jede Stunde zur vollen Stunde ausgeführt wird, es aber gerade 2:37 Uhr ist, können Sie einen Offset angeben, der angibt, wie lange er warten soll, bevor der erste Job entnommen wird:
-- 23 Minuten warten, bis er ausgeführt werden soll
queue:recur("jobs.test",
{ howdy = "hello" },
3600,
{ offset = (23 * 60) }
)
Wiederkehrende Jobs haben ebenfalls Priorität, eine konfigurierbare Anzahl von Wiederholungen und Tags. Diese Einstellungen gelten nicht für die wiederkehrenden Jobs, sondern für die Jobs, die sie erzeugen. Im Fall, dass mehr als ein Intervall vergeht, bevor ein Worker versucht, den Job zu entnehmen, werden mehr als ein Job erstellt. Der Gedanke ist, dass, während es vollständig clientgesteuert ist, der Zustand nicht davon abhängen sollte, wie oft Worker versuchen, Jobs zu entnehmen.
-- Wiederholen Sie jede Minute
queue:recur("jobs.test", { lots = "of jobs" }, 60)
-- 5 Minuten warten
local jobs = queue:pop(10)
ngx.say(#jobs, " Jobs wurden entnommen")
-- = 5 Jobs wurden entnommen
Konfigurationsoptionen
Sie können globale (im Kontext derselben Redis-Instanz) Konfigurationen abrufen und festlegen, um das Verhalten für Heartbeating usw. zu ändern. Es gibt nicht viele Konfigurationsoptionen, aber eine wichtige ist, wie lange Jobdaten aufbewahrt werden. Jobdaten laufen ab, nachdem sie für jobs-history Sekunden abgeschlossen wurden, sind jedoch auf die letzten jobs-history-count abgeschlossenen Jobs beschränkt. Diese sind standardmäßig auf 50.000 Jobs und 30 Tage eingestellt, aber je nach Volumen können sich Ihre Anforderungen ändern. Um nur die letzten 500 Jobs für bis zu 7 Tage aufzubewahren:
qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)
Tagging / Tracking
In qless bedeutet 'Tracking', einen Job als wichtig zu kennzeichnen. Verfolgte Jobs geben abonnierbare Ereignisse aus, während sie Fortschritte machen (mehr dazu weiter unten).
local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()
Jobs können mit Strings getaggt werden, die für schnelle Suchen indiziert sind. Zum Beispiel könnten Jobs mit Kundenkonten oder einem anderen Schlüssel verknüpft sein, der für Ihr Projekt sinnvoll ist.
queue:put("jobs.test", {},
{ tags = { "12345", "foo", "bar" } }
)
Dies macht sie durch die Ruby / Sinatra-Weboberfläche oder aus dem Code heraus durchsuchbar:
local jids = qless.jobs:tagged("foo")
Sie können auch nach Belieben Tags hinzufügen oder entfernen:
local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")
Benachrichtigungen
Verfolgte Jobs geben Ereignisse auf bestimmten Pub/Sub-Kanälen aus, wenn Dinge mit ihnen passieren. Ob es sich um das Entnehmen aus einer Warteschlange, das Abschließen durch einen Worker usw. handelt.
Diejenigen, die mit Redis Pub/Sub vertraut sind, werden feststellen, dass eine Redis-Verbindung nur für Pub/Sub-Befehle verwendet werden kann, sobald sie auf Empfang geschaltet ist. Aus diesem Grund werden die Ereignismodule unabhängig von den Redis-Verbindungsparametern übergeben.
local events = qless.events(redis_params)
events:listen({ "canceled", "failed" }, function(channel, jid)
ngx.log(ngx.INFO, jid, ": ", channel)
-- protokolliert "b1882e009a3d11e192d0b174d751779d: canceled" usw.
end
Sie können auch den "log"-Kanal abhören, der eine JSON-Struktur aller protokollierten Ereignisse bereitstellt.
local events = qless.events(redis_params)
events:listen({ "log" }, function(channel, message)
local message = cjson.decode(message)
ngx.log(ngx.INFO, message.event, " ", message.jid)
end
Heartbeating
Wenn ein Worker einen Job erhält, wird ihm ein exklusives Lock für diesen Job gegeben. Das bedeutet, dass dieser Job keinem anderen Worker gegeben wird, solange der Worker mit Fortschritten beim Job eincheckt. Standardmäßig müssen Jobs entweder alle 60 Sekunden Fortschritte melden oder abgeschlossen werden, aber das ist eine konfigurierbare Option. Für längere Jobs macht das möglicherweise keinen Sinn.
-- Hurra! Wir haben einen Arbeitsauftrag!
local job = queue:pop()
-- Wie lange habe ich Zeit, bis ich einchecken muss?
job:ttl()
-- = 59
-- Hey! Ich arbeite noch daran!
job:heartbeat()
-- = 1331326141.0
-- Okay, ich habe noch etwas Zeit. Oh! Jetzt bin ich fertig!
job:complete()
Wenn Sie das Heartbeat-Intervall in allen Warteschlangen erhöhen möchten,
-- Jetzt haben Jobs 10 Minuten Zeit, um einzuchecken
qless:set_config("heartbeat", 600)
-- Aber die Warteschlange "testing" hat nicht so viel Zeit.
qless.queues["testing"].heartbeat = 300
Bei der Auswahl eines Heartbeat-Intervalls beachten Sie, dass dies die Zeit ist, die vergehen kann, bevor qless erkennt, ob ein Job fallen gelassen wurde. Gleichzeitig möchten Sie qless nicht mit Heartbeating alle 10 Sekunden belasten, wenn Ihr Job voraussichtlich mehrere Stunden dauern wird.
Ein Idiom, das Sie für lang laufende Jobs verwenden sollten, die ihren Fortschritt regelmäßig einchecken möchten:
-- Warten Sie, bis wir 5 Minuten auf dem Heartbeat übrig haben, und wenn wir feststellen, dass
-- wir unser Lock auf einen Job verloren haben, dann ehrenvoll aussteigen
if job:ttl() < 300 and not job:heartbeat() then
-- beenden
end
Statistiken
Eine schöne Funktion von Qless ist, dass Sie Statistiken über die Nutzung abrufen können. Statistiken werden nach Tag aggregiert, sodass Sie, wenn Sie Statistiken über eine Warteschlange wünschen, angeben müssen, über welche Warteschlange und an welchem Tag Sie sprechen. Standardmäßig erhalten Sie nur die Statistiken für heute. Diese Statistiken enthalten Informationen über die durchschnittliche Wartezeit für Jobs, die Standardabweichung und ein Histogramm. Dieselben Daten werden auch für den Abschluss von Jobs bereitgestellt:
-- Wie läuft es heute?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }
Zeit
Es ist wichtig zu beachten, dass Redis keinen Zugriff auf die Systemzeit erlaubt, wenn Sie Daten manipulieren möchten (was unsere Skripte tun). Und doch haben wir Heartbeating. Das bedeutet, dass die Clients tatsächlich die aktuelle Zeit senden, wenn sie die meisten Anfragen stellen, und um der Konsistenz willen bedeutet das, dass Ihre Worker relativ synchronisiert sein müssen. Das bedeutet nicht bis zu den Zehntelsekunden, aber wenn Sie merkliche Uhrdrift erleben, sollten Sie NTP untersuchen.
Sicherstellen der Job-Eindeutigkeit
Wie oben erwähnt, werden Jobs eindeutig durch eine ID identifiziert - ihre jid. Qless generiert eine UUID für jeden in die Warteschlange gestellten Job oder Sie können eine manuell angeben:
queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })
Dies kann nützlich sein, wenn Sie die Eindeutigkeit eines Jobs sicherstellen möchten: Erstellen Sie einfach eine jid, die eine Funktion der Klasse und der Daten des Jobs ist, dann wird garantiert, dass Qless nicht mehrere Jobs mit derselben Klasse und denselben Daten hat.
GitHub
Sie finden möglicherweise zusätzliche Konfigurationstipps und Dokumentationen für dieses Modul im GitHub-Repository für nginx-module-qless.