跳转至

rabbitmqstomp: 基于 cosocket API 的 NGINX 模块 Lua RabbitMQ 客户端库

安装

如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。

CentOS/RHEL 7 或 Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua

本文档描述了 lua-resty-rabbitmqstomp v0.1,发布于 2013 年 6 月 1 日。


lua-resty-rabbitmqstomp - 使用 cosocket API 与启用了 STOMP 插件的 RabbitMQ 代理进行 STOMP 1.2 通信的 Lua RabbitMQ 客户端库。

限制

此库是有主见的,并且有某些假设和限制,可能会在未来得到解决;

  • RabbitMQ 服务器应启用支持 STOMP v1.2 的 STOMP 适配器
  • 假设用户、vhost、交换、队列和绑定已设置好

STOMP v1.2 客户端实现

此库使用 STOMP 1.2 与 RabbitMQ 代理进行通信,并实现 RabbitMQ STOMP 插件的扩展和限制。

在内部,RabbitMQ 使用 AMQP 进行进一步的通信。通过这种方式,库使得实现与 RabbitMQ 代理通过 STOMP、通过 AMQP 进行通信的消费者和生产者成为可能。该协议是基于帧的,包含一个命令、头和以 EOL (^@) 结束的主体,EOL 由 \r (013) 和所需的 \n (010) 组成,位于 TCP 流上:

COMMAND
header1:value1
header2: value2

COMMAND 后跟 EOL,然后是以 key:value 对格式分隔的头,接着是一个空行,主体从此开始,帧以 ^@ EOL 结束。COMMAND 和头是 UTF-8 编码的。

连接

要连接,我们创建并通过 cosocket API 发送一个 CONNECT 帧,连接到代理 IP,支持 IPv4 和 IPv6。在帧中,我们使用 login、passcode 进行身份验证,accept-version 强制客户端 STOMP 版本支持,host 用于选择代理的 VHOST。

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

发生错误时,将返回一个 ERROR 帧,例如:

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

成功连接后,代理将返回一个 CONNECTED 帧,例如:

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

创建连接时,应提供用户名、密码、vhost、心跳、代理主机和端口。

发布

我们可以使用 SEND 命令向具有路由键、持久性模式、投递模式和其他头的交换发布消息:

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

请注意,content-length 包括消息和 EOL 字节。

方法

new

syntax: rabbit, err = rabbitmqstomp:new()

创建一个 RabbitMQ 对象。如果失败,则返回 nil 和描述错误的字符串。

set_timeout

syntax: rabbit:set_timeout(time)

设置后续操作的超时(以毫秒为单位)保护,包括连接方法。请注意,超时应在创建对象后调用任何其他方法之前设置。

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

尝试连接到在主机上监听的 RabbitMQ STOMP 适配器的一个 stomp 代理。

如果未提供任何值,则假定默认值:

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool 可以用于自定义连接池的名称。

send

syntax: rabbit:send(msg, headers)

发布带有一组头的消息。

可以设置的一些头值:

destination: 消息的目的地,例如 /exchange/name/binding persistent: 要交付持久消息,如果声明则值应为 "true" receipt: 确认交付的收据 content-type: 消息类型,例如 application/json

有关支持的头的列表,请参见 STOMP 协议扩展和限制页面:https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

通过使用 headers 订阅队列。当 persistent 为 true 时,它应具有 id。成功订阅后,代理将发送 MESSAGE 帧。

unsubscribe

syntax: rabbit:unsubscribe(headers)

通过使用 headers 取消订阅队列。成功取消订阅后,MESSAGE 帧将停止从代理发送。

receive

syntax: rabbit:receive())

尝试读取任何接收到的 MESSAGE 帧并返回消息。尝试在没有有效订阅的情况下接收将导致错误。

get_reused_times

syntax: times, err = rabbit:get_reused_times()

此方法返回当前连接的(成功)重用次数。如果发生错误,则返回 nil 和描述错误的字符串。

如果当前连接不是来自内置连接池,则此方法始终返回 0,即连接从未被重用(尚未)。如果连接来自连接池,则返回值始终为非零。因此,此方法也可用于确定当前连接是否来自池。

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

将当前 RabbitMQ 连接立即放入 ngx_lua cosocket 连接池中。

您可以指定连接在池中时的最大空闲超时(以毫秒为单位)和每个 NGINX 工作进程的最大池大小。

成功时返回 1。发生错误时返回 nil 和描述错误的字符串。

仅在您本来会调用 close 方法的地方调用此方法。调用此方法将立即将当前 redis 对象转换为关闭状态。对当前对象的任何后续操作(除了 connect())将返回关闭错误。

close

syntax: ok, err = rabbit:close()

通过向 RabbitMQ STOMP 代理发送 DISCONNECT 来优雅地关闭当前 RabbitMQ 连接,并返回状态。

成功时返回 1。发生错误时返回 nil 和描述错误的字符串。

示例

一个简单的生产者,可以向具有某些绑定的交换发送可靠的持久消息:

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Published: " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consumed: " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

另见

GitHub

您可以在 nginx-module-rabbitmqstomp 的 GitHub 仓库中找到此模块的其他配置提示和文档。