pubsub: 基于 cosocket API 的 NGINX 模块 Lua Pubsub 客户端驱动
安装
如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。
CentOS/RHEL 7 或 Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub
CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub
要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua。
本文档描述了 lua-resty-pubsub v1.5,发布于 2024 年 11 月 13 日。
基于 cosocket API 的 ngx_lua 的 Lua Pubsub 客户端驱动。
描述
此 Lua 库是 ngx_lua NGINX 模块的 Pubsub 客户端驱动: http://wiki.nginx.org/HttpLuaModule
此 Lua 库利用 ngx_lua 的 cosocket API,确保 100% 非阻塞行为。该库使用 NGINX 定时器和 HTTP 请求将消息(带属性)推送到 Google Cloud pubsub。
请注意,至少需要 ngx_lua 0.9.3 或 ngx_openresty 1.4.3.7,并且不幸的是,仅支持 LuaJIT (--with-luajit)。
概要
server {
location = /publish {
resolver 8.8.8.8 ipv6=off;
content_by_lua_block {
local cjson = require "cjson"
local pubsub_producer = require "resty.pubsub.producer"
local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- 也可以提供不同的字典
-- 如果消息成功发送,将接收消息的回调
-- 消息和 err 的类型是一个表
local success_callback = function (topic, err, messages)
ngx.log(ngx.INFO, "Messages: ", cjson.encode(messages), " successfully pushed to topic: ", topic)
end
-- 如果消息发送失败,将接收消息的回调
-- 消息和 err 的类型是一个表
local error_callback = function (topic, err, messages)
for _, message in ipairs(messages) do
ngx.log(ngx.ERR, "Failed to send Message : ", cjson.encode(message), " with err: ", cjson.encode(err))
end
end
local publish = function()
-- Pubsub 生产者配置
local pubsub_config = {
project_id = "demo-project",
topic = "demo-topic",
pubsub_base_domain = "pubsub.googleapis.com",
pubsub_base_port = 443,
is_emulator = false,
producer_config = {
max_batch_size = 200, -- 数据包数量
max_buffering = 5000, -- 缓冲区中最大数据包数量
timer_interval = 10000, -- 毫秒
last_flush_interval = 5000, -- 毫秒
http_timeout = 6000, -- 毫秒
keepalive_max_idle_timeout = 2000, -- 毫秒
keepalive_pool_size = 50
},
oauth_config = {
service_account_key_path = "/etc/key.json", -- 请替换为您自己的密钥路径
oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
oauth_scopes = {
"https://www.googleapis.com/auth/pubsub"
},
oauth_token_dict = OAUTH_TOKEN
},
success_callback = success_callback,
error_callback = error_callback
}
-- 创建生产者对象
-- 无论您调用 new 几次,生产者实例始终会为每个主题每个工作进程生成一次
local producer, err = pubsub_producer:new(pubsub_config)
-- 还要检查初始化生产者时是否有任何错误
if err ~= nil then
ngx.log(ngx.ERR, "Unable to create pubsub producer ", err)
return
end
-- 最后发送带属性的消息。
local ok, send_err = producer:send("Some Random Text", {
attr1 = "Test1",
attr2 = "Test2"
}, "optional_ordering_key")
-- 还要检查发送消息时是否有任何错误
if send_err ~= nil then
ngx.log(ngx.ERR, "Unable to send data to pubsub: ", send_err)
return
end
end
-- 发布消息
publish()
}
}
}
配置
生产者配置
| 属性 | 数据类型 | 描述 | 默认值 |
|---|---|---|---|
| project_id | string | 指定您的 Pub/Sub 项目的项目 ID 字符串 | none (必填) |
| topic | string | 指定需要发送数据的主题 | none (必填) |
| pubsub_base_domain | string | 指定通过其建立 HTTP 连接的基础域。 | pubsub.googleapis.com |
| pubsub_base_port | number | 指定通过其建立 HTTP 连接的基础域端口。 | 443 |
| is_emulator | boolean | 指定布尔值。如果您正在与之通信,则为 true。 | false |
| producer_config.max_batch_size | number | 指定将推送到 pubsub 的最大批量大小。 | 200 |
| producer_config.max_buffering | number | 指定将保持数据的缓冲区的最大大小,持续特定时间。 | 5000 |
| producer_config.timer_interval | number (milliseconds) | 指定检查缓冲区中陈旧消息以进行发布的时间间隔。 | 10000 |
| producer_config.last_flush_interval | number (milliseconds) | 指定最后一次刷新时间和当前时间之间的最大间隔。当缓冲区中的数据包少于批量大小时,长时间使用。 | 10000 |
| producer_config.http_timeout | number (milliseconds) | 设置后续操作的超时保护,包括连接方法。 | 5000 |
| producer_config.keepalive_max_idle_timeout | number (milliseconds) | 用于 httpc:set_keepalive,尝试将当前连接放入 ngx_lua cosocket 连接池。 | 2000 |
| producer_config.keepalive_pool_size | number | 用于 httpc:set_keepalive,尝试将当前连接放入 ngx_lua cosocket 连接池。 | 50 |
| oauth_config.service_account_key_path | string | 指定用于对 pub/sub 项目进行身份验证的服务帐户密钥的路径。 | none (必填) |
| oauth_config.oauth_base_uri | string | 指定请求发送到 Google 授权服务器以获取将在后续请求中使用的令牌的基础 URI。 | https://www.googleapis.com/oauth2/v4/token |
| oauth_config.oauth_scopes | list of string | 指定一个表,包含您可能需要请求以访问 Google API 的 OAuth 2.0 范围,具体取决于您需要的访问级别。 | {"https://www.googleapis.com/auth/pubsub"} |
| oauth_config.oauth_token_dict | lua_shared_dict | 指定一个跨工作进程的共享内存区域,用作 oauth 令牌的存储。 | ngx.shared.OAUTH_TOKEN |
| success_handler | function | 这是一个回调函数,将提供所有成功推送到 pubsub 的消息及其属性。 | none (可选) |
| error_handler | function | 这是一个回调函数,当批处理失败时将执行。 | none (可选) |
模块
resty.pubsub.producer
要加载此模块,只需执行以下操作
local producer = require "resty.pubsub.producer"
方法
new
语法: local p, err = producer:new(pubsub_config)
send
语法: p:send(message, attributes[, ordering_key])
- 需要一个类型为字符串的消息,一个类型为表的属性和一个可选的类型为字符串的 ordering_key
另见
- ngx_lua 模块: http://wiki.nginx.org/HttpLuaModule
GitHub
您可以在 nginx-module-pubsub 的 GitHub 仓库 中找到此模块的其他配置提示和文档。