Aller au contenu

qless: Liaison Lua à Qless (Gestion des files d'attente / Pipelines) pour nginx-module-lua / Redis

Installation

Si vous n'avez pas encore configuré l'abonnement au dépôt RPM, inscrivez-vous. Ensuite, vous pouvez procéder avec les étapes suivantes.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless

Pour utiliser cette bibliothèque Lua avec NGINX, assurez-vous que nginx-module-lua est installé.

Ce document décrit lua-resty-qless v0.12 publié le 08 juillet 2022.


lua-resty-qless est une liaison à qless-core de Moz - un puissant système de mise en file d'attente de tâches basé sur Redis inspiré par resque, mais implémenté comme une collection de scripts Lua pour Redis.

Cette liaison fournit une implémentation complète de Qless via un script Lua s'exécutant dans OpenResty / lua-nginx-module, y compris des travailleurs qui peuvent être démarrés pendant la phase init_worker_by_lua.

Essentiellement, avec ce module et une instance Redis moderne, vous pouvez transformer votre serveur OpenResty en un système de mise en file d'attente de tâches assez sophistiqué mais léger, qui est également compatible avec l'implémentation Ruby de référence, Qless.

Remarque : Ce module n'est pas conçu pour fonctionner dans un environnement Lua pur.

Philosophie et Nomenclature

Un job est une unité de travail identifiée par un identifiant de tâche ou jid. Une queue peut contenir plusieurs tâches qui sont programmées pour être exécutées à un certain moment, plusieurs tâches qui attendent d'être exécutées, et des tâches qui sont actuellement en cours d'exécution. Un worker est un processus sur un hôte, identifié de manière unique, qui demande des tâches à la file d'attente, effectue un processus associé à cette tâche, puis la marque comme complète. Lorsqu'elle est terminée, elle peut être mise dans une autre file d'attente.

Les tâches ne peuvent être dans qu'une seule file d'attente à la fois. Cette file d'attente est celle dans laquelle elles ont été mises pour la dernière fois. Donc, si un travailleur travaille sur une tâche, et que vous la déplacez, la demande du travailleur pour terminer la tâche sera ignorée.

Une tâche peut être canceled, ce qui signifie qu'elle disparaît dans l'éther, et nous n'y prêterons plus jamais attention. Une tâche peut être dropped, ce qui se produit lorsqu'un travailleur échoue à envoyer un signal de vie ou à terminer la tâche dans un délai raisonnable, ou une tâche peut être failed, ce qui se produit lorsqu'un hôte reconnaît un état systématiquement problématique concernant la tâche. Un travailleur ne doit échouer à une tâche que si l'erreur est probablement pas transitoire ; sinon, ce travailleur devrait simplement l'abandonner et laisser le système la récupérer.

Fonctionnalités

  1. Les tâches ne tombent pas à terre Parfois, les travailleurs abandonnent des tâches. Qless les récupère automatiquement et les confie à un autre travailleur.
  2. Tagging / Suivi Certaines tâches sont plus intéressantes que d'autres. Suivez ces tâches pour obtenir des mises à jour sur leur progression.
  3. Dépendances de tâches Une tâche peut avoir besoin d'attendre qu'une autre tâche soit terminée.
  4. Statistiques Qless conserve automatiquement des statistiques sur le temps d'attente des tâches pour être traitées et le temps qu'elles prennent à être traitées. Actuellement, nous suivons le nombre, la moyenne, l'écart type et un histogramme de ces temps.
  5. Les données de tâche sont stockées temporairement Les informations sur les tâches restent disponibles pendant une durée configurable afin que vous puissiez toujours revenir sur l'historique, les données, etc. d'une tâche.
  6. Priorité Les tâches ayant la même priorité sont extraites dans l'ordre où elles ont été insérées ; une priorité plus élevée signifie qu'elle est extraite plus rapidement.
  7. Logique de réessai Chaque tâche a un nombre de réessais associés, qui sont renouvelés lorsqu'elle est mise dans une nouvelle file d'attente ou terminée. Si une tâche est régulièrement abandonnée, elle est présumée problématique et est automatiquement échouée.
  8. Application Web lua-resty-qless-web vous donne visibilité et contrôle sur certains problèmes opérationnels.
  9. Travail planifié Jusqu'à ce qu'une tâche attende un délai spécifié (par défaut à 0), les tâches ne peuvent pas être extraites par les travailleurs.
  10. Tâches récurrentes La planification est bien et bonne, mais nous prenons également en charge les tâches qui doivent se reproduire périodiquement.
  11. Notifications Les tâches suivies émettent des événements sur des canaux pubsub au fur et à mesure qu'elles sont complétées, échouées, mises, extraites, etc. Utilisez ces événements pour être informé de l'avancement des tâches qui vous intéressent.

Connexion

Tout d'abord, requérez resty.qless et créez un client, en spécifiant vos détails de connexion Redis.

local qless = require("resty.qless").new({
    host = "127.0.0.1",
    port = 6379,
})

Les paramètres passés à new sont transmis à lua-resty-redis-connector. Veuillez consulter la documentation là-bas pour les options de connexion, y compris comment utiliser Redis Sentinel, etc.

De plus, si votre application a une connexion Redis que vous souhaitez réutiliser, il existe deux façons de l'intégrer :

1) En utilisant directement une connexion déjà établie

