worker-events: Кросс-воркерные события для NGINX на чистом Lua
Установка
Если вы еще не настроили подписку на RPM-репозиторий, зарегистрируйтесь. Затем вы можете продолжить с следующими шагами.
CentOS/RHEL 7 или Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-worker-events
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-worker-events
Чтобы использовать эту библиотеку Lua с NGINX, убедитесь, что nginx-module-lua установлен.
Этот документ описывает lua-resty-worker-events v2.0.1, выпущенный 28 июня 2021 года.
lua-resty-worker-events
События межпроцессного взаимодействия для воркер-процессов Nginx
Синопсис
http {
# размер зависит от количества событий, которые нужно обрабатывать:
lua_shared_dict process_events 1m;
init_worker_by_lua_block {
local ev = require "resty.worker.events"
local handler = function(data, event, source, pid)
print("received event; source=",source,
", event=",event,
", data=", tostring(data),
", from process ",pid)
end
ev.register(handler)
local ok, err = ev.configure {
shm = "process_events", -- определено в "lua_shared_dict"
timeout = 2, -- время жизни уникальных данных события в shm
interval = 1, -- интервал опроса (в секундах)
wait_interval = 0.010, -- ожидание перед повторной попыткой получения данных события
wait_max = 0.5, -- максимальное время ожидания перед отбрасыванием события
shm_retries = 999, -- количество попыток для фрагментации shm (нет памяти)
}
if not ok then
ngx.log(ngx.ERR, "failed to start event system: ", err)
return
end
}
server {
...
# пример для опроса:
location = /some/path {
default_type text/plain;
content_by_lua_block {
-- вручную вызывайте `poll`, чтобы оставаться в курсе событий, можно использовать вместо
-- или вместе с интервалом таймера. Опрос эффективен,
-- поэтому, если важно оставаться в курсе событий, это предпочтительно.
require("resty.worker.events").poll()
-- выполняйте обычные действия здесь
}
}
}
}
Описание
Этот модуль предоставляет способ отправки событий другим воркер-процессам на сервере Nginx. Связь осуществляется через зону общей памяти, где будут храниться данные событий.
Порядок событий во всех воркерах гарантированно будет одинаковым.
Воркер-процесс настроит таймер для проверки событий в фоновом режиме. Модуль следует паттерну одиночки и, следовательно, запускается один раз на воркер. Если важно оставаться в курсе событий, интервал можно установить на меньшую частоту, и вызов poll при каждом полученном запросе гарантирует, что все будет обработано как можно скорее.
Дизайн позволяет реализовать 3 сценария использования:
- широковещательная рассылка события всем воркер-процессам, см. post. В этом случае порядок событий гарантированно будет одинаковым во всех воркер-процессах. Пример: проверка состояния, выполняемая в одном воркере, но информирующая всех воркеров о сбое узла upstream.
- широковещательная рассылка события только локальному воркеру, см. post_local.
- объединение внешних событий в одно действие. Пример: все воркеры отслеживают внешние события, указывающие на необходимость обновления кэша в памяти. При получении они все отправляют его с уникальным хешем события (все воркеры генерируют один и тот же хеш), см. параметр
uniqueметода post. Теперь только 1 воркер получит событие только один раз, поэтому только один воркер обратится к upstream базе данных для обновления данных в памяти.
Этот модуль сам вызовет два события с source="resty-worker-events":
* event="started" когда модуль впервые настраивается (заметьте: обработчик событий должен быть
зарегистрирован перед вызовом configure, чтобы иметь возможность поймать событие)
* event="stopping" когда воркер-процесс выходит (на основе настройки таймера premature)
Смотрите event_list для использования событий без жестко закодированных магических значений/строк.
Устранение неполадок
Чтобы правильно определить размер shm, важно понять, как он используется. Данные событий хранятся в shm для передачи их другим воркерам. Таким образом, в shm есть 2 типа записей:
- события, которые должны выполняться только одним воркером (см. параметр
uniqueметодаpost). Эти записи получаютttlв shm и, следовательно, истекают. - все остальные события (за исключением локальных событий, которые не используют SHM). В этих
случаях
ttlне устанавливается.
Результат вышеизложенного заключается в том, что SHM всегда будет полон! Так что это не метрика для исследования.
Как предотвратить проблемы:
- размер SHM должен быть как минимум кратен максимальному ожидаемому объему данных. Он
должен быть способен вместить все события, которые могут быть отправлены в течение одного
interval(см.configure). - ошибки
no memoryне могут быть решены увеличением размера SHM. Единственный способ решить эти проблемы — увеличить параметрshm_retries, передаваемый вconfigure(который уже имеет высокое значение по умолчанию). Это связано с тем, что ошибка вызвана фрагментацией, а не нехваткой памяти. -
ошибка
waiting for event data timed outвозникает, если данные события были исключены до того, как все воркеры смогли с ними справиться. Это может произойти, если происходит всплеск (с большим объемом данных) событий. Чтобы решить эти проблемы:- старайтесь избегать больших объемов данных событий
- используйте меньший
interval, чтобы воркеры проверяли (и обрабатывали) события чаще (см. параметрinterval, передаваемый вconfigure) - увеличьте размер SHM, чтобы он мог вместить все данные событий, которые могут быть отправлены в течение 1 интервала.
Методы
configure
syntax: success, err = events.configure(opts)
Инициализирует слушатель событий. Обычно это следует вызывать из обработчика
init_by_lua, потому что это гарантирует, что все воркеры начнут с первого события. В случае перезагрузки системы (запуск новых и остановка старых воркеров) прошлые события не будут воспроизведены. И поскольку порядок, в котором воркеры перезагружаются, не может быть гарантирован, также не может быть гарантирован старт события. Поэтому, если какое-либо состояние зависит от событий, вам нужно управлять этим состоянием отдельно.
Параметр opts — это таблица Lua с именованными параметрами:
shm: (обязательный) имя общей памяти, которую нужно использовать. Данные событий не будут истекать, поэтому модуль полагается на механизм lru shm для исключения старых событий из shm. Таким образом, shm, вероятно, не следует использовать для других целей.shm_retries: (необязательный) количество попыток, когда shm возвращает "no memory" при отправке события, по умолчанию 999. Каждый раз, когда происходит попытка вставки и памяти нет (либо нет доступного места, либо память доступна, но фрагментирована), "до десятков" старых записей исключаются. После этого, если памяти все еще нет, возвращается ошибка "no memory". Повторная попытка вставки запускает фазу исключения несколько раз, увеличивая доступную память, а также вероятность нахождения достаточно большого непрерывного блока памяти для новых данных события.interval: (необязательный) интервал для опроса событий (в секундах), по умолчанию 1. Установите 0, чтобы отключить опрос.wait_interval: (необязательный) интервал между двумя попытками, когда найден новый eventid, но данные еще недоступны (из-за асинхронного поведения воркер-процессов)wait_max: (необязательный) максимальное время ожидания данных, когда найден идентификатор события, перед отбрасыванием события. Это настройка на случай, если что-то пошло не так.timeout: (необязательный) тайм-аут уникальных данных события, хранящихся в shm (в секундах), по умолчанию 2. Смотрите параметрuniqueметода post.
Возвращаемое значение будет true, или nil и сообщение об ошибке.
Этот метод можно вызывать несколько раз для обновления настроек, за исключением значения shm, которое
нельзя изменять после первоначальной конфигурации.
ПРИМЕЧАНИЕ: wait_interval выполняется с использованием функции ngx.sleep. В контекстах, где эта
функция недоступна (например, init_worker), будет выполнено активное ожидание для реализации задержки.
configured
syntax: is_already_configured = events.configured()
Модуль событий работает как одиночка для каждого воркер-процесса. Функция configured
позволяет проверить, работает ли он уже.
Рекомендуется проверка перед запуском любых зависимостей;
local events = require "resty.worker.events"
local initialization_of_my_module = function()
assert(events.configured(), "Пожалуйста, настройте модуль 'lua-resty-worker-events' "..
"прежде чем использовать my_module")
-- выполняйте инициализацию здесь
end
event_list
syntax: _M.events = events.event_list(sourcename, event1, event2, ...)
Утилита для генерации списков событий и предотвращения опечаток в
магических строках. Доступ к несуществующему событию в возвращаемой таблице приведет
к ошибке 'unknown event'.
Первый параметр sourcename — это уникальное имя, которое идентифицирует источник события,
которое будет доступно как поле _source. Все последующие параметры
— это именованные события, генерируемые источником событий.
Пример использования;
local ev = require "resty.worker.events"
-- Пример источника событий
local events = ev.event_list(
"my-module-event-source", -- доступно как _M.events._source
"started", -- доступно как _M.events.started
"event2" -- доступно как _M.events.event2
)
local raise_event = function(event, data)
return ev.post(events._source, event, data)
end
-- Отправьте свое собственное событие 'started'
raise_event(events.started, nil) -- nil для ясности, данные события не передаются
-- определите свою таблицу модуля
local _M = {
events = events -- экспортируйте таблицу событий
-- реализация идет здесь
}
return _M
-- Пример клиента события;
local mymod = require("some_module") -- модуль с таблицей `events`
-- определите обратный вызов и используйте таблицу событий источника модуля
local my_callback = function(data, event, source, pid)
if event == mymod.events.started then -- 'started' — это имя события
-- событие started из модуля resty-worker-events
elseif event == mymod.events.stoppping then -- 'stopping' — это имя события
-- вышеуказанное вызовет ошибку из-за опечатки в `stoppping`
end
end
ev.register(my_callback, mymod.events._source)
poll
syntax: success, err = events.poll()
Будет опрашивать новые события и обрабатывать их все (вызывать зарегистрированные обратные вызовы). Реализация эффективна, она проверяет только одно значение общей памяти и немедленно возвращает, если новых событий нет.
Возвращаемое значение будет "done", когда все события были обработаны, "recursive", если он уже
находился в цикле опроса, или nil + error, если что-то пошло не так.
Результат "recursive" просто
означает, что событие было успешно отправлено, но еще не обработано, из-за других
событий, которые нужно обработать сначала.
post
syntax: success, err = events.post(source, event, data, unique)
Будет отправлять новое событие. source и event — это обе строки. data может быть чем угодно (включая nil)
при условии, что он (де)сериализуем модулем cjson.
Если параметр unique предоставлен, то только один воркер выполнит событие,
остальные воркеры проигнорируют его. Также любые последующие события с тем же значением unique
будут проигнорированы (в течение указанного периода timeout, переданного в configure).
Процесс, выполняющий событие, не обязательно будет процессом, отправляющим событие.
Возвращаемое значение будет true, когда событие было успешно отправлено, или
nil + error в случае неудачи.
Примечание: воркер-процесс, отправляющий событие, также получит это событие! Поэтому, если источник события также будет действовать на событие, он не должен делать это из кода отправки события, а только при его получении.
post_local
syntax: success, err = events.post_local(source, event, data)
То же самое, что и post, за исключением того, что событие будет локальным для воркер-процесса,
оно не будет транслироваться другим воркерам. С помощью этого метода элемент data
не будет сериализован в json.
Возвращаемое значение будет true, когда событие было успешно отправлено, или
nil + error в случае неудачи.
register
syntax: events.register(callback, source, event1, event2, ...)
Зарегистрирует функцию обратного вызова для получения событий. Если source и event опущены, то
обратный вызов будет выполняться на каждом событии, если source предоставлен, то только события с
совпадающим источником будут переданы. Если (одно или несколько) имени события указано, то только когда
и source, и event совпадают, обратный вызов будет вызван.
Обратный вызов должен иметь следующую сигнатуру;
syntax: callback = function(data, event, source, pid)
Параметры будут такими же, как и те, которые передаются в post, за исключением дополнительного значения
pid, которое будет pid исходного воркер-процесса, или nil, если это было только локальное событие.
Любое возвращаемое значение из callback будет отброшено.
Примечание: data может быть ссылочным типом данных (например, тип Lua table). То же самое значение передается
все обратные вызовы, поэтому не изменяйте значение в вашем обработчике, если не знаете, что делаете!
Возвращаемое значение register будет true, или оно вызовет ошибку, если callback не является
функцией.
ПРЕДУПРЕЖДЕНИЕ: обработчики событий должны возвращаться быстро. Если обработчик занимает больше времени, чем
установленное значение timeout, события будут отброшены!
Примечание: чтобы получить собственное событие started процесса, обработчик должен быть зарегистрирован до
вызова configure.
register_weak
syntax: events.register_weak(callback, source, event1, event2, ...)
Эта функция идентична register, за исключением того, что модуль
будет хранить только слабые ссылки на функцию callback.
unregister
syntax: events.unregister(callback, source, event1, event2, ...)
Отменит регистрацию функции обратного вызова и предотвратит ее получение дальнейших событий. Параметры работают точно так же, как и в register.
Возвращаемое значение будет true, если оно было удалено, false, если его не было в списке обработчиков, или
оно вызовет ошибку, если callback не является функцией.
История
Выпуск новых версий
- убедитесь, что журнал изменений ниже актуален
- обновите номер версии в коде
- создайте новый rockspec в
./rockspecs - зафиксируйте с сообщением
release x.x.x - пометьте коммит как
x.x.x - отправьте коммит и теги
- загрузите в luarocks
2.0.1, 28-июня-2021
- исправление: возможная взаимная блокировка на этапе 'инициализации'
2.0.0, 16-сентября-2020
- СЛОМ: функция
postбольше не вызываетpoll, делая все события асинхронными. Когда требуется немедленная обработка события, необходимо явно вызватьpoll. - СЛОМ: функция
post_localбольше не выполняет событие немедленно, делая все локальные события асинхронными. Когда требуется немедленная обработка события, необходимо явно вызватьpoll. - исправление: предотвращение загрузки на 100% ЦП во время перезагрузки, когда event-shm очищается
- исправление: улучшено ведение журнала в случае ошибки записи в shm (добавлен размер полезной нагрузки для целей устранения неполадок)
- исправление: больше не записывать полезную нагрузку в журнал, так как это может раскрыть конфиденциальные данные через журналы
- изменение: обновлено значение по умолчанию
shm_retriesдо 999 - изменение: изменен цикл таймера на цикл ожидания (производительность)
- исправление: при повторной конфигурации убедитесь, что таблица обратных вызовов инициализирована
1.1.0, 23-декабря-2020 (обслуживающий релиз)
- функция: цикл опроса теперь работает бесконечно, спя 0.5 секунды между запусками, избегая создания новых таймеров на каждом шаге.
1.0.0, 18-июля-2019
- СЛОМ: возвращаемые значения из
poll(а значит, такжеpostиpost_local) изменены, чтобы быть более lua-подобными, чтобы быть истинными, когда все в порядке. - функция: новая опция
shm_retriesдля исправления ошибок "no memory", вызванных фрагментацией памяти в shm при отправке событий. - исправление: исправлены две опечатки в именах переменных (крайние случаи)
0.3.3, 8-мая-2018
- исправление: тайм-ауты на этапах инициализации, путем удаления настройки тайм-аута, см. проблему #9
0.3.2, 11-апреля-2018
- изменение: добавление трассировки стека к ошибкам обработчика
- исправление: сбой обработчика ошибок, если значение не было сериализуемым, см. проблему #5
- исправление: исправлен тест для слабых обработчиков
См. также
- OpenResty: http://openresty.org
GitHub
Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-worker-events.