Pular para conteúdo

rabbitmqstomp: Biblioteca cliente Lua RabbitMQ opinativa para aplicativos nginx-module-lua baseada na API cosocket

Instalação

Se você ainda não configurou a assinatura do repositório RPM, inscreva-se. Depois, você pode prosseguir com os seguintes passos.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

Para usar esta biblioteca Lua com o NGINX, certifique-se de que o nginx-module-lua esteja instalado.

Este documento descreve lua-resty-rabbitmqstomp v0.1 lançado em 01 de junho de 2013.


lua-resty-rabbitmqstomp - Biblioteca cliente Lua RabbitMQ que usa a API cosocket para comunicação sobre STOMP 1.2 com um broker RabbitMQ que possui o plugin STOMP.

Limitações

Esta biblioteca é opinativa e possui certas suposições e limitações que podem ser abordadas no futuro;

  • O servidor RabbitMQ deve ter o adaptador STOMP habilitado que suporta STOMP v1.2
  • Suposição de que usuários, vhost, exchanges, filas e bindings já estão configurados

Implementação do Cliente STOMP v1.2

Esta biblioteca usa STOMP 1.2 para comunicação com o broker RabbitMQ e implementa extensões e restrições do plugin Stomp do RabbitMQ.

Internamente, o RabbitMQ usa AMQP para se comunicar. Dessa forma, a biblioteca permite a implementação de consumidores e produtores que se comunicam com o broker RabbitMQ via STOMP, sobre AMQP. O protocolo é baseado em frames e possui um comando, cabeçalhos e corpo terminados por um EOL (^@) que consiste em \r (013) e \n (010) sobre um stream TCP:

COMMAND
header1:value1
header2: value2

BODY^@

COMMAND é seguido por EOL, então cabeçalhos separados por EOL no formato chave:valor e, em seguida, uma linha em branco onde o BODY começa e o frame é terminado por EOL ^@. COMMAND e cabeçalhos são codificados em UTF-8.

Conexão

Para conectar, criamos e enviamos um frame CONNECT através de um socket TCP fornecido pela API cosocket conectando ao IP do broker, tanto IPv4 quanto IPv6 são suportados. No frame, usamos login, passcode para autenticação, accept-version para impor suporte à versão STOMP do cliente e host para selecionar o VHOST do broker.

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

Em caso de erro, um frame ERROR é retornado, por exemplo:

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

Em caso de conexão bem-sucedida, um frame CONNECTED é retornado pelo broker, por exemplo:

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

Para criar uma conexão, nome de usuário, senha, vhost, heartbeat, host do broker e porta devem ser fornecidos.

Publicação

Podemos publicar mensagens em uma exchange com uma chave de roteamento, modo de persistência, modo de entrega e outros cabeçalhos usando o comando SEND:

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

hello^@

Observe que o content-length inclui a mensagem e o byte EOL.

Métodos

new

syntax: rabbit, err = rabbitmqstomp:new()

Cria um objeto RabbitMQ. Em caso de falhas, retorna nil e uma string descrevendo o erro.

set_timeout

syntax: rabbit:set_timeout(time)

Define o tempo limite (em ms) de proteção para operações subsequentes, incluindo o método connect. Observe que o tempo limite deve ser definido antes de chamar qualquer outro método após a criação do objeto.

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

Tenta conectar a um broker STOMP com o adaptador RabbitMQ STOMP em um host, a porta está ouvindo.

Se nenhum dos valores for fornecido, valores padrão são assumidos:

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool pode ser fornecido para ser usado como um nome personalizado para o pool de conexões em uso.

send

syntax: rabbit:send(msg, headers)

Publica uma mensagem com um conjunto de cabeçalhos.

Alguns valores de cabeçalho que podem ser definidos:

destination: Destino da mensagem, por exemplo /exchange/name/binding persistent: Para entregar uma mensagem persistente, o valor deve ser "true" se declarado receipt: Recibo para entrega confirmada content-type: Tipo de mensagem, por exemplo application/json

Para a lista de cabeçalhos suportados, consulte a página de extensões e restrições do protocolo STOMP: https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

Inscreve-se em uma fila usando headers. Deve ter um id quando persistent for true. Em caso de inscrição bem-sucedida, frames MESSAGE são enviados pelo broker.

unsubscribe

syntax: rabbit:unsubscribe(headers)

Desinscreve-se de uma fila usando headers. Em caso de desinscrição bem-sucedida, os frames MESSAGE pararão de chegar do broker.

receive

syntax: rabbit:receive()

Tenta ler qualquer frame MESSAGE recebido e retorna a mensagem. Tentar receber sem uma inscrição válida resultará em erros.

get_reused_times

syntax: times, err = rabbit:get_reused_times()

Este método retorna o número de vezes (com sucesso) reutilizadas para a conexão atual. Em caso de erro, retorna nil e uma string descrevendo o erro.

Se a conexão atual não vier do pool de conexão embutido, então este método sempre retorna 0, ou seja, a conexão nunca foi reutilizada (ainda). Se a conexão vier do pool de conexão, então o valor retornado é sempre diferente de zero. Portanto, este método também pode ser usado para determinar se a conexão atual vem do pool.

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

Coloca a conexão RabbitMQ atual imediatamente no pool de conexões cosocket do ngx_lua.

Você pode especificar o tempo máximo de inatividade (em ms) quando a conexão está no pool e o tamanho máximo do pool para cada processo de trabalho do nginx.

Em caso de sucesso, retorna 1. Em caso de erros, retorna nil com uma string descrevendo o erro.

Chame este método apenas no lugar onde você chamaria o método close. Chamar este método imediatamente transformará o objeto redis atual no estado fechado. Quaisquer operações subsequentes, além de connect() no objeto atual, retornarão o erro de fechado.

close

syntax: ok, err = rabbit:close()

Fecha a conexão RabbitMQ atual graciosamente enviando um DISCONNECT para o broker RabbitMQ STOMP e retorna o status.

Em caso de sucesso, retorna 1. Em caso de erros, retorna nil com uma string descrevendo o erro.

Exemplo

Um produtor simples que pode enviar mensagens persistentes confiáveis para uma exchange com algum binding:

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Publicado: " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consumido: " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

Veja Também

GitHub

Você pode encontrar dicas adicionais de configuração e documentação para este módulo no repositório GitHub para nginx-module-rabbitmqstomp.