local qless = require("resty.qless").new({
    redis_client = my_redis,
})

2) En fournissant des rappels pour établir et fermer la connexion

local qless = require("resty.qless").new({
    get_redis_client = my_connection_callback,
    close_redis_client = my_close_callback,
})

Lorsque vous avez terminé avec Qless, vous devez appeler qless:set_keepalive() qui tentera de remettre Redis dans le pool de keepalive, soit en utilisant les paramètres que vous fournissez directement, soit via les paramètres envoyés à lua-resty-redis-connector, ou en appelant votre rappel close_redis_client.

Mise en file d'attente des tâches

Les tâches elles-mêmes sont des modules, qui doivent être chargés via require et fournir une fonction perform, qui accepte un seul argument job.

-- my/test/job.lua (la "classe" de la tâche devient "my.test.job")

local _M = {}

function _M.perform(job)
    -- job est une instance de Qless_Job et fournit un accès à
    -- job.data (qui est une table Lua), un moyen d'annuler la
    -- tâche (job:cancel()), et plus encore.

    -- retournez "nil, err_type, err_msg" pour indiquer un échec inattendu

    if not job.data then
        return nil, "job-error", "data missing"
    end

    -- Effectuer le travail
end

return _M

Maintenant, vous pouvez accéder à une file d'attente et ajouter une tâche à cette file d'attente.

-- Cela fait référence à une nouvelle file d'attente ou à une file d'attente existante 'testing'
local queue = qless.queues['testing']

-- Ajoutons une tâche, avec des données. Retourne l'ID de la tâche
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"

-- Maintenant, nous pouvons demander une tâche
local job = queue:pop()

-- Et nous pouvons effectuer le travail associé !
job:perform()

Les données de la tâche doivent être une table (qui est sérialisée en JSON en interne).

La valeur retournée par queue:put() est l'ID de la tâche, ou jid. Chaque tâche Qless a un jid unique, et il fournit un moyen d'interagir avec une tâche existante :

-- trouver une tâche existante par son jid
local job = qless.jobs:get(jid)

-- Interrogez-la pour connaître les détails :
job.klass -- la classe de la tâche
job.queue -- la file d'attente dans laquelle se trouve la tâche
job.data  -- les données de la tâche
job.history -- l'historique de ce qui est arrivé à la tâche jusqu'à présent
job.dependencies -- les jids d'autres tâches qui doivent être terminées avant celle-ci
job.dependents -- les jids d'autres tâches qui dépendent de celle-ci
job.priority -- la priorité de cette tâche
job.tags -- table des tags pour cette tâche
job.original_retries -- le nombre de fois que la tâche peut être réessayée
job.retries_left -- le nombre de réessais restants

-- Vous pouvez également modifier la tâche de diverses manières :
job:requeue("some_other_queue") -- déplacez-la vers une nouvelle file d'attente
job:cancel() -- annulez la tâche
job:tag("foo") -- ajoutez un tag
job:untag("foo") -- supprimez un tag

