rabbitmqstomp: 基于 cosocket API 的 NGINX 模块 Lua RabbitMQ 客户端库
安装
如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。
CentOS/RHEL 7 或 Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp
CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp
要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua。
本文档描述了 lua-resty-rabbitmqstomp v0.1,发布于 2013 年 6 月 1 日。
lua-resty-rabbitmqstomp - 使用 cosocket API 与启用了 STOMP 插件的 RabbitMQ 代理进行 STOMP 1.2 通信的 Lua RabbitMQ 客户端库。
限制
此库是有主见的,并且有某些假设和限制,可能会在未来得到解决;
- RabbitMQ 服务器应启用支持 STOMP v1.2 的 STOMP 适配器
- 假设用户、vhost、交换、队列和绑定已设置好
STOMP v1.2 客户端实现
此库使用 STOMP 1.2 与 RabbitMQ 代理进行通信,并实现 RabbitMQ STOMP 插件的扩展和限制。
在内部,RabbitMQ 使用 AMQP 进行进一步的通信。通过这种方式,库使得实现与 RabbitMQ 代理通过 STOMP、通过 AMQP 进行通信的消费者和生产者成为可能。该协议是基于帧的,包含一个命令、头和以 EOL (^@) 结束的主体,EOL 由 \r (013) 和所需的 \n (010) 组成,位于 TCP 流上:
COMMAND
header1:value1
header2: value2
COMMAND 后跟 EOL,然后是以 key:value 对格式分隔的头,接着是一个空行,主体从此开始,帧以 ^@ EOL 结束。COMMAND 和头是 UTF-8 编码的。
连接
要连接,我们创建并通过 cosocket API 发送一个 CONNECT 帧,连接到代理 IP,支持 IPv4 和 IPv6。在帧中,我们使用 login、passcode 进行身份验证,accept-version 强制客户端 STOMP 版本支持,host 用于选择代理的 VHOST。
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional
^@
发生错误时,将返回一个 ERROR 帧,例如:
ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32
Access refused for user 'admin'^@
成功连接后,代理将返回一个 CONNECTED 帧,例如:
CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2
创建连接时,应提供用户名、密码、vhost、心跳、代理主机和端口。
发布
我们可以使用 SEND 命令向具有路由键、持久性模式、投递模式和其他头的交换发布消息:
SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5
请注意,content-length 包括消息和 EOL 字节。
方法
new
syntax: rabbit, err = rabbitmqstomp:new()
创建一个 RabbitMQ 对象。如果失败,则返回 nil 和描述错误的字符串。
set_timeout
syntax: rabbit:set_timeout(time)
设置后续操作的超时(以毫秒为单位)保护,包括连接方法。请注意,超时应在创建对象后调用任何其他方法之前设置。
connect
syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}
尝试连接到在主机上监听的 RabbitMQ STOMP 适配器的一个 stomp 代理。
如果未提供任何值,则假定默认值:
- host: localhost
- port: 61613
- username: guest
- password: guest
- vhost: /
pool 可以用于自定义连接池的名称。
send
syntax: rabbit:send(msg, headers)
发布带有一组头的消息。
可以设置的一些头值:
destination: 消息的目的地,例如 /exchange/name/binding
persistent: 要交付持久消息,如果声明则值应为 "true"
receipt: 确认交付的收据
content-type: 消息类型,例如 application/json
有关支持的头的列表,请参见 STOMP 协议扩展和限制页面:https://www.rabbitmq.com/stomp.html
subscribe
syntax: rabbit:subscribe(headers)
通过使用 headers 订阅队列。当 persistent 为 true 时,它应具有 id。成功订阅后,代理将发送 MESSAGE 帧。
unsubscribe
syntax: rabbit:unsubscribe(headers)
通过使用 headers 取消订阅队列。成功取消订阅后,MESSAGE 帧将停止从代理发送。
receive
syntax: rabbit:receive())
尝试读取任何接收到的 MESSAGE 帧并返回消息。尝试在没有有效订阅的情况下接收将导致错误。
get_reused_times
syntax: times, err = rabbit:get_reused_times()
此方法返回当前连接的(成功)重用次数。如果发生错误,则返回 nil 和描述错误的字符串。
如果当前连接不是来自内置连接池,则此方法始终返回 0,即连接从未被重用(尚未)。如果连接来自连接池,则返回值始终为非零。因此,此方法也可用于确定当前连接是否来自池。
set_keepalive
syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)
将当前 RabbitMQ 连接立即放入 ngx_lua cosocket 连接池中。
您可以指定连接在池中时的最大空闲超时(以毫秒为单位)和每个 NGINX 工作进程的最大池大小。
成功时返回 1。发生错误时返回 nil 和描述错误的字符串。
仅在您本来会调用 close 方法的地方调用此方法。调用此方法将立即将当前 redis 对象转换为关闭状态。对当前对象的任何后续操作(除了 connect())将返回关闭错误。
close
syntax: ok, err = rabbit:close()
通过向 RabbitMQ STOMP 代理发送 DISCONNECT 来优雅地关闭当前 RabbitMQ 连接,并返回状态。
成功时返回 1。发生错误时返回 nil 和描述错误的字符串。
示例
一个简单的生产者,可以向具有某些绑定的交换发送可靠的持久消息:
local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect {
host = "127.0.0.1",
port = 61613,
username = "guest",
password = "guest",
vhost = "/"
}
if not ok then
return
end
local strlen = string.len
local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(msg, headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Published: " .. msg)
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not ok then
return
end
ngx.log(ngx.INFO, "Consumed: " .. data)
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
另见
GitHub
您可以在 nginx-module-rabbitmqstomp 的 GitHub 仓库中找到此模块的其他配置提示和文档。