Перейти к содержанию

qless: Lua привязка к Qless (Управление очередями / Конвейерами) для nginx-module-lua / Redis

Установка

Если вы еще не настроили подписку на RPM-репозиторий, зарегистрируйтесь. Затем вы можете продолжить с следующими шагами.

CentOS/RHEL 7 или Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless

Чтобы использовать эту Lua библиотеку с NGINX, убедитесь, что nginx-module-lua установлен.

Этот документ описывает lua-resty-qless v0.12, выпущенный 8 июля 2022 года.


lua-resty-qless — это привязка к qless-core от Moz — мощная система управления очередями заданий на основе Redis, вдохновленная resque, но реализованная как коллекция Lua-скриптов для Redis.

Эта привязка предоставляет полную реализацию Qless через Lua-скрипт, работающий в OpenResty / lua-nginx-module, включая рабочие процессы, которые могут быть запущены в фазе init_worker_by_lua.

По сути, с помощью этого модуля и современного экземпляра Redis вы можете превратить ваш сервер OpenResty в довольно сложную, но легковесную систему управления очередями заданий, которая также совместима с эталонной реализацией на Ruby, Qless.

Примечание: Этот модуль не предназначен для работы в чистой среде Lua.

Философия и Номенклатура

job — это единица работы, идентифицируемая по идентификатору задания или jid. queue может содержать несколько заданий, которые запланированы на выполнение в определенное время, несколько заданий, ожидающих выполнения, и задания, которые в настоящее время выполняются. worker — это процесс на хосте, уникально идентифицируемый, который запрашивает задания из очереди, выполняет некоторые процессы, связанные с этим заданием, а затем помечает его как завершенное. Когда оно завершено, его можно поместить в другую очередь.

Задания могут находиться только в одной очереди одновременно. Эта очередь — та, в которую они были помещены в последний раз. Поэтому, если рабочий процесс работает над заданием, и вы его переместите, запрос рабочего процесса на завершение задания будет проигнорирован.

Задание может быть canceled, что означает, что оно исчезает в небытие, и мы больше никогда не обратим на него внимания. Задание может быть dropped, что происходит, когда рабочий процесс не успевает отправить сигнал о работе или завершить задание вовремя, или задание может быть failed, что происходит, когда хост распознает какое-то систематически проблемное состояние задания. Рабочий процесс должен завершать задание только в том случае, если ошибка, вероятно, не является временной; в противном случае этот рабочий процесс должен просто сбросить его и позволить системе вернуть его.

Особенности

  1. Задания не теряются Иногда рабочие процессы сбрасывают задания. Qless автоматически подбирает их обратно и передает другому рабочему процессу.
  2. Тегирование / Отслеживание Некоторые задания более интересны, чем другие. Отслеживайте эти задания, чтобы получать обновления о их прогрессе.
  3. Зависимости заданий Одно задание может потребовать ожидания завершения другого задания.
  4. Статистика Qless автоматически ведет статистику о том, как долго задания ждут обработки и сколько времени занимает их обработка. В настоящее время мы отслеживаем количество, среднее значение, стандартное отклонение и гистограмму этих времен.
  5. Данные задания хранятся временно Информация о заданиях сохраняется на настраиваемое количество времени, чтобы вы могли вернуться к истории задания, данным и т.д.
  6. Приоритет Задания с одинаковым приоритетом извлекаются в порядке их вставки; более высокий приоритет означает, что оно будет извлечено быстрее.
  7. Логика повторной попытки Каждое задание имеет количество связанных с ним повторных попыток, которые обновляются, когда оно помещается в новую очередь или завершается. Если задание постоянно сбрасывается, оно считается проблемным и автоматически завершается.
  8. Веб-приложение lua-resty-qless-web предоставляет вам видимость и контроль над определенными операционными вопросами.
  9. Запланированная работа Пока задание ожидает указанной задержки (по умолчанию 0), задания не могут быть извлечены рабочими процессами.
  10. Периодические задания Запланировать одно задание — это хорошо, но мы также поддерживаем задания, которые должны повторяться периодически.
  11. Уведомления Отслеживаемые задания генерируют события на каналах pubsub по мере их завершения, сбоя, помещения, извлечения и т.д. Используйте эти события, чтобы получать уведомления о прогрессе по интересующим вас заданиям.

Подключение

Прежде всего, подключите resty.qless и создайте клиент, указав данные подключения к Redis.

local qless = require("resty.qless").new({
    host = "127.0.0.1",
    port = 6379,
})

Параметры, переданные в new, передаются в lua-resty-redis-connector. Пожалуйста, ознакомьтесь с документацией там для параметров подключения, включая использование Redis Sentinel и т.д.