Exécution des travailleurs

Traditionnellement, Qless offrait un script de travailleur Ruby forké inspiré par Resque.

Dans lua-resty-qless, nous tirons parti de la phase init_lua_by_worker et de l'API ngx.timer.at pour exécuter des travailleurs dans des "threads légers" indépendants, évolutifs à travers vos processus de travailleurs.

Vous pouvez exécuter de nombreux threads légers simultanément par processus de travailleur, que Nginx programmera pour vous.

init_worker_by_lua '
    local resty_qless_worker = require "resty.qless.worker"

    local worker = resty_qless_worker.new(redis_params)

    worker:start({
        interval = 1,
        concurrency = 4,
        reserver = "ordered",
        queues = { "my_queue", "my_other_queue" },
    })
';

Les travailleurs prennent en charge trois stratégies (réservateurs) pour l'ordre dans lequel extraire les tâches des files d'attente : ordered, round-robin et shuffled round-robin.

Le réservateur ordonné continuera à extraire des tâches de la première file d'attente jusqu'à ce qu'elle soit vide, avant d'essayer d'extraire des tâches de la deuxième file d'attente. Le réservateur round-robin extraira une tâche de la première file d'attente, puis de la deuxième file d'attente, et ainsi de suite. Shuffled garantit simplement que la sélection round-robin est imprévisible.

Vous pourriez également facilement implémenter le vôtre. Suivez les autres réservateurs comme guide, et assurez-vous que le vôtre est "requérable" avec require "resty.qless.reserver.myreserver".

Middleware

Les travailleurs prennent également en charge le middleware qui peut être utilisé pour injecter une logique autour du traitement d'une seule tâche. Cela peut être utile, par exemple, lorsque vous devez rétablir une connexion à la base de données.

Pour ce faire, vous définissez le middleware du travailleur sur une fonction, et appelez coroutine.yield là où vous souhaitez que la tâche soit effectuée.

local worker = resty_qless_worker.new(redis_params)

worker.middleware = function(job)
    -- Effectuer le travail avant la tâche
    coroutine.yield()
    -- Effectuer le travail après la tâche
end

worker:start({ queues = "my_queue" })

Dépendances de tâches

Disons que vous avez une tâche qui dépend d'une autre, mais que les définitions de tâches sont fondamentalement différentes. Vous devez cuire une dinde, et vous devez faire de la farce, mais vous ne pouvez pas cuire la dinde tant que la farce n'est pas faite :

local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing", 
  { lots = "of butter" }
)
local turkey_jid  = queue:put("jobs.make.turkey", 
  { with = "stuffing" }, 
  { depends = stuffing_jid }
)

Lorsque la tâche de farce est terminée, la tâche de dinde est déverrouillée et libre d'être traitée.

Priorité

Certaines tâches doivent être extraites plus tôt que d'autres. Que ce soit un ticket de problème ou du débogage, vous pouvez le faire assez facilement lorsque vous mettez une tâche dans une file d'attente :

queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

Que se passe-t-il lorsque vous souhaitez ajuster la priorité d'une tâche alors qu'elle attend toujours dans une file d'attente ?

local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- Maintenant, cela sera extrait avant toute tâche de priorité inférieure

Remarque : Définir le champ de priorité ci-dessus est tout ce que vous devez faire, grâce aux métaméthodes Lua qui sont invoquées pour mettre à jour Redis. Cela peut sembler un peu "auto-magique", mais l'intention est de conserver la compatibilité de conception de l'API avec le client Ruby autant que possible.

Tâches planifiées

Si vous ne souhaitez pas qu'une tâche soit exécutée immédiatement mais à un moment donné dans le futur, vous pouvez spécifier un délai :

-- Exécuter au moins dans 10 minutes
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

Cela ne garantit pas que la tâche sera exécutée exactement dans 10 minutes. Vous pouvez accomplir cela en changeant la priorité de la tâche afin qu'une fois les 10 minutes écoulées, elle soit mise avant les tâches de moindre priorité :

