Saltar a contenido

qless: enlace Lua a Qless (gestión de colas / tuberías) para nginx-module-lua / Redis

Instalación

Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.

CentOS/RHEL 7 o Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless

Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.

Este documento describe lua-resty-qless v0.12 lanzado el 08 de julio de 2022.


lua-resty-qless es un enlace a qless-core de Moz - un poderoso sistema de colas de trabajos basado en Redis inspirado en resque, pero implementado como una colección de scripts Lua para Redis.

Este enlace proporciona una implementación completa de Qless a través de un script Lua que se ejecuta en OpenResty / lua-nginx-module, incluyendo trabajadores que pueden iniciarse durante la fase init_worker_by_lua.

Esencialmente, con este módulo y una instancia moderna de Redis, puedes convertir tu servidor OpenResty en un sistema de colas de trabajos bastante sofisticado pero ligero, que también es compatible con la implementación de referencia en Ruby, Qless.

Nota: Este módulo no está diseñado para funcionar en un entorno Lua puro.

Filosofía y Nomenclatura

Un job es una unidad de trabajo identificada por un id de trabajo o jid. Una queue puede contener varios trabajos que están programados para ejecutarse en un momento determinado, varios trabajos que están esperando para ejecutarse y trabajos que están actualmente en ejecución. Un worker es un proceso en un host, identificado de manera única, que solicita trabajos de la cola, realiza algún proceso asociado con ese trabajo y luego lo marca como completo. Cuando se completa, puede ser colocado en otra cola.

Los trabajos solo pueden estar en una cola a la vez. Esa cola es la última en la que fueron colocados. Así que si un trabajador está trabajando en un trabajo y lo mueves, la solicitud del trabajador para completar el trabajo será ignorada.

Un trabajo puede ser canceled, lo que significa que desaparece en el éter, y nunca más le prestaremos atención. Un trabajo puede ser dropped, que es cuando un trabajador no logra enviar un latido o completar el trabajo de manera oportuna, o un trabajo puede ser failed, que es cuando un host reconoce algún estado problemático sistemático sobre el trabajo. Un trabajador solo debe fallar un trabajo si el error probablemente no es transitorio; de lo contrario, ese trabajador debería simplemente dejarlo y permitir que el sistema lo reclame.

Características

  1. Los trabajos no se caen al suelo A veces los trabajadores dejan caer trabajos. Qless los recoge automáticamente y se los da a otro trabajador.
  2. Etiquetado / Seguimiento Algunos trabajos son más interesantes que otros. Sigue esos trabajos para obtener actualizaciones sobre su progreso.
  3. Dependencias de trabajos Un trabajo podría necesitar esperar a que otro trabajo se complete.
  4. Estadísticas Qless mantiene automáticamente estadísticas sobre cuánto tiempo esperan los trabajos para ser procesados y cuánto tiempo tardan en ser procesados. Actualmente, llevamos un registro del conteo, la media, la desviación estándar y un histograma de estos tiempos.
  5. Los datos del trabajo se almacenan temporalmente La información del trabajo permanece durante un tiempo configurable para que puedas mirar atrás en el historial, datos, etc. de un trabajo.
  6. Prioridad Los trabajos con la misma prioridad se procesan en el orden en que fueron insertados; una mayor prioridad significa que se procesan más rápido.
  7. Lógica de reintentos Cada trabajo tiene un número de reintentos asociados, que se renuevan cuando se coloca en una nueva cola o se completa. Si un trabajo se deja caer repetidamente, se presume que es problemático y se falla automáticamente.
  8. Aplicación Web lua-resty-qless-web te brinda visibilidad y control sobre ciertos problemas operativos.
  9. Trabajo Programado Hasta que un trabajo espera un retraso especificado (por defecto 0), los trabajos no pueden ser procesados por los trabajadores.
  10. Trabajos Recurrentes Programar está bien, pero también admitimos trabajos que necesitan recurrir periódicamente.
  11. Notificaciones Los trabajos rastreados emiten eventos en canales pubsub a medida que se completan, fallan, se colocan, se procesan, etc. Usa estos eventos para recibir notificaciones sobre el progreso de los trabajos que te interesan.

Conexión

Primero que nada, requiere resty.qless y crea un cliente, especificando los detalles de tu conexión Redis.

local qless = require("resty.qless").new({
    host = "127.0.0.1",
    port = 6379,
})

Los parámetros pasados a new se reenvían a lua-resty-redis-connector. Por favor, revisa la documentación allí para opciones de conexión, incluyendo cómo usar Redis Sentinel, etc.

Además, si tu aplicación tiene una conexión Redis que deseas reutilizar, hay dos formas en que puedes integrar esto:

1) Usando una conexión ya establecida directamente

