rabbitmqstomp: Biblioteca cliente de RabbitMQ para Lua, opinativa, para aplicaciones nginx-module-lua basada en la API de cosocket
Instalación
Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.
CentOS/RHEL 7 o Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp
Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.
Este documento describe lua-resty-rabbitmqstomp v0.1 lanzado el 01 de junio de 2013.
lua-resty-rabbitmqstomp - Biblioteca cliente de RabbitMQ para Lua que utiliza la API de cosocket para la comunicación sobre STOMP 1.2 con un broker RabbitMQ que tiene el plugin STOMP.
Limitaciones
Esta biblioteca es opinativa y tiene ciertas suposiciones y limitaciones que pueden ser abordadas en el futuro;
- El servidor RabbitMQ debe tener habilitado el adaptador STOMP que soporta STOMP v1.2
- Suposición de que los usuarios, vhost, intercambios, colas y enlaces ya están configurados
Implementación del Cliente STOMP v1.2
Esta biblioteca utiliza STOMP 1.2 para la comunicación con el broker RabbitMQ e implementa extensiones y restricciones del plugin Stomp de RabbitMQ.
Internamente, RabbitMQ utiliza AMQP para comunicarse más allá. De esta manera, la biblioteca permite la implementación de consumidores y productores que se comunican con el broker RabbitMQ a través de STOMP, sobre AMQP. El protocolo es basado en tramas y tiene un comando, encabezados y cuerpo terminados por un EOL (^@) que consiste en \r (013) y el requerido \n (010) sobre un flujo TCP:
COMMAND
header1:value1
header2: value2
BODY^@
COMMAND es seguido por EOL, luego encabezados separados por EOL en formato de par clave:valor y luego una línea en blanco donde comienza el BODY y la trama se termina por ^@ EOL. COMMAND y encabezados están codificados en UTF-8.
Conexión
Para conectarse, creamos y enviamos un marco CONNECT a través de un socket TCP proporcionado por la API de cosocket conectándose a la IP del broker, se soportan tanto IPv4 como IPv6. En el marco usamos login, passcode para autenticación, accept-version para hacer cumplir el soporte de la versión STOMP del cliente y host para seleccionar el VHOST del broker.
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional
^@
En caso de error, se devuelve un marco ERROR, por ejemplo:
ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32
Access refused for user 'admin'^@
En caso de conexión exitosa, el broker nos devuelve un marco CONNECTED, por ejemplo:
CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2
Para crear una conexión, se deben proporcionar el nombre de usuario, contraseña, vhost, heartbeat, host del broker y puerto.
Publicación
Podemos publicar mensajes en un intercambio con una clave de enrutamiento, modo de persistencia, modo de entrega y otros encabezados utilizando el comando SEND:
SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5
hello^@
Ten en cuenta que content-length incluye el mensaje y el byte EOL.
Métodos
new
syntax: rabbit, err = rabbitmqstomp:new()
Crea un objeto RabbitMQ. En caso de fallos, devuelve nil y una cadena que describe el error.
set_timeout
syntax: rabbit:set_timeout(time)
Establece el tiempo de espera (en ms) para proteger las operaciones subsiguientes, incluyendo el método connect. Nota que el tiempo de espera debe establecerse antes de llamar a cualquier otro método después de crear el objeto.
connect
syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}
Intenta conectarse a un broker STOMP con el adaptador STOMP de RabbitMQ en un host, el puerto está escuchando.
Si no se suministran ninguno de los valores, se asumen valores predeterminados:
- host: localhost
- port: 61613
- username: guest
- password: guest
- vhost: /
pool puede ser dado para ser usado como un nombre personalizado para el pool de conexiones que se está utilizando.
send
syntax: rabbit:send(msg, headers)
Publica un mensaje con un conjunto de encabezados.
Algunos valores de encabezado que se pueden establecer:
destination: Destino del mensaje, por ejemplo /exchange/name/binding
persistent: Para entregar un mensaje persistente, el valor debe ser "true" si se declara
receipt: Recibo para entrega confirmada
content-type: Tipo de mensaje, por ejemplo application/json
Para la lista de encabezados soportados, consulta la página de extensiones y restricciones del protocolo STOMP: https://www.rabbitmq.com/stomp.html
subscribe
syntax: rabbit:subscribe(headers)
Suscríbete a una cola utilizando headers. Debe tener un id cuando persistent es verdadero. En caso de suscripción exitosa, los marcos MESSAGE son enviados por el broker.
unsubscribe
syntax: rabbit:unsubscribe(headers)
Se da de baja de una cola utilizando headers. En caso de desuscripción exitosa, los marcos MESSAGE dejarán de llegar del broker.
receive
syntax: rabbit:receive())
Intenta leer cualquier marco MESSAGE recibido y devuelve el mensaje. Intentar recibir sin una suscripción válida llevará a errores.
get_reused_times
syntax: times, err = rabbit:get_reused_times()
Este método devuelve las veces (exitosamente) reutilizadas para la conexión actual. En caso de error, devuelve nil y una cadena que describe el error.
Si la conexión actual no proviene del pool de conexiones incorporado, entonces este método siempre devuelve 0, es decir, la conexión nunca ha sido reutilizada (aún). Si la conexión proviene del pool de conexiones, entonces el valor de retorno siempre es distinto de cero. Por lo tanto, este método también se puede utilizar para determinar si la conexión actual proviene del pool.
set_keepalive
syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)
Coloca la conexión RabbitMQ actual inmediatamente en el pool de conexiones ngx_lua cosocket.
Puedes especificar el tiempo máximo de inactividad (en ms) cuando la conexión está en el pool y el tamaño máximo del pool para cada proceso de trabajo de nginx.
En caso de éxito, devuelve 1. En caso de errores, devuelve nil con una cadena que describe el error.
Solo llama a este método en el lugar donde habrías llamado al método close en su lugar. Llamar a este método convertirá inmediatamente el objeto redis actual en estado cerrado. Cualquier operación subsiguiente distinta de connect() en el objeto actual devolverá el error de cerrado.
close
syntax: ok, err = rabbit:close()
Cierra la conexión RabbitMQ actual de manera ordenada enviando un DISCONNECT al broker RabbitMQ STOMP y devuelve el estado.
En caso de éxito, devuelve 1. En caso de errores, devuelve nil con una cadena que describe el error.
Ejemplo
Un productor simple que puede enviar mensajes persistentes confiables a un intercambio con algún enlace:
local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect {
host = "127.0.0.1",
port = 61613,
username = "guest",
password = "guest",
vhost = "/"
}
if not ok then
return
end
local strlen = string.len
local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(msg, headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Publicado: " .. msg)
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not ok then
return
end
ngx.log(ngx.INFO, "Consumido: " .. data)
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
Ver También
- Especificación STOMP 1.2
- La biblioteca lua-resty-mysql
- Grupo de Google Openresty
GitHub
Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-rabbitmqstomp.