Saltar a contenido

rabbitmqstomp: Biblioteca cliente de RabbitMQ para Lua, opinativa, para aplicaciones nginx-module-lua basada en la API de cosocket

Instalación

Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.

CentOS/RHEL 7 o Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.

Este documento describe lua-resty-rabbitmqstomp v0.1 lanzado el 01 de junio de 2013.


lua-resty-rabbitmqstomp - Biblioteca cliente de RabbitMQ para Lua que utiliza la API de cosocket para la comunicación sobre STOMP 1.2 con un broker RabbitMQ que tiene el plugin STOMP.

Limitaciones

Esta biblioteca es opinativa y tiene ciertas suposiciones y limitaciones que pueden ser abordadas en el futuro;

  • El servidor RabbitMQ debe tener habilitado el adaptador STOMP que soporta STOMP v1.2
  • Suposición de que los usuarios, vhost, intercambios, colas y enlaces ya están configurados

Implementación del Cliente STOMP v1.2

Esta biblioteca utiliza STOMP 1.2 para la comunicación con el broker RabbitMQ e implementa extensiones y restricciones del plugin Stomp de RabbitMQ.

Internamente, RabbitMQ utiliza AMQP para comunicarse más allá. De esta manera, la biblioteca permite la implementación de consumidores y productores que se comunican con el broker RabbitMQ a través de STOMP, sobre AMQP. El protocolo es basado en tramas y tiene un comando, encabezados y cuerpo terminados por un EOL (^@) que consiste en \r (013) y el requerido \n (010) sobre un flujo TCP:

COMMAND
header1:value1
header2: value2

BODY^@

COMMAND es seguido por EOL, luego encabezados separados por EOL en formato de par clave:valor y luego una línea en blanco donde comienza el BODY y la trama se termina por ^@ EOL. COMMAND y encabezados están codificados en UTF-8.

Conexión

Para conectarse, creamos y enviamos un marco CONNECT a través de un socket TCP proporcionado por la API de cosocket conectándose a la IP del broker, se soportan tanto IPv4 como IPv6. En el marco usamos login, passcode para autenticación, accept-version para hacer cumplir el soporte de la versión STOMP del cliente y host para seleccionar el VHOST del broker.

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

En caso de error, se devuelve un marco ERROR, por ejemplo:

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

En caso de conexión exitosa, el broker nos devuelve un marco CONNECTED, por ejemplo:

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

Para crear una conexión, se deben proporcionar el nombre de usuario, contraseña, vhost, heartbeat, host del broker y puerto.

Publicación

Podemos publicar mensajes en un intercambio con una clave de enrutamiento, modo de persistencia, modo de entrega y otros encabezados utilizando el comando SEND:

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

hello^@

Ten en cuenta que content-length incluye el mensaje y el byte EOL.

Métodos

new

syntax: rabbit, err = rabbitmqstomp:new()

Crea un objeto RabbitMQ. En caso de fallos, devuelve nil y una cadena que describe el error.

set_timeout

syntax: rabbit:set_timeout(time)

Establece el tiempo de espera (en ms) para proteger las operaciones subsiguientes, incluyendo el método connect. Nota que el tiempo de espera debe establecerse antes de llamar a cualquier otro método después de crear el objeto.

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

Intenta conectarse a un broker STOMP con el adaptador STOMP de RabbitMQ en un host, el puerto está escuchando.

Si no se suministran ninguno de los valores, se asumen valores predeterminados:

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool puede ser dado para ser usado como un nombre personalizado para el pool de conexiones que se está utilizando.

send

syntax: rabbit:send(msg, headers)

Publica un mensaje con un conjunto de encabezados.

Algunos valores de encabezado que se pueden establecer:

destination: Destino del mensaje, por ejemplo /exchange/name/binding persistent: Para entregar un mensaje persistente, el valor debe ser "true" si se declara receipt: Recibo para entrega confirmada content-type: Tipo de mensaje, por ejemplo application/json

Para la lista de encabezados soportados, consulta la página de extensiones y restricciones del protocolo STOMP: https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

Suscríbete a una cola utilizando headers. Debe tener un id cuando persistent es verdadero. En caso de suscripción exitosa, los marcos MESSAGE son enviados por el broker.

unsubscribe

syntax: rabbit:unsubscribe(headers)

Se da de baja de una cola utilizando headers. En caso de desuscripción exitosa, los marcos MESSAGE dejarán de llegar del broker.

receive

syntax: rabbit:receive())

Intenta leer cualquier marco MESSAGE recibido y devuelve el mensaje. Intentar recibir sin una suscripción válida llevará a errores.

get_reused_times

syntax: times, err = rabbit:get_reused_times()

Este método devuelve las veces (exitosamente) reutilizadas para la conexión actual. En caso de error, devuelve nil y una cadena que describe el error.

Si la conexión actual no proviene del pool de conexiones incorporado, entonces este método siempre devuelve 0, es decir, la conexión nunca ha sido reutilizada (aún). Si la conexión proviene del pool de conexiones, entonces el valor de retorno siempre es distinto de cero. Por lo tanto, este método también se puede utilizar para determinar si la conexión actual proviene del pool.

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

Coloca la conexión RabbitMQ actual inmediatamente en el pool de conexiones ngx_lua cosocket.

Puedes especificar el tiempo máximo de inactividad (en ms) cuando la conexión está en el pool y el tamaño máximo del pool para cada proceso de trabajo de nginx.

En caso de éxito, devuelve 1. En caso de errores, devuelve nil con una cadena que describe el error.

Solo llama a este método en el lugar donde habrías llamado al método close en su lugar. Llamar a este método convertirá inmediatamente el objeto redis actual en estado cerrado. Cualquier operación subsiguiente distinta de connect() en el objeto actual devolverá el error de cerrado.

close

syntax: ok, err = rabbit:close()

Cierra la conexión RabbitMQ actual de manera ordenada enviando un DISCONNECT al broker RabbitMQ STOMP y devuelve el estado.

En caso de éxito, devuelve 1. En caso de errores, devuelve nil con una cadena que describe el error.

Ejemplo

Un productor simple que puede enviar mensajes persistentes confiables a un intercambio con algún enlace:

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Publicado: " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consumido: " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

Ver También

GitHub

Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-rabbitmqstomp.