local qless = require("resty.qless").new({
    redis_client = my_redis,
})

2) Proporcionando callbacks para conectar y cerrar la conexión

local qless = require("resty.qless").new({
    get_redis_client = my_connection_callback,
    close_redis_client = my_close_callback,
})

Cuando termines con Qless, debes llamar a qless:set_keepalive() que intentará devolver Redis a la piscina de keepalive, ya sea usando configuraciones que proporciones directamente, o a través de parámetros enviados a lua-resty-redis-connector, o llamando a tu callback close_redis_client.

Encolando Trabajos

Los trabajos en sí son módulos, que deben ser cargables a través de require y proporcionar una función perform, que acepta un único argumento job.

-- my/test/job.lua (la "clase" del trabajo se convierte en "my.test.job")

local _M = {}

function _M.perform(job)
    -- job es una instancia de Qless_Job y proporciona acceso a
    -- job.data (que es una tabla Lua), un medio para cancelar el
    -- trabajo (job:cancel()), y más.

    -- devuelve "nil, err_type, err_msg" para indicar un fallo inesperado

    if not job.data then
        return nil, "job-error", "data missing"
    end

    -- Hacer trabajo
end

return _M

Ahora puedes acceder a una cola y agregar un trabajo a esa cola.

-- Esto hace referencia a una nueva cola o existente 'testing'
local queue = qless.queues['testing']

-- Agreguemos un trabajo, con algunos datos. Devuelve el ID del trabajo
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"

-- Ahora podemos pedir un trabajo
local job = queue:pop()

-- ¡Y podemos hacer el trabajo asociado con él!
job:perform()

Los datos del trabajo deben ser una tabla (que se serializa a JSON internamente).

El valor devuelto por queue:put() es el ID del trabajo, o jid. Cada trabajo de Qless tiene un jid único, y proporciona un medio para interactuar con un trabajo existente:

-- encontrar un trabajo existente por su jid
local job = qless.jobs:get(jid)

-- Consultarlo para averiguar detalles sobre él:
job.klass -- la clase del trabajo
job.queue -- la cola en la que está el trabajo
job.data  -- los datos del trabajo
job.history -- el historial de lo que ha sucedido con el trabajo hasta ahora
job.dependencies -- los jids de otros trabajos que deben completarse antes que este
job.dependents -- los jids de otros trabajos que dependen de este
job.priority -- la prioridad de este trabajo
job.tags -- tabla de etiquetas para este trabajo
job.original_retries -- el número de veces que se permite reintentar el trabajo
job.retries_left -- el número de reintentos restantes

-- También puedes cambiar el trabajo de varias maneras:
job:requeue("some_other_queue") -- moverlo a una nueva cola
job:cancel() -- cancelar el trabajo
job:tag("foo") -- agregar una etiqueta
job:untag("foo") -- eliminar una etiqueta

Ejecutando Trabajadores

Tradicionalmente, Qless ofrecía un script de trabajador Ruby que hacía fork inspirado en Resque.

En lua-resty-qless, aprovechamos la fase init_lua_by_worker y la API ngx.timer.at para ejecutar trabajadores en "hilos ligeros" independientes, escalables a través de tus procesos de trabajador.

Puedes ejecutar muchos hilos ligeros concurrentemente por proceso de trabajador, que Nginx programará por ti.

init_worker_by_lua '
    local resty_qless_worker = require "resty.qless.worker"

    local worker = resty_qless_worker.new(redis_params)

    worker:start({
        interval = 1,
        concurrency = 4,
        reserver = "ordered",
        queues = { "my_queue", "my_other_queue" },
    })
';

Los trabajadores admiten tres estrategias (reservadores) para el orden en que se procesan los trabajos de las colas: ordered, round-robin y shuffled round-robin.

El reservador ordenado seguirá sacando trabajos de la primera cola hasta que esté vacía, antes de intentar sacar trabajos de la segunda cola. El reservador round-robin sacará un trabajo de la primera cola, luego de la segunda cola, y así sucesivamente. Shuffled simplemente asegura que la selección round-robin sea impredecible.

También podrías implementar fácilmente el tuyo propio. Sigue los otros reservadores como guía y asegúrate de que el tuyo sea "requerible" con require "resty.qless.reserver.myreserver".

Middleware

Los trabajadores también admiten middleware que se puede usar para inyectar lógica alrededor del procesamiento de un solo trabajo. Esto puede ser útil, por ejemplo, cuando necesitas restablecer una conexión a la base de datos.

Para hacer esto, estableces el middleware del trabajador en una función y llamas a coroutine.yield donde deseas que se realice el trabajo.

local worker = resty_qless_worker.new(redis_params)

worker.middleware = function(job)
    -- Hacer trabajo previo al trabajo
    coroutine.yield()
    -- Hacer trabajo posterior al trabajo
end

worker:start({ queues = "my_queue" })

Dependencias de Trabajos

Supongamos que tienes un trabajo que depende de otro, pero las definiciones de tareas son fundamentalmente diferentes. Necesitas cocinar un pavo y necesitas hacer relleno, pero no puedes hacer el pavo hasta que se haya hecho el relleno:

local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing", 
  { lots = "of butter" }
)
local turkey_jid  = queue:put("jobs.make.turkey", 
  { with = "stuffing" }, 
  { depends = stuffing_jid }
)

Cuando el trabajo de relleno se completa, el trabajo de pavo se desbloquea y está libre para ser procesado.

Prioridad

Algunos trabajos necesitan ser procesados antes que otros. Ya sea un ticket de problemas o depuración, puedes hacer esto bastante fácilmente cuando colocas un trabajo en una cola:

queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

¿Qué sucede cuando deseas ajustar la prioridad de un trabajo mientras aún está esperando en una cola?

local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- Ahora este se procesará antes que cualquier trabajo de menor prioridad

Nota: Establecer el campo de prioridad anterior es todo lo que necesitas hacer, gracias a los metamétodos de Lua que se invocan para actualizar Redis. Esto puede parecer un poco "auto-mágico", pero la intención es mantener la compatibilidad de diseño de API con el cliente Ruby tanto como sea posible.

Trabajos Programados

Si no deseas que un trabajo se ejecute de inmediato, sino en algún momento en el futuro, puedes especificar un retraso:

-- Ejecutar al menos 10 minutos a partir de ahora
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

Esto no garantiza que el trabajo se ejecute exactamente en 10 minutos. Puedes lograr esto cambiando la prioridad del trabajo para que una vez transcurridos 10 minutos, se coloque antes que los trabajos de menor prioridad:

-- Ejecutar en 10 minutos
queue:put("jobs.test", 
  { foo = "bar" }, 
  { delay = 600, priority = 100 }
)

Trabajos Recurrentes

A veces no es suficiente simplemente programar un trabajo, sino que deseas ejecutar trabajos regularmente. En particular, tal vez tengas alguna operación por lotes que necesita ejecutarse una vez por hora y no te importa qué trabajador la ejecute. Los trabajos recurrentes se especifican de manera muy similar a otros trabajos:

-- Ejecutar cada hora
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b

Incluso puedes acceder a ellos de manera muy similar a como lo harías con trabajos normales:

local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

Cambiar el intervalo en el que se ejecuta después de hecho es trivial:

-- Creo que solo necesito que se ejecute una vez cada dos horas
job.interval = 7200

Si deseas que se ejecute cada hora en punto, pero ahora son las 2:37, puedes especificar un desplazamiento que es cuánto tiempo debe esperar antes de procesar el primer trabajo:

-- 23 minutos de espera hasta que deba ir
queue:recur("jobs.test", 
  { howdy = "hello" }, 
  3600,
  { offset = (23 * 60) }
)

Los trabajos recurrentes también tienen prioridad, un número configurable de reintentos y etiquetas. Estas configuraciones no se aplican a los trabajos recurrentes, sino a los trabajos que generan. En el caso de que más de un intervalo pase antes de que un trabajador intente procesar el trabajo, se crea más de un trabajo. La idea es que, aunque se gestiona completamente por el cliente, el estado no debería depender de cuán a menudo los trabajadores intentan procesar trabajos.

-- Recur cada minuto
queue:recur("jobs.test", { lots = "of jobs" }, 60)

-- Esperar 5 minutos

local jobs = queue:pop(10)
ngx.say(#jobs, " trabajos fueron procesados")

-- = 5 trabajos fueron procesados

Opciones de Configuración

Puedes obtener y establecer configuraciones globales (en el contexto de la misma instancia de Redis) para cambiar el comportamiento del latido, y así sucesivamente. No hay un número tremendo de opciones de configuración, pero una importante es cuánto tiempo se mantienen los datos del trabajo. Los datos del trabajo expiran después de que se han completado durante jobs-history segundos, pero están limitados a los últimos jobs-history-count trabajos completados. Estos valores predeterminados son 50k trabajos y 30 días, pero dependiendo del volumen, tus necesidades pueden cambiar. Para mantener solo los últimos 500 trabajos durante hasta 7 días:

qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)

Etiquetado / Seguimiento

En qless, 'seguimiento' significa marcar un trabajo como importante. Los trabajos rastreados emiten eventos suscribibles a medida que avanzan (más sobre eso a continuación).

local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()

Los trabajos pueden ser etiquetados con cadenas que están indexadas para búsquedas rápidas. Por ejemplo, los trabajos podrían estar asociados con cuentas de clientes, o alguna otra clave que tenga sentido para tu proyecto.

queue:put("jobs.test", {}, 
  { tags = { "12345", "foo", "bar" } }
)

Esto los hace buscables en la interfaz web Ruby / Sinatra, o desde el código:

local jids = qless.jobs:tagged("foo")

También puedes agregar o eliminar etiquetas a voluntad:

local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")

Notificaciones

Los trabajos rastreados emiten eventos en canales pubsub específicos a medida que suceden cosas con ellos. Ya sea que se procesen de una cola, se completen por un trabajador, etc.

Los que están familiarizados con Redis pub/sub notarán que una conexión Redis solo se puede usar para comandos pubsub una vez que está escuchando. Por esta razón, el módulo de eventos recibe parámetros de conexión Redis de manera independiente.

local events = qless.events(redis_params)

events:listen({ "canceled", "failed" }, function(channel, jid)
    ngx.log(ngx.INFO, jid, ": ", channel)
    -- registra "b1882e009a3d11e192d0b174d751779d: canceled" etc.
end)

También puedes escuchar el canal "log", que da una estructura JSON de todos los eventos registrados.

local events = qless.events(redis_params)

events:listen({ "log" }, function(channel, message)
    local message = cjson.decode(message)
    ngx.log(ngx.INFO, message.event, " ", message.jid)
end)

Latidos

Cuando a un trabajador se le asigna un trabajo, se le da un bloqueo exclusivo sobre ese trabajo. Eso significa que ese trabajo no se le dará a ningún otro trabajador, siempre que el trabajador informe sobre el progreso del trabajo. Por defecto, los trabajos deben informar sobre el progreso cada 60 segundos, o completarlo, pero esa es una opción configurable. Para trabajos más largos, esto puede no tener sentido.

-- ¡Hurra! ¡Tenemos un trabajo!
local job = queue:pop()

-- ¿Cuánto tiempo tengo hasta que deba informar?
job:ttl()
-- = 59

-- ¡Hey! ¡Todavía estoy trabajando en ello!
job:heartbeat()
-- = 1331326141.0

-- ¡Ok, tengo un poco más de tiempo! ¡Oh! ¡Ahora he terminado!
job:complete()

Si deseas aumentar el latido en todas las colas,

-- Ahora los trabajos tienen 10 minutos para informar
qless:set_config("heartbeat", 600)

-- Pero la cola de pruebas no tiene tanto tiempo.
qless.queues["testing"].heartbeat = 300

Al elegir un intervalo de latido, ten en cuenta que este es el tiempo que puede transcurrir antes de que qless se dé cuenta de si un trabajo se ha dejado caer. Al mismo tiempo, no deseas sobrecargar a qless con latidos cada 10 segundos si se espera que tu trabajo tome varias horas.

Un idiom que se te anima a usar para trabajos de larga duración que desean informar su progreso periódicamente:

-- Espera hasta que nos queden 5 minutos en el latido, y si encontramos que
-- hemos perdido nuestro bloqueo en un trabajo, entonces honorable caer sobre nuestra espada
if job:ttl() < 300 and not job:heartbeat() then
  -- salir
end

Estadísticas

Una buena característica de Qless es que puedes obtener estadísticas sobre el uso. Las estadísticas se agregan por día, así que cuando deseas estadísticas sobre una cola, necesitas decir qué cola y qué día estás hablando. Por defecto, solo obtienes las estadísticas de hoy. Estas estadísticas incluyen información sobre el tiempo medio de espera del trabajo, la desviación estándar y el histograma. Estos mismos datos también se proporcionan para la finalización del trabajo:

-- Entonces, ¿cómo nos va hoy?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

Tiempo

Es importante notar que Redis no permite el acceso al tiempo del sistema si vas a realizar manipulaciones de datos (que nuestros scripts hacen). Y, sin embargo, tenemos latidos. Esto significa que los clientes envían realmente la hora actual al hacer la mayoría de las solicitudes, y por razones de consistencia, significa que tus trabajadores deben estar relativamente sincronizados. Esto no significa hasta décimas de milisegundos, pero si estás experimentando un apreciable desvío del reloj, deberías investigar NTP.

Asegurando la Unicidad del Trabajo

Como se mencionó anteriormente, los trabajos están identificados de manera única por un id--su jid. Qless generará un UUID para cada trabajo encolado o puedes especificar uno manualmente:

queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

Esto puede ser útil cuando deseas asegurar la unicidad de un trabajo: simplemente crea un jid que sea una función de la clase y los datos del trabajo, garantizando que Qless no tendrá múltiples trabajos con la misma clase y datos.

GitHub

Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-qless.