-- Exécuter dans 10 minutes
queue:put("jobs.test", 
  { foo = "bar" }, 
  { delay = 600, priority = 100 }
)

Tâches récurrentes

Parfois, il ne suffit pas simplement de planifier une tâche, mais vous souhaitez exécuter des tâches régulièrement. En particulier, peut-être que vous avez une opération par lots qui doit être exécutée une fois par heure et que vous ne vous souciez pas de quel travailleur l'exécute. Les tâches récurrentes sont spécifiées de la même manière que les autres tâches :

-- Exécuter toutes les heures
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b

Vous pouvez même y accéder de la même manière que vous le feriez pour des tâches normales :

local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

Changer l'intervalle auquel elle s'exécute après coup est trivial :

-- Je pense que je n'ai besoin de l'exécuter qu'une fois toutes les deux heures
job.interval = 7200

Si vous voulez qu'elle s'exécute chaque heure à l'heure, mais qu'il est actuellement 2:37, vous pouvez spécifier un décalage qui indique combien de temps elle doit attendre avant d'extraire la première tâche :

-- 23 minutes d'attente avant qu'elle ne doive commencer
queue:recur("jobs.test", 
  { howdy = "hello" }, 
  3600,
  { offset = (23 * 60) }
)

Les tâches récurrentes ont également une priorité, un nombre configurable de réessais et des tags. Ces paramètres ne s'appliquent pas aux tâches récurrentes, mais plutôt aux tâches qu'elles engendrent. Dans le cas où plus d'un intervalle passe avant qu'un travailleur n'essaie d'extraire la tâche, plus d'une tâche est créée. L'idée est que, bien que cela soit entièrement géré par le client, l'état ne doit pas dépendre de la fréquence à laquelle les travailleurs essaient d'extraire des tâches.

-- Répéter toutes les minutes
queue:recur("jobs.test", { lots = "of jobs" }, 60)

-- Attendre 5 minutes

local jobs = queue:pop(10)
ngx.say(#jobs, " tâches ont été extraites")

-- = 5 tâches ont été extraites

Options de configuration

Vous pouvez obtenir et définir des options de configuration globales (dans le contexte de la même instance Redis) pour changer le comportement de l'envoi de signaux de vie, etc. Il n'y a pas un grand nombre d'options de configuration, mais une importante est la durée pendant laquelle les données de la tâche sont conservées. Les données de la tâche expirent après avoir été complétées pendant jobs-history secondes, mais sont limitées aux jobs-history-count dernières tâches complétées. Ces valeurs par défaut sont de 50k tâches et 30 jours, mais selon le volume, vos besoins peuvent changer. Pour ne conserver que les 500 dernières tâches pendant jusqu'à 7 jours :

qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)

Tagging / Suivi

Dans qless, 'suivi' signifie marquer une tâche comme importante. Les tâches suivies émettent des événements abonnissables à mesure qu'elles progressent (plus d'informations à ce sujet ci-dessous).

local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()

Les tâches peuvent être taguées avec des chaînes qui sont indexées pour des recherches rapides. Par exemple, les tâches peuvent être associées à des comptes clients, ou à une autre clé qui a du sens pour votre projet.

queue:put("jobs.test", {}, 
  { tags = { "12345", "foo", "bar" } }
)

Cela les rend recherchables dans l'interface web Ruby / Sinatra, ou à partir du code :

local jids = qless.jobs:tagged("foo")

Vous pouvez également ajouter ou supprimer des tags à volonté :

local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")

Notifications

Les tâches suivies émettent des événements sur des canaux pubsub spécifiques à mesure que des événements leur arrivent. Que ce soit lorsqu'elles sont extraites d'une file d'attente, complétées par un travailleur, etc.

Ceux qui connaissent Redis pub/sub noteront qu'une connexion Redis ne peut être utilisée pour des commandes de type pubsub qu'une fois qu'elle est à l'écoute. Pour cette raison, le module d'événements reçoit indépendamment les paramètres de connexion Redis.

local events = qless.events(redis_params)

