rabbitmqstomp: Biblioteca cliente Lua RabbitMQ opinativa para aplicativos nginx-module-lua baseada na API cosocket
Instalação
Se você ainda não configurou a assinatura do repositório RPM, inscreva-se. Depois, você pode prosseguir com os seguintes passos.
CentOS/RHEL 7 ou Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp
Para usar esta biblioteca Lua com o NGINX, certifique-se de que o nginx-module-lua esteja instalado.
Este documento descreve lua-resty-rabbitmqstomp v0.1 lançado em 01 de junho de 2013.
lua-resty-rabbitmqstomp - Biblioteca cliente Lua RabbitMQ que usa a API cosocket para comunicação sobre STOMP 1.2 com um broker RabbitMQ que possui o plugin STOMP.
Limitações
Esta biblioteca é opinativa e possui certas suposições e limitações que podem ser abordadas no futuro;
- O servidor RabbitMQ deve ter o adaptador STOMP habilitado que suporta STOMP v1.2
- Suposição de que usuários, vhost, exchanges, filas e bindings já estão configurados
Implementação do Cliente STOMP v1.2
Esta biblioteca usa STOMP 1.2 para comunicação com o broker RabbitMQ e implementa extensões e restrições do plugin Stomp do RabbitMQ.
Internamente, o RabbitMQ usa AMQP para se comunicar. Dessa forma, a biblioteca permite a implementação de consumidores e produtores que se comunicam com o broker RabbitMQ via STOMP, sobre AMQP. O protocolo é baseado em frames e possui um comando, cabeçalhos e corpo terminados por um EOL (^@) que consiste em \r (013) e \n (010) sobre um stream TCP:
COMMAND
header1:value1
header2: value2
BODY^@
COMMAND é seguido por EOL, então cabeçalhos separados por EOL no formato chave:valor e, em seguida, uma linha em branco onde o BODY começa e o frame é terminado por EOL ^@. COMMAND e cabeçalhos são codificados em UTF-8.
Conexão
Para conectar, criamos e enviamos um frame CONNECT através de um socket TCP fornecido pela API cosocket conectando ao IP do broker, tanto IPv4 quanto IPv6 são suportados. No frame, usamos login, passcode para autenticação, accept-version para impor suporte à versão STOMP do cliente e host para selecionar o VHOST do broker.
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional
^@
Em caso de erro, um frame ERROR é retornado, por exemplo:
ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32
Access refused for user 'admin'^@
Em caso de conexão bem-sucedida, um frame CONNECTED é retornado pelo broker, por exemplo:
CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2
Para criar uma conexão, nome de usuário, senha, vhost, heartbeat, host do broker e porta devem ser fornecidos.
Publicação
Podemos publicar mensagens em uma exchange com uma chave de roteamento, modo de persistência, modo de entrega e outros cabeçalhos usando o comando SEND:
SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5
hello^@
Observe que o content-length inclui a mensagem e o byte EOL.
Métodos
new
syntax: rabbit, err = rabbitmqstomp:new()
Cria um objeto RabbitMQ. Em caso de falhas, retorna nil e uma string descrevendo o erro.
set_timeout
syntax: rabbit:set_timeout(time)
Define o tempo limite (em ms) de proteção para operações subsequentes, incluindo o método connect. Observe que o tempo limite deve ser definido antes de chamar qualquer outro método após a criação do objeto.
connect
syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}
Tenta conectar a um broker STOMP com o adaptador RabbitMQ STOMP em um host, a porta está ouvindo.
Se nenhum dos valores for fornecido, valores padrão são assumidos:
- host: localhost
- port: 61613
- username: guest
- password: guest
- vhost: /
pool pode ser fornecido para ser usado como um nome personalizado para o pool de conexões em uso.
send
syntax: rabbit:send(msg, headers)
Publica uma mensagem com um conjunto de cabeçalhos.
Alguns valores de cabeçalho que podem ser definidos:
destination: Destino da mensagem, por exemplo /exchange/name/binding
persistent: Para entregar uma mensagem persistente, o valor deve ser "true" se declarado
receipt: Recibo para entrega confirmada
content-type: Tipo de mensagem, por exemplo application/json
Para a lista de cabeçalhos suportados, consulte a página de extensões e restrições do protocolo STOMP: https://www.rabbitmq.com/stomp.html
subscribe
syntax: rabbit:subscribe(headers)
Inscreve-se em uma fila usando headers. Deve ter um id quando persistent for true. Em caso de inscrição bem-sucedida, frames MESSAGE são enviados pelo broker.
unsubscribe
syntax: rabbit:unsubscribe(headers)
Desinscreve-se de uma fila usando headers. Em caso de desinscrição bem-sucedida, os frames MESSAGE pararão de chegar do broker.
receive
syntax: rabbit:receive()
Tenta ler qualquer frame MESSAGE recebido e retorna a mensagem. Tentar receber sem uma inscrição válida resultará em erros.
get_reused_times
syntax: times, err = rabbit:get_reused_times()
Este método retorna o número de vezes (com sucesso) reutilizadas para a conexão atual. Em caso de erro, retorna nil e uma string descrevendo o erro.
Se a conexão atual não vier do pool de conexão embutido, então este método sempre retorna 0, ou seja, a conexão nunca foi reutilizada (ainda). Se a conexão vier do pool de conexão, então o valor retornado é sempre diferente de zero. Portanto, este método também pode ser usado para determinar se a conexão atual vem do pool.
set_keepalive
syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)
Coloca a conexão RabbitMQ atual imediatamente no pool de conexões cosocket do ngx_lua.
Você pode especificar o tempo máximo de inatividade (em ms) quando a conexão está no pool e o tamanho máximo do pool para cada processo de trabalho do nginx.
Em caso de sucesso, retorna 1. Em caso de erros, retorna nil com uma string descrevendo o erro.
Chame este método apenas no lugar onde você chamaria o método close. Chamar este método imediatamente transformará o objeto redis atual no estado fechado. Quaisquer operações subsequentes, além de connect() no objeto atual, retornarão o erro de fechado.
close
syntax: ok, err = rabbit:close()
Fecha a conexão RabbitMQ atual graciosamente enviando um DISCONNECT para o broker RabbitMQ STOMP e retorna o status.
Em caso de sucesso, retorna 1. Em caso de erros, retorna nil com uma string descrevendo o erro.
Exemplo
Um produtor simples que pode enviar mensagens persistentes confiáveis para uma exchange com algum binding:
local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect {
host = "127.0.0.1",
port = 61613,
username = "guest",
password = "guest",
vhost = "/"
}
if not ok then
return
end
local strlen = string.len
local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(msg, headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Publicado: " .. msg)
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not ok then
return
end
ngx.log(ngx.INFO, "Consumido: " .. data)
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
Veja Também
GitHub
Você pode encontrar dicas adicionais de configuração e documentação para este módulo no repositório GitHub para nginx-module-rabbitmqstomp.