worker-events: NGINX 中纯 Lua 的跨工作进程事件
安装
如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。
CentOS/RHEL 7 或 Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-worker-events
CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-worker-events
要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua。
本文档描述了 lua-resty-worker-events v2.0.1,于 2021 年 6 月 28 日发布。
lua-resty-worker-events
用于 Nginx 工作进程的进程间事件
概述
http {
# 大小取决于要处理的事件数量:
lua_shared_dict process_events 1m;
init_worker_by_lua_block {
local ev = require "resty.worker.events"
local handler = function(data, event, source, pid)
print("收到事件; source=", source,
", event=", event,
", data=", tostring(data),
", 来自进程 ", pid)
end
ev.register(handler)
local ok, err = ev.configure {
shm = "process_events", -- 由 "lua_shared_dict" 定义
timeout = 2, -- shm 中唯一事件数据的生命周期
interval = 1, -- 轮询间隔(秒)
wait_interval = 0.010, -- 在重试获取事件数据之前等待
wait_max = 0.5, -- 在丢弃事件之前的最大等待时间
shm_retries = 999, -- shm 片段化的重试次数(无内存)
}
if not ok then
ngx.log(ngx.ERR, "启动事件系统失败: ", err)
return
end
}
server {
...
# 轮询示例:
location = /some/path {
default_type text/plain;
content_by_lua_block {
-- 手动调用 `poll` 以保持最新,可以单独使用,
-- 或与定时器间隔一起使用。轮询是高效的,
-- 如果保持最新很重要,这种方式更可取。
require("resty.worker.events").poll()
-- 在这里执行常规操作
}
}
}
}
描述
该模块提供了一种方法,可以将事件发送到 NGINX 服务器中的其他工作进程。通信通过一个共享内存区域进行,事件数据将存储在其中。
所有工作进程中的事件顺序 保证 是相同的。
工作进程将设置一个定时器以在后台检查事件。该模块遵循单例模式,因此每个工作进程只运行一次。然而,如果保持最新很重要,可以将间隔设置为较小的频率,并在每次接收到请求时调用 poll,以确保尽快处理所有内容。
该设计允许三种用例:
- 向所有工作进程广播事件,见 post。在这种情况下,所有工作进程中的事件顺序保证相同。例如;一个工作进程中运行的健康检查,但通知所有工作进程某个上游节点失败。
- 仅向本地工作进程广播事件,见 post_local。
- 将外部事件合并为单个操作。例如;所有工作进程监视外部事件,指示内存缓存需要刷新。当接收到它时,它们都以唯一事件哈希(所有工作进程生成相同的哈希)发布,见 post 的
unique参数。现在只有一个工作进程将接收到事件 仅一次,因此只有一个工作进程将访问上游数据库以刷新内存数据。
该模块本身将触发两个事件,source="resty-worker-events";
* event="started" 当模块首次配置时(注意:事件处理程序必须在调用 configure 之前 注册,以便能够捕获事件)
* event="stopping" 当工作进程退出时(基于定时器 premature 设置)
请参见 event_list,以在不使用硬编码魔法值/字符串的情况下使用事件。
故障排除
为了正确设置 shm 的大小,了解它的使用方式很重要。事件数据存储在 shm 中,以便将其传递给其他工作进程。因此,shm 中有两种类型的条目:
- 仅由单个工作进程执行的事件(见
post方法的unique参数)。这些条目在 shm 中获得一个ttl,因此会过期。 - 所有其他事件(除了不使用 SHM 的本地事件)。在这些情况下,没有设置
ttl。
上述结果是 SHM 将始终是满的!因此,这不是一个需要调查的指标。
如何防止问题:
- SHM 的大小必须至少是预期最大有效负载的倍数。它必须能够容纳在一个
interval内可能发送的所有事件(见configure)。 no memory错误 不能 通过增大 SHM 来解决。解决这些问题的唯一方法是增加传递给configure的shm_retries选项(该选项的默认值已经很高)。 这是因为该错误是由于碎片化造成的,而不是内存不足。-
waiting for event data timed out错误发生在事件数据在所有工作进程处理之前被驱逐的情况下。如果有大量(大有效负载)事件的突发情况,这可能会发生。要解决这些问题:- 尽量避免大的事件有效负载
- 使用较小的
interval,以便工作进程更频繁地检查(和处理)事件(见传递给configure的interval选项) - 增加 SHM 大小,以便它可以容纳在 1 个间隔内可能发送的所有事件数据。
方法
configure
语法: success, err = events.configure(opts)
将初始化事件监听器。通常应从 init_by_lua 处理程序调用,因为它将确保所有工作进程以第一个事件开始。在系统重新加载的情况下(启动新工作进程并停止旧工作进程),过去的事件将不会被重放。由于无法保证工作进程重新加载的顺序,因此也无法保证事件的开始。因此,如果某种状态是从事件派生的,您必须单独管理该状态。
opts 参数是一个带有命名选项的 Lua 表:
shm: (必需)要使用的共享内存的名称。事件数据将不会过期,因此该模块依赖于 shm lru 机制从 shm 中驱逐旧事件。因此,shm 可能不应用于其他目的。shm_retries: (可选)在发布事件时 shm 返回“无内存”时的重试次数,默认值为 999。每次插入尝试时,如果没有可用内存(要么没有空间可用,要么内存可用但碎片化),将驱逐“最多十个”旧条目。在那之后,如果仍然没有可用内存,将返回“无内存”错误。重试插入会多次触发驱逐阶段,增加可用内存的同时也增加找到足够大的连续内存块以容纳新事件数据的概率。interval: (可选)轮询事件的间隔(以秒为单位),默认值为 1。设置为 0 以禁用轮询。wait_interval: (可选)当找到新的 eventid,但数据尚不可用时(由于工作进程的异步行为),两次尝试之间的间隔wait_max: (可选)在找到事件 id 时等待数据的最大时间,然后丢弃事件。这是一个安全设置,以防出现问题。timeout: (可选)存储在 shm 中的唯一事件数据的超时(以秒为单位),默认值为 2。请参阅 post 方法的unique参数。
返回值将是 true,或 nil 和错误消息。
此方法可以重复调用以更新设置,除了初始配置后不能更改的 shm 值。
注意:wait_interval 是使用 ngx.sleep 函数执行的。在此函数不可用的上下文中(例如 init_worker),它将执行忙等待以执行延迟。
configured
语法: is_already_configured = events.configured()
事件模块作为每个工作进程的单例运行。configured 函数允许检查它是否已经启动并运行。在启动任何依赖项之前进行检查是推荐的;
local events = require "resty.worker.events"
local initialization_of_my_module = function()
assert(events.configured(), "请在使用 my_module 之前配置 'lua-resty-worker-events' 模块")
-- 在这里执行初始化
end
event_list
语法: _M.events = events.event_list(sourcename, event1, event2, ...)
实用程序函数,用于生成事件列表并防止魔法字符串中的拼写错误。访问返回表中不存在的事件将导致“未知事件错误”。
第一个参数 sourcename 是一个唯一名称,用于标识事件源,该名称将作为字段 _source 可用。所有后续参数是由事件源生成的命名事件。
示例用法;
local ev = require "resty.worker.events"
-- 事件源示例
local events = ev.event_list(
"my-module-event-source", -- 可用作 _M.events._source
"started", -- 可用作 _M.events.started
"event2" -- 可用作 _M.events.event2
)
local raise_event = function(event, data)
return ev.post(events._source, event, data)
end
-- 发布我自己的 'started' 事件
raise_event(events.started, nil) -- nil 为清晰起见,不传递事件数据
-- 定义我的模块表
local _M = {
events = events -- 导出事件表
-- 实现代码在这里
}
return _M
-- 事件客户端示例;
local mymod = require("some_module") -- 带有 `events` 表的模块
-- 定义回调并使用源模块的事件表
local my_callback = function(data, event, source, pid)
if event == mymod.events.started then -- 'started' 是事件名称
-- 来自 resty-worker-events 模块的 started 事件
elseif event == mymod.events.stoppping then -- 'stopping' 是事件名称
-- 由于 `stoppping` 中的拼写错误,上述代码将抛出错误
end
end
ev.register(my_callback, mymod.events._source)
poll
语法: success, err = events.poll()
将轮询新事件并处理所有事件(调用注册的回调)。实现是高效的,它只会检查一个共享内存值,如果没有新事件可用,则立即返回。
返回值将在处理所有事件时为 "done",如果已经在轮询循环中,则为 "recursive",如果出现问题,则为 nil + error。
"recursive" 结果仅表示事件已成功发布,但尚未处理,因为需要先处理其他事件。
post
语法: success, err = events.post(source, event, data, unique)
将发布一个新事件。source 和 event 都是字符串。data 可以是任何东西(包括 nil),只要它可以被 cjson 模块(反)序列化。
如果提供了 unique 参数,则只有一个工作进程将执行该事件,其他工作进程将忽略它。任何后续事件与相同的 unique 值也将被忽略(在指定给 configure 的 timeout 期间)。
执行事件的进程不一定是发布事件的进程。
返回值将在事件成功发布时为 true,在失败时为 nil + error。
注意:发送事件的工作进程也将接收到该事件!因此,如果事件源也会对事件采取行动,则不应在事件发布代码中执行,而应在接收到事件时执行。
post_local
语法: success, err = events.post_local(source, event, data)
与 post 相同,除了事件将仅限于工作进程本地,不会广播到其他工作进程。使用此方法时,data 元素将不会被 JSON 化。
返回值将在事件成功发布时为 true,在失败时为 nil + error。
register
语法: events.register(callback, source, event1, event2, ...)
将注册一个回调函数以接收事件。如果省略 source 和 event,则回调将在 每个 事件上执行;如果提供了 source,则仅会传递匹配源的事件。如果给定了(一个或多个)事件名称,则仅当 source 和 event 都匹配时,回调才会被调用。
回调应具有以下签名;
语法: callback = function(data, event, source, pid)
参数将与提供给 post 的参数相同,除了额外的值 pid,它将是源工作进程的 pid,如果仅是本地事件,则为 nil。来自 callback 的任何返回值将被丢弃。
注意:data 可能是数据的引用类型(例如 Lua table 类型)。相同的值会传递给所有回调,因此在处理程序中不要更改该值,除非您知道自己在做什么!
register 的返回值将是 true,如果 callback 不是函数值,则会抛出错误。
警告:事件处理程序必须快速返回。如果处理程序的执行时间超过配置的 timeout 值,则事件将被丢弃!
注意:要接收进程自己的 started 事件,处理程序必须在调用 configure 之前注册。
register_weak
语法: events.register_weak(callback, source, event1, event2, ...)
此函数与 register 相同,唯一的区别是模块将仅对 callback 函数保持 弱引用。
unregister
语法: events.unregister(callback, source, event1, event2, ...)
将注销回调函数,并阻止其接收进一步的事件。参数的工作方式与 register 完全相同。
返回值将是 true,如果已移除;如果不在处理程序列表中,则为 false;如果 callback 不是函数值,则会抛出错误。
历史
发布新版本
- 确保下面的变更日志是最新的
- 更新代码中的版本号
- 在
./rockspecs中创建新的 rockspec - 提交时消息为
release x.x.x - 将提交标记为
x.x.x - 推送提交和标签
- 上传到 luarocks
2.0.1, 2021 年 6 月 28 日
- 修复:'init phase' 中可能的死锁
2.0.0, 2020 年 9 月 16 日
- 重大变更:
post函数不再调用poll,使所有事件异步化。当需要立即处理事件时,必须显式调用poll。 - 重大变更:
post_local函数不再立即执行事件,使所有本地事件异步化。当需要立即处理事件时,必须显式调用poll。 - 修复:在重新加载期间防止 CPU 100% 旋转事件 shm 被清除
- 修复:在写入 shm 失败时改进日志记录(添加有效负载大小以便于故障排除)
- 修复:不再记录有效负载,因为这可能通过日志暴露敏感数据
- 更改:将
shm_retries默认值更新为 999 - 更改:将定时器循环更改为睡眠循环(性能)
- 修复:在重新配置时确保回调表已初始化
1.1.0, 2020 年 12 月 23 日(维护版本)
- 特性:轮询循环现在永远运行,在每次运行之间睡眠 0.5 秒,避免在每一步创建新定时器。
1.0.0, 2019 年 7 月 18 日
- 重大变更:
poll(因此也包括post和post_local)的返回值更改为更符合 Lua 风格,当一切正常时为真。 - 特性:新选项
shm_retries用于修复在发布事件时由 shm 中的内存碎片引起的“无内存”错误。 - 修复:修复了变量名称中的两个拼写错误(边缘情况)
0.3.3, 2018 年 5 月 8 日
- 修复:通过移除超时设置来解决初始化阶段的超时问题,见问题 #9
0.3.2, 2018 年 4 月 11 日
- 更改:为处理程序错误添加堆栈跟踪
- 修复:如果值不可序列化,则处理错误处理程序失败,见问题 #5
- 修复:修复对弱处理程序的测试
另见
- OpenResty: http://openresty.org
GitHub
您可以在 nginx-module-worker-events 的 GitHub 仓库 中找到此模块的其他配置提示和文档。