events:listen({ "canceled", "failed" }, function(channel, jid)
    ngx.log(ngx.INFO, jid, ": ", channel)
    -- journalise "b1882e009a3d11e192d0b174d751779d: canceled" etc.
end

Vous pouvez également écouter le canal "log", qui donne une structure JSON de tous les événements enregistrés.

local events = qless.events(redis_params)

events:listen({ "log" }, function(channel, message)
    local message = cjson.decode(message)
    ngx.log(ngx.INFO, message.event, " ", message.jid)
end

Envoi de signaux de vie

Lorsqu'un travailleur se voit attribuer une tâche, il reçoit un verrou exclusif sur cette tâche. Cela signifie que cette tâche ne sera pas donnée à un autre travailleur, tant que le travailleur vérifie ses progrès sur la tâche. Par défaut, les tâches doivent soit faire rapport de leurs progrès toutes les 60 secondes, soit les terminer, mais c'est une option configurable. Pour les tâches plus longues, cela peut ne pas avoir de sens.

-- Hourra ! Nous avons un morceau de travail !
local job = queue:pop()

-- Combien de temps avant que je doive vérifier ?
job:ttl()
-- = 59

-- Hé ! Je travaille encore dessus !
job:heartbeat()
-- = 1331326141.0

-- D'accord, j'ai un peu plus de temps. Oh ! Maintenant, j'ai fini !
job:complete()

Si vous souhaitez augmenter le signal de vie dans toutes les files d'attente,

-- Maintenant, les tâches ont 10 minutes pour faire leur rapport
qless:set_config("heartbeat", 600)

-- Mais la file d'attente de test n'a pas autant de temps.
qless.queues["testing"].heartbeat = 300

Lors du choix d'un intervalle de signal de vie, notez qu'il s'agit de la durée qui peut s'écouler avant que qless réalise qu'une tâche a été abandonnée. En même temps, vous ne voulez pas surcharger qless avec des signaux de vie toutes les 10 secondes si votre tâche est censée prendre plusieurs heures.

Un idiome que vous êtes encouragé à utiliser pour les tâches de longue durée qui souhaitent vérifier périodiquement leurs progrès :

-- Attendez jusqu'à ce qu'il nous reste 5 minutes sur le signal de vie, et si nous constatons que
-- nous avons perdu notre verrou sur une tâche, alors tombons honorablement sur notre épée
if job:ttl() < 300 and not job:heartbeat() then
  -- sortie
end

Statistiques

Une belle fonctionnalité de Qless est que vous pouvez obtenir des statistiques sur l'utilisation. Les statistiques sont agrégées par jour, donc lorsque vous voulez des statistiques sur une file d'attente, vous devez indiquer de quelle file d'attente et de quel jour vous parlez. Par défaut, vous obtenez simplement les statistiques pour aujourd'hui. Ces statistiques incluent des informations sur le temps d'attente moyen des tâches, l'écart type et un histogramme. Ces mêmes données sont également fournies pour l'achèvement des tâches :

-- Alors, comment ça se passe aujourd'hui ?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

Temps

Il est important de noter que Redis n'autorise pas l'accès à l'heure système si vous allez effectuer des manipulations sur des données (ce que nos scripts font). Et pourtant, nous avons des signaux de vie. Cela signifie que les clients envoient en fait l'heure actuelle lors de la plupart des requêtes, et pour des raisons de cohérence, cela signifie que vos travailleurs doivent être relativement synchronisés. Cela ne signifie pas jusqu'à des dizaines de millisecondes, mais si vous constatez un décalage d'horloge appréciable, vous devriez enquêter sur NTP.

Assurer l'unicité des tâches

Comme mentionné ci-dessus, les tâches sont identifiées de manière unique par un identifiant -- leur jid. Qless générera un UUID pour chaque tâche mise en file d'attente ou vous pouvez en spécifier un manuellement :

queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

Cela peut être utile lorsque vous souhaitez garantir l'unicité d'une tâche : créez simplement un jid qui est une fonction de la classe et des données de la tâche, cela garantira que Qless n'aura pas plusieurs tâches avec la même classe et les mêmes données.

GitHub

Vous pouvez trouver des conseils de configuration supplémentaires et de la documentation pour ce module dans le dépôt GitHub pour nginx-module-qless.