Pular para conteúdo

qless: ligação Lua para Qless (gerenciamento de fila / pipeline) para nginx-module-lua / Redis

Instalação

Se você ainda não configurou a assinatura do repositório RPM, inscreva-se. Depois, você pode prosseguir com os seguintes passos.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless

Para usar esta biblioteca Lua com NGINX, certifique-se de que o nginx-module-lua está instalado.

Este documento descreve lua-resty-qless v0.12 lançado em 08 de julho de 2022.


lua-resty-qless é uma ligação para qless-core da Moz - um poderoso sistema de enfileiramento de tarefas baseado em Redis inspirado no resque, mas implementado como uma coleção de scripts Lua para Redis.

Esta ligação fornece uma implementação completa de Qless através de um script Lua executado em OpenResty / lua-nginx-module, incluindo trabalhadores que podem ser iniciados durante a fase init_worker_by_lua.

Essencialmente, com este módulo e uma instância moderna do Redis, você pode transformar seu servidor OpenResty em um sistema de enfileiramento de tarefas bastante sofisticado, mas leve, que também é compatível com a implementação de referência em Ruby, Qless.

Nota: Este módulo não foi projetado para funcionar em um ambiente Lua puro.

Filosofia e Nomenclatura

Um job é uma unidade de trabalho identificada por um id de trabalho ou jid. Uma queue pode conter vários jobs que estão programados para serem executados em um determinado momento, vários jobs que estão aguardando para serem executados e jobs que estão atualmente em execução. Um worker é um processo em um host, identificado de forma única, que solicita jobs da fila, realiza algum processo associado a esse job e, em seguida, o marca como completo. Quando é concluído, pode ser colocado em outra fila.

Jobs podem estar em apenas uma fila por vez. Essa fila é a última fila em que foram colocados. Portanto, se um worker estiver trabalhando em um job e você o mover, a solicitação do worker para concluir o job será ignorada.

Um job pode ser canceled, o que significa que ele desaparece no éter, e nunca mais prestaremos atenção nele novamente. Um job pode ser dropped, que é quando um worker falha em enviar um heartbeat ou concluir o job de maneira oportuna, ou um job pode ser failed, que é quando um host reconhece algum estado problemático sistematicamente sobre o job. Um worker deve falhar um job apenas se o erro provavelmente não for transitório; caso contrário, esse worker deve apenas descartá-lo e deixar o sistema recuperá-lo.

Recursos

  1. Jobs não são descartados Às vezes, workers descartam jobs. O Qless automaticamente os recolhe e os entrega a outro worker.
  2. Tagging / Tracking Alguns jobs são mais interessantes que outros. Acompanhe esses jobs para obter atualizações sobre seu progresso.
  3. Dependências de Jobs Um job pode precisar esperar que outro job seja concluído.
  4. Estatísticas O Qless mantém automaticamente estatísticas sobre quanto tempo os jobs aguardam para serem processados e quanto tempo levam para serem processados. Atualmente, mantemos o controle da contagem, média, desvio padrão e um histograma desses tempos.
  5. Os dados do job são armazenados temporariamente As informações do job permanecem por um período configurável para que você ainda possa olhar para o histórico, dados, etc. de um job.
  6. Prioridade Jobs com a mesma prioridade são retirados na ordem em que foram inseridos; uma prioridade mais alta significa que será retirado mais rapidamente.
  7. Lógica de Retentativa Cada job tem um número de retentativas associado a ele, que são renovadas quando ele é colocado em uma nova fila ou concluído. Se um job for repetidamente descartado, presume-se que ele seja problemático e é automaticamente falhado.
  8. Aplicativo Web lua-resty-qless-web oferece visibilidade e controle sobre certos problemas operacionais.
  9. Trabalho Agendado Até que um job aguarde um atraso especificado (padrão é 0), os jobs não podem ser retirados pelos workers.
  10. Jobs Recorrentes O agendamento é bom, mas também suportamos jobs que precisam ocorrer periodicamente.
  11. Notificações Jobs rastreados emitem eventos em canais pubsub à medida que são concluídos, falhados, colocados, retirados, etc. Use esses eventos para ser notificado sobre o progresso dos jobs que você está interessado.

Conectando

Primeiro, exija resty.qless e crie um cliente, especificando os detalhes da sua conexão Redis.

local qless = require("resty.qless").new({
    host = "127.0.0.1",
    port = 6379,
})

Os parâmetros passados para new são encaminhados para lua-resty-redis-connector. Por favor, revise a documentação lá para opções de conexão, incluindo como usar Redis Sentinel, etc.

Além disso, se sua aplicação tiver uma conexão Redis que você deseja reutilizar, existem duas maneiras de integrar isso:

1) Usando uma conexão já estabelecida diretamente

local qless = require("resty.qless").new({
    redis_client = my_redis,
})

2) Fornecendo callbacks para conectar e fechar a conexão

local qless = require("resty.qless").new({
    get_redis_client = my_connection_callback,
    close_redis_client = my_close_callback,
})

Quando terminar com o Qless, você deve chamar qless:set_keepalive(), que tentará colocar o Redis de volta na piscina de keepalive, usando as configurações que você fornece diretamente, ou através de parâmetros enviados para lua-resty-redis-connector, ou chamando seu callback close_redis_client.

Enfileirando Jobs

Jobs em si são módulos, que devem ser carregáveis via require e fornecer uma função perform, que aceita um único argumento job.

-- my/test/job.lua (o "klass" do job se torna "my.test.job")

local _M = {}

function _M.perform(job)
    -- job é uma instância de Qless_Job e fornece acesso a
    -- job.data (que é uma tabela Lua), um meio de cancelar o
    -- job (job:cancel()), e mais.

    -- retorne "nil, err_type, err_msg" para indicar uma falha inesperada

    if not job.data then
        return nil, "job-error", "data missing"
    end

    -- Fazer o trabalho
end

return _M

Agora você pode acessar uma fila e adicionar um job a essa fila.

-- Isso referencia uma nova ou existente fila 'testing'
local queue = qless.queues['testing']

-- Vamos adicionar um job, com alguns dados. Retorna o ID do Job
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"

-- Agora podemos solicitar um job
local job = queue:pop()

-- E podemos fazer o trabalho associado a ele!
job:perform()

Os dados do job devem ser uma tabela (que é serializada em JSON internamente).

O valor retornado por queue:put() é o ID do job, ou jid. Cada job do Qless tem um jid único, e fornece um meio de interagir com um job existente:

-- encontrar um job existente pelo seu jid
local job = qless.jobs:get(jid)

-- Consultá-lo para descobrir detalhes sobre ele:
job.klass -- a classe do job
job.queue -- a fila em que o job está
job.data  -- os dados do job
job.history -- o histórico do que aconteceu com o job até agora
job.dependencies -- os jids de outros jobs que devem ser concluídos antes deste
job.dependents -- os jids de outros jobs que dependem deste
job.priority -- a prioridade deste job
job.tags -- tabela de tags para este job
job.original_retries -- o número de vezes que o job pode ser refeito
job.retries_left -- o número de retentativas restantes

-- Você também pode alterar o job de várias maneiras:
job:requeue("some_other_queue") -- movê-lo para uma nova fila
job:cancel() -- cancelar o job
job:tag("foo") -- adicionar uma tag
job:untag("foo") -- remover uma tag

Executando Workers

Tradicionalmente, o Qless oferecia um script de worker Ruby que utilizava fork, inspirado no Resque.

No lua-resty-qless, aproveitamos a fase init_lua_by_worker e a API ngx.timer.at para executar workers em "light threads" independentes, escaláveis entre seus processos de worker.

Você pode executar muitos light threads simultaneamente por processo de worker, que o Nginx agendará para você.

init_worker_by_lua '
    local resty_qless_worker = require "resty.qless.worker"

    local worker = resty_qless_worker.new(redis_params)

    worker:start({
        interval = 1,
        concurrency = 4,
        reserver = "ordered",
        queues = { "my_queue", "my_other_queue" },
    })
';

Workers suportam três estratégias (reservadores) para qual ordem retirar jobs das filas: ordered, round-robin e shuffled round-robin.

O reservador ordenado continuará retirando jobs da primeira fila até que ela esteja vazia, antes de tentar retirar jobs da segunda fila. O reservador round-robin retirará um job da primeira fila, depois da segunda fila, e assim por diante. O shuffled simplesmente garante que a seleção round-robin seja imprevisível.

Você também pode facilmente implementar o seu próprio. Siga os outros reservadores como guia e certifique-se de que o seu seja "requerível" com require "resty.qless.reserver.myreserver".

Middleware

Workers também suportam middleware que pode ser usado para injetar lógica em torno do processamento de um único job. Isso pode ser útil, por exemplo, quando você precisa restabelecer uma conexão com o banco de dados.

Para fazer isso, você define o middleware do worker como uma função e chama coroutine.yield onde deseja que o job seja executado.

local worker = resty_qless_worker.new(redis_params)

worker.middleware = function(job)
    -- Fazer trabalho pré-job
    coroutine.yield()
    -- Fazer trabalho pós-job
end

worker:start({ queues = "my_queue" })

Dependências de Jobs

Vamos supor que você tenha um job que depende de outro, mas as definições das tarefas são fundamentalmente diferentes. Você precisa cozinhar um peru e precisa fazer o recheio, mas não pode fazer o peru até que o recheio esteja pronto:

local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing", 
  { lots = "of butter" }
)
local turkey_jid  = queue:put("jobs.make.turkey", 
  { with = "stuffing" }, 
  { depends = stuffing_jid }
)

Quando o job de recheio é concluído, o job de peru é desbloqueado e pode ser processado.

Prioridade

Alguns jobs precisam ser retirados mais cedo que outros. Seja um ticket de problema ou depuração, você pode fazer isso facilmente ao colocar um job em uma fila:

queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

O que acontece quando você deseja ajustar a prioridade de um job enquanto ele ainda está aguardando em uma fila?

local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- Agora isso será retirado antes de qualquer job de prioridade inferior

Nota: Definir o campo de prioridade acima é tudo o que você precisa fazer, graças aos metamétodos Lua que são invocados para atualizar o Redis. Isso pode parecer um pouco "auto-mágico", mas a intenção é manter a compatibilidade de design da API com o cliente Ruby tanto quanto possível.

Jobs Agendados

Se você não quiser que um job seja executado imediatamente, mas em algum momento no futuro, pode especificar um atraso:

-- Execute pelo menos 10 minutos a partir de agora
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

Isso não garante que o job será executado exatamente em 10 minutos. Você pode conseguir isso alterando a prioridade do job para que, uma vez que 10 minutos tenham se passado, ele seja colocado antes de jobs de prioridade inferior:

-- Execute em 10 minutos
queue:put("jobs.test", 
  { foo = "bar" }, 
  { delay = 600, priority = 100 }
)

Jobs Recorrentes

Às vezes, não é suficiente simplesmente agendar um job, mas você quer executar jobs regularmente. Em particular, talvez você tenha alguma operação em lote que precisa ser executada uma vez por hora e não se importa qual worker a executa. Jobs recorrentes são especificados muito parecido com outros jobs:

-- Execute a cada hora
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b

Você pode até acessá-los da mesma forma que faria com jobs normais:

local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

Alterar o intervalo em que ele é executado depois do fato é trivial:

-- Eu acho que só preciso que ele execute uma vez a cada duas horas
job.interval = 7200

Se você quiser que ele execute a cada hora cheia, mas agora são 2:37, pode especificar um deslocamento que é quanto tempo ele deve esperar antes de retirar o primeiro job:

-- 23 minutos de espera até que ele deva ir
queue:recur("jobs.test", 
  { howdy = "hello" }, 
  3600,
  { offset = (23 * 60) }
)

Jobs recorrentes também têm prioridade, um número configurável de retentativas e tags. Essas configurações não se aplicam aos jobs recorrentes, mas sim aos jobs que eles geram. No caso de mais de um intervalo passar antes que um worker tente retirar o job, mais de um job é criado. O raciocínio é que, enquanto é totalmente gerenciado pelo cliente, o estado não deve depender de quão frequentemente os workers estão tentando retirar jobs.

-- Recorrência a cada minuto
queue:recur("jobs.test", { lots = "of jobs" }, 60)

-- Esperar 5 minutos

local jobs = queue:pop(10)
ngx.say(#jobs, " jobs foram retirados")

-- = 5 jobs foram retirados

Opções de Configuração

Você pode obter e definir configurações globais (no contexto da mesma instância Redis) para alterar o comportamento de heartbeat, e assim por diante. Não há um número tremendo de opções de configuração, mas uma importante é quanto tempo os dados do job são mantidos. Os dados do job expiram após terem sido concluídos por jobs-history segundos, mas são limitados aos últimos jobs-history-count jobs concluídos. Esses valores padrão são 50k jobs e 30 dias, mas dependendo do volume, suas necessidades podem mudar. Para manter apenas os últimos 500 jobs por até 7 dias:

qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)

Tagging / Tracking

No qless, 'tracking' significa marcar um job como importante. Jobs rastreados emitem eventos inscrevíveis à medida que progridem (mais sobre isso abaixo).

local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()

Jobs podem ser marcados com strings que são indexadas para buscas rápidas. Por exemplo, jobs podem estar associados a contas de clientes, ou alguma outra chave que faça sentido para seu projeto.

queue:put("jobs.test", {}, 
  { tags = { "12345", "foo", "bar" } }
)

Isso os torna pesquisáveis na interface web Ruby / Sinatra, ou a partir do código:

local jids = qless.jobs:tagged("foo")

Você também pode adicionar ou remover tags à vontade:

local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")

Notificações

Jobs rastreados emitem eventos em canais pubsub específicos à medida que coisas acontecem com eles. Seja sendo retirados de uma fila, completados por um worker, etc.

Aqueles familiarizados com pub/sub do Redis notarão que uma conexão Redis só pode ser usada para comandos pubsub uma vez que esteja ouvindo. Por essa razão, o módulo de eventos recebe parâmetros de conexão Redis de forma independente.

local events = qless.events(redis_params)

events:listen({ "canceled", "failed" }, function(channel, jid)
    ngx.log(ngx.INFO, jid, ": ", channel)
    -- registra "b1882e009a3d11e192d0b174d751779d: canceled" etc.
end)

Você também pode ouvir o canal "log", que fornece uma estrutura JSON de todos os eventos registrados.

local events = qless.events(redis_params)

events:listen({ "log" }, function(channel, message)
    local message = cjson.decode(message)
    ngx.log(ngx.INFO, message.event, " ", message.jid)
end)

Heartbeating

Quando um worker recebe um job, ele recebe um bloqueio exclusivo para esse job. Isso significa que esse job não será dado a nenhum outro worker, desde que o worker faça check-in com progresso no job. Por padrão, os jobs devem relatar progresso a cada 60 segundos ou concluí-lo, mas essa é uma opção configurável. Para jobs mais longos, isso pode não fazer sentido.

-- Hooray! Temos um trabalho!
local job = queue:pop()

-- Quanto tempo até eu ter que fazer check-in?
job:ttl()
-- = 59

-- Ei! Eu ainda estou trabalhando nisso!
job:heartbeat()
-- = 1331326141.0

-- Ok, eu tenho mais tempo. Oh! Agora estou pronto!
job:complete()

Se você quiser aumentar o heartbeat em todas as filas,

-- Agora os jobs têm 10 minutos para fazer check-in
qless:set_config("heartbeat", 600)

-- Mas a fila de testes não tem tanto tempo.
qless.queues["testing"].heartbeat = 300

Ao escolher um intervalo de heartbeat, note que este é o tempo que pode passar antes que o qless perceba se um job foi descartado. Ao mesmo tempo, você não quer sobrecarregar o qless com heartbeating a cada 10 segundos se seu job deve levar várias horas.

Um idiom que você é incentivado a usar para jobs de longa duração que desejam verificar seu progresso periodicamente:

-- Espere até que tenhamos 5 minutos restantes no heartbeat, e se descobrirmos que
-- perdemos nosso bloqueio em um job, então caia honrosamente na nossa espada
if job:ttl() < 300 and not job:heartbeat() then
  -- sair
end

Estatísticas

Um bom recurso do Qless é que você pode obter estatísticas sobre o uso. As estatísticas são agregadas por dia, então quando você deseja estatísticas sobre uma fila, precisa dizer qual fila e qual dia está falando. Por padrão, você só obtém as estatísticas para hoje. Essas estatísticas incluem informações sobre o tempo médio de espera do job, desvio padrão e histograma. Esses mesmos dados também são fornecidos para a conclusão do job:

-- Então, como estamos indo hoje?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

Tempo

É importante notar que o Redis não permite acesso ao tempo do sistema se você estiver fazendo qualquer manipulação de dados (o que nossos scripts fazem). E ainda assim, temos heartbeating. Isso significa que os clientes realmente enviam o tempo atual ao fazer a maioria das solicitações e, para fins de consistência, significa que seus workers devem estar relativamente sincronizados. Isso não significa até os décimos de milissegundos, mas se você estiver experimentando um desvio de relógio apreciável, deve investigar o NTP.

Garantindo a Exclusividade do Job

Como mencionado acima, os Jobs são identificados de forma única por um id--seu jid. O Qless gerará um UUID para cada job enfileirado ou você pode especificar um manualmente:

queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

Isso pode ser útil quando você deseja garantir a exclusividade de um job: basta criar um jid que seja uma função da classe e dos dados do Job, garantindo que o Qless não terá múltiplos jobs com a mesma classe e dados.

GitHub

Você pode encontrar dicas adicionais de configuração e documentação para este módulo no repositório GitHub para nginx-module-qless.