Кроме того, если ваше приложение имеет подключение к Redis, которое вы хотите повторно использовать, есть два способа интеграции:

1) Использование уже установленного подключения напрямую

local qless = require("resty.qless").new({
    redis_client = my_redis,
})

2) Предоставление обратных вызовов для подключения и закрытия соединения

local qless = require("resty.qless").new({
    get_redis_client = my_connection_callback,
    close_redis_client = my_close_callback,
})

Когда вы закончите с Qless, вы должны вызвать qless:set_keepalive(), который попытается вернуть Redis в пул поддерживаемых соединений, используя настройки, которые вы предоставили напрямую, или через параметры, переданные в lua-resty-redis-connector, или вызвав ваш обратный вызов close_redis_client.

Добавление заданий в очередь

Сами задания являются модулями, которые должны быть загружаемыми через require и предоставлять функцию perform, которая принимает единственный аргумент job.

-- my/test/job.lua (класс задания становится "my.test.job")

local _M = {}

function _M.perform(job)
    -- job является экземпляром Qless_Job и предоставляет доступ к
    -- job.data (который является таблицей Lua), способ отменить
    -- задание (job:cancel()), и многое другое.

    -- верните "nil, err_type, err_msg", чтобы указать на неожиданную ошибку

    if not job.data then
        return nil, "job-error", "данные отсутствуют"
    end

    -- Выполните работу
end

return _M

Теперь вы можете получить доступ к очереди и добавить задание в эту очередь.

-- Это ссылается на новую или существующую очередь 'testing'
local queue = qless.queues['testing']

-- Давайте добавим задание с некоторыми данными. Возвращает ID задания
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"

-- Теперь мы можем запросить задание
local job = queue:pop()

-- И мы можем выполнить работу, связанную с ним!
job:perform()

Данные задания должны быть таблицей (которая сериализуется в JSON внутренне).

Значение, возвращаемое queue:put(), — это ID задания, или jid. Каждое задание Qless имеет уникальный jid, и он предоставляет способ взаимодействия с существующим заданием:

-- найти существующее задание по его jid
local job = qless.jobs:get(jid)

-- Запросите его, чтобы узнать детали о нем:
job.klass -- класс задания
job.queue -- очередь, в которой находится задание
job.data  -- данные для задания
job.history -- история того, что произошло с заданием до сих пор
job.dependencies -- jids других заданий, которые должны завершиться до этого
job.dependents -- jids других заданий, которые зависят от этого
job.priority -- приоритет этого задания
job.tags -- таблица тегов для этого задания
job.original_retries -- количество раз, которое задание может быть повторно выполнено
job.retries_left -- количество оставшихся повторных попыток

-- Вы также можете изменить задание различными способами:
job:requeue("some_other_queue") -- переместить его в новую очередь
job:cancel() -- отменить задание
job:tag("foo") -- добавить тег
job:untag("foo") -- удалить тег

Запуск рабочих процессов

Традиционно Qless предлагал скрипт рабочего процесса на Ruby с использованием форков, вдохновленный Resque.

В lua-resty-qless мы используем фазу init_lua_by_worker и API ngx.timer.at, чтобы запускать рабочих процессов в независимых "легких потоках", масштабируемых по вашим рабочим процессам.

Вы можете запускать множество легких потоков одновременно на каждом рабочем процессе, которые Nginx будет планировать за вас.

init_worker_by_lua '
    local resty_qless_worker = require "resty.qless.worker"

    local worker = resty_qless_worker.new(redis_params)

    worker:start({
        interval = 1,
        concurrency = 4,
        reserver = "ordered",
        queues = { "my_queue", "my_other_queue" },
    })
';

Рабочие процессы поддерживают три стратегии (резервирования) для того, в каком порядке извлекать задания из очередей: ordered, round-robin и shuffled round-robin.

Резервирование в порядке очереди будет продолжать извлекать задания из первой очереди, пока она не опустеет, прежде чем пытаться извлечь задания из второй очереди. Резервирование по кругу будет извлекать задание из первой очереди, затем из второй очереди и так далее. Перемешанное просто гарантирует, что выбор по кругу будет непредсказуемым.

Вы также можете легко реализовать свое собственное. Следуйте другим резервам в качестве руководства и убедитесь, что ваше можно загрузить с помощью require "resty.qless.reserver.myreserver".

Промежуточное ПО

Рабочие процессы также поддерживают промежуточное ПО, которое можно использовать для внедрения логики вокруг обработки одного задания. Это может быть полезно, например, когда вам нужно восстановить соединение с базой данных.

Для этого вы устанавливаете middleware рабочего процесса в функцию и вызываете coroutine.yield там, где хотите, чтобы задание было выполнено.

local worker = resty_qless_worker.new(redis_params)

worker.middleware = function(job)
    -- Выполните предварительную работу
    coroutine.yield()
    -- Выполните последующую работу
end

worker:start({ queues = "my_queue" })

Зависимости заданий

Допустим, у вас есть одно задание, которое зависит от другого, но определения задач принципиально различны. Вам нужно приготовить индейку, и вам нужно сделать начинку, но вы не можете приготовить индейку, пока начинка не будет готова:

local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing",
  { lots = "of butter" }
)
local turkey_jid  = queue:put("jobs.make.turkey",
  { with = "stuffing" },
  { depends = stuffing_jid }
)

Когда задание по приготовлению начинки завершится, задание по приготовлению индейки будет разблокировано и готово к обработке.

Приоритет

Некоторые задания должны извлекаться раньше других. Будь то служебная записка или отладка, вы можете сделать это довольно легко, когда помещаете задание в очередь:

queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

Что происходит, когда вы хотите изменить приоритет задания, пока оно все еще ожидает в очереди?

local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- Теперь это будет извлечено перед любым заданием с более низким приоритетом

Примечание: Установка поля приоритета выше — это все, что вам нужно сделать, благодаря метаметодам Lua, которые вызываются для обновления Redis. Это может выглядеть немного "авто-магически", но намерение состоит в том, чтобы сохранить совместимость дизайна API с клиентом Ruby насколько это возможно.

Запланированные задания

Если вы не хотите, чтобы задание выполнялось сразу, а в будущем, вы можете указать задержку:

-- Выполнить как минимум через 10 минут
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

Это не гарантирует, что задание будет выполнено ровно через 10 минут. Вы можете добиться этого, изменив приоритет задания так, чтобы после истечения 10 минут оно было помещено перед заданиями с более низким приоритетом:

-- Выполнить через 10 минут
queue:put("jobs.test",
  { foo = "bar" },
  { delay = 600, priority = 100 }
)

Периодические задания

Иногда недостаточно просто запланировать одно задание, вам нужно регулярно выполнять задания. В частности, возможно, у вас есть какая-то пакетная операция, которую нужно выполнять раз в час, и вам не важно, какой рабочий процесс ее выполнит. Периодические задания задаются так же, как и другие задания:

-- Выполнять каждый час
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b

Вы даже можете получить к ним доступ так же, как и к обычным заданиям:

local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

Изменение интервала, с которым оно выполняется после этого, не составляет труда:

-- Я думаю, что мне нужно, чтобы оно выполнялось раз в два часа
job.interval = 7200

Если вы хотите, чтобы оно выполнялось каждый час ровно, но сейчас 2:37, вы можете указать смещение, которое определяет, как долго оно должно ждать перед извлечением первого задания:

-- 23 минуты ожидания, прежде чем оно должно начаться
queue:recur("jobs.test",
  { howdy = "hello" },
  3600,
  { offset = (23 * 60) }
)

Периодические задания также имеют приоритет, настраиваемое количество повторных попыток и теги. Эти настройки не применяются к периодическим заданиям, а скорее к заданиям, которые они порождают. В случае, если пройдет более одного интервала, прежде чем рабочий процесс попытается извлечь задание, создается более одного задания. Мы считаем, что, хотя это полностью управляется клиентом, состояние не должно зависеть от того, как часто рабочие процессы пытаются извлекать задания.

-- Повторять каждую минуту
queue:recur("jobs.test", { lots = "of jobs" }, 60)

-- Подождите 5 минут

local jobs = queue:pop(10)
ngx.say(#jobs, " заданий было извлечено")

-- = 5 заданий было извлечено

Параметры конфигурации

Вы можете получать и устанавливать глобальную (в контексте одного и того же экземпляра Redis) конфигурацию, чтобы изменить поведение для отправки сигналов о работе и так далее. Конфигурационных параметров не так много, но один важный — это то, как долго данные о заданиях хранятся. Данные о заданиях истекают после того, как они были завершены в течение jobs-history секунд, но ограничены последними jobs-history-count завершенными заданиями. По умолчанию это 50k заданий и 30 дней, но в зависимости от объема ваши потребности могут измениться. Чтобы хранить только последние 500 заданий в течение 7 дней:

qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)

Тегирование / Отслеживание

В qless "отслеживание" означает пометку задания как важного. Отслеживаемые задания генерируют события, на которые можно подписаться, по мере их продвижения (подробнее об этом ниже).

local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()

Задания могут быть помечены строками, которые индексируются для быстрого поиска. Например, задания могут быть связаны с учетными записями клиентов или каким-то другим ключом, который имеет смысл для вашего проекта.

queue:put("jobs.test", {},
  { tags = { "12345", "foo", "bar" } }
)

Это делает их доступными для поиска в веб-интерфейсе Ruby / Sinatra или из кода:

local jids = qless.jobs:tagged("foo")

Вы также можете добавлять или удалять теги по мере необходимости:

local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")

Уведомления

Отслеживаемые задания генерируют события на определенных каналах pubsub по мере того, как с ними происходят события. Будь то извлечение из очереди, завершение рабочим процессом и т.д.

Те, кто знаком с pub/sub Redis, заметят, что соединение Redis может использоваться для команд pubsub только после того, как начнет слушать. По этой причине модуль событий передает параметры подключения Redis независимо.

local events = qless.events(redis_params)

events:listen({ "canceled", "failed" }, function(channel, jid)
    ngx.log(ngx.INFO, jid, ": ", channel)
    -- логирует "b1882e009a3d11e192d0b174d751779d: canceled" и т.д.
end

Вы также можете слушать канал "log", который предоставляет JSON-структуру всех зарегистрированных событий.

local events = qless.events(redis_params)

events:listen({ "log" }, function(channel, message)
    local message = cjson.decode(message)
    ngx.log(ngx.INFO, message.event, " ", message.jid)
end

Сигналы о работе

Когда рабочему процессу дается задание, ему предоставляется эксклюзивная блокировка на это задание. Это означает, что это задание не будет передано никакому другому рабочему процессу, пока рабочий процесс не проверит прогресс по заданию. По умолчанию задания должны либо сообщать о прогрессе каждые 60 секунд, либо завершать его, но это настраиваемый параметр. Для более длительных заданий это может не иметь смысла.

-- Ура! У нас есть работа!
local job = queue:pop()

-- Как долго мне нужно ждать, чтобы проверить?
job:ttl()
-- = 59

-- Эй! Я все еще работаю над этим!
job:heartbeat()
-- = 1331326141.0

-- Хорошо, у меня есть еще время. О! Теперь я закончил!
job:complete()

Если вы хотите увеличить интервал сердцебиения для всех очередей,

-- Теперь задания получают 10 минут на проверку
qless:set_config("heartbeat", 600)

-- Но очередь тестирования не получает так много времени.
qless.queues["testing"].heartbeat = 300

При выборе интервала сердцебиения обратите внимание, что это количество времени, которое может пройти, прежде чем qless поймет, что задание было сброшено. В то же время вы не хотите нагружать qless отправкой сигналов о работе каждые 10 секунд, если ваше задание ожидается на несколько часов.

Идиома, которую вам рекомендуется использовать для длительных заданий, которые хотят периодически проверять свой прогресс:

-- Подождите, пока не останется 5 минут до сердцебиения, и если мы обнаружим, что
-- мы потеряли свою блокировку на задании, то с честью уйдем
if job:ttl() < 300 and not job:heartbeat() then
  -- выход
end

Статистика

Одной из приятных особенностей Qless является то, что вы можете получать статистику об использовании. Статистика агрегируется по дням, поэтому, когда вы хотите получить статистику о очереди, вам нужно указать, о какой очереди и о каком дне идет речь. По умолчанию вы просто получаете статистику за сегодня. Эта статистика включает информацию о среднем времени ожидания задания, стандартном отклонении и гистограмме. Эти же данные также предоставляются для завершения задания:

-- Итак, как у нас дела сегодня?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

Время

Важно отметить, что Redis не позволяет получить доступ к системному времени, если вы собираетесь выполнять какие-либо манипуляции с данными (что делают наши скрипты). И все же у нас есть сигналы о работе. Это означает, что клиенты на самом деле отправляют текущее время при выполнении большинства запросов, и для согласованности это означает, что ваши рабочие процессы должны быть относительно синхронизированы. Это не означает, что до десятков миллисекунд, но если у вас наблюдается заметное отклонение часов, вам следует исследовать NTP.

Обеспечение уникальности задания

Как упоминалось выше, задания уникально идентифицируются по идентификатору — их jid. Qless будет генерировать UUID для каждого поставленного задания, или вы можете указать один вручную:

queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

Это может быть полезно, когда вы хотите обеспечить уникальность задания: просто создайте jid, который является функцией класса и данных задания, и гарантируется, что Qless не будет иметь несколько заданий с одним и тем же классом и данными.

GitHub

Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-qless.