rabbitmqstomp: Bibliothèque cliente Lua RabbitMQ opinionnée pour les applications nginx-module-lua basée sur l'API cosocket
Installation
Si vous n'avez pas encore configuré l'abonnement au dépôt RPM, inscrivez-vous. Ensuite, vous pouvez procéder avec les étapes suivantes.
CentOS/RHEL 7 ou Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp
Pour utiliser cette bibliothèque Lua avec NGINX, assurez-vous que nginx-module-lua est installé.
Ce document décrit lua-resty-rabbitmqstomp v0.1 publié le 01 juin 2013.
lua-resty-rabbitmqstomp - Bibliothèque cliente Lua RabbitMQ qui utilise l'API cosocket pour la communication via STOMP 1.2 avec un courtier RabbitMQ qui a le plugin STOMP.
Limitations
Cette bibliothèque est opinionnée et a certaines hypothèses et limitations qui peuvent être abordées dans le futur ;
- Le serveur RabbitMQ doit avoir l'adaptateur STOMP activé qui prend en charge STOMP v1.2
- Hypothèse que les utilisateurs, vhost, échanges, files d'attente et liaisons sont déjà configurés
Implémentation du client STOMP v1.2
Cette bibliothèque utilise STOMP 1.2 pour la communication avec le courtier RabbitMQ et implémente les extensions et restrictions du plugin Stomp de RabbitMQ.
En interne, RabbitMQ utilise AMQP pour communiquer davantage. De cette manière, la bibliothèque permet l'implémentation de consommateurs et de producteurs qui communiquent avec le courtier RabbitMQ via STOMP, via AMQP. Le protocole est basé sur des trames et a une commande, des en-têtes et un corps terminés par un EOL (^@) qui consiste en \r (013) et le \n (010) requis sur un flux TCP :
COMMAND
header1:value1
header2: value2
BODY^@
COMMAND est suivi d'un EOL, puis des en-têtes séparés par EOL au format clé:valeur et ensuite une ligne vide où commence le BODY et la trame est terminée par ^@ EOL. COMMAND et les en-têtes sont encodés en UTF-8.
Connexion
Pour se connecter, nous créons et envoyons une trame CONNECT via un socket TCP fourni par l'API cosocket se connectant à l'IP du courtier, les deux IPv4 et IPv6 sont pris en charge. Dans la trame, nous utilisons login, passcode pour l'authentification, accept-version pour imposer la prise en charge de la version STOMP du client et host pour sélectionner le VHOST du courtier.
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional
^@
En cas d'erreur, une trame ERROR est renvoyée par exemple :
ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32
Access refused for user 'admin'^@
En cas de connexion réussie, une trame CONNECTED est renvoyée par le courtier, par exemple :
CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2
Pour créer une connexion, le nom d'utilisateur, le mot de passe, le vhost, le heartbeat, l'hôte du courtier et le port doivent être fournis.
Publication
Nous pouvons publier des messages dans un échange avec une clé de routage, un mode de persistance, un mode de livraison et d'autres en-têtes en utilisant la commande SEND :
SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5
hello^@
Notez que le content-length inclut le message et le byte EOL.
Méthodes
new
syntax: rabbit, err = rabbitmqstomp:new()
Crée un objet RabbitMQ. En cas d'échec, retourne nil et une chaîne décrivant l'erreur.
set_timeout
syntax: rabbit:set_timeout(time)
Définit le délai d'attente (en ms) pour les opérations suivantes, y compris la méthode de connexion. Notez que le délai d'attente doit être défini avant d'appeler toute autre méthode après la création de l'objet.
connect
syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}
Tente de se connecter à un courtier stomp avec l'adaptateur RabbitMQ STOMP sur un hôte, le port sur lequel il écoute.
Si aucune des valeurs n'est fournie, des valeurs par défaut sont supposées :
- host: localhost
- port: 61613
- username: guest
- password: guest
- vhost: /
pool peut être donné pour être utilisé pour un nom personnalisé pour le pool de connexions utilisé.
send
syntax: rabbit:send(msg, headers)
Publie un message avec un ensemble d'en-têtes.
Certaines valeurs d'en-tête qui peuvent être définies :
destination: Destination du message, par exemple /exchange/name/binding
persistent: Pour livrer un message persistant, la valeur doit être "true" si déclarée
receipt: Réception pour une livraison confirmée
content-type: Type de message, par exemple application/json
Pour la liste des en-têtes pris en charge, voir la page des extensions et restrictions du protocole STOMP : https://www.rabbitmq.com/stomp.html
subscribe
syntax: rabbit:subscribe(headers)
S'abonne à une file d'attente en utilisant headers. Il doit avoir un id lorsque persistent est vrai. En cas d'abonnement réussi, des trames MESSAGE sont envoyées par le courtier.
unsubscribe
syntax: rabbit:unsubscribe(headers)
Se désabonne d'une file d'attente en utilisant headers. En cas de désabonnement réussi, les trames MESSAGE cesseront d'arriver du courtier.
receive
syntax: rabbit:receive())
Tente de lire toutes les trames MESSAGE reçues et retourne le message. Essayer de recevoir sans un abonnement valide entraînera des erreurs.
get_reused_times
syntax: times, err = rabbit:get_reused_times()
Cette méthode retourne le nombre de fois (avec succès) réutilisées pour la connexion actuelle. En cas d'erreur, elle retourne nil et une chaîne décrivant l'erreur.
Si la connexion actuelle ne provient pas du pool de connexions intégré, alors cette méthode retourne toujours 0, c'est-à-dire que la connexion n'a jamais été réutilisée (encore). Si la connexion provient du pool de connexions, alors la valeur de retour est toujours non nulle. Donc, cette méthode peut également être utilisée pour déterminer si la connexion actuelle provient du pool.
set_keepalive
syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)
Met la connexion RabbitMQ actuelle immédiatement dans le pool de connexions cosocket ngx_lua.
Vous pouvez spécifier le délai d'inactivité maximum (en ms) lorsque la connexion est dans le pool et la taille maximale du pool pour chaque processus de travail nginx.
En cas de succès, retourne 1. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
N'appelez cette méthode que là où vous auriez appelé la méthode close à la place. Appeler cette méthode mettra immédiatement l'objet redis actuel dans l'état fermé. Toute opération subséquente autre que connect() sur l'objet actuel retournera l'erreur fermée.
close
syntax: ok, err = rabbit:close()
Ferme la connexion RabbitMQ actuelle gracieusement en envoyant un DISCONNECT au courtier RabbitMQ STOMP et retourne le statut.
En cas de succès, retourne 1. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
Exemple
Un producteur simple qui peut envoyer un message persistant fiable à un échange avec quelques liaisons :
local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect {
host = "127.0.0.1",
port = 61613,
username = "guest",
password = "guest",
vhost = "/"
}
if not ok then
return
end
local strlen = string.len
local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(msg, headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Publié : " .. msg)
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not ok then
return
end
ngx.log(ngx.INFO, "Consommé : " .. data)
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
Voir Aussi
- Spécification STOMP 1.2
- La bibliothèque lua-resty-mysql
- Groupe Google Openresty
GitHub
Vous pouvez trouver des conseils de configuration supplémentaires et de la documentation pour ce module dans le dépôt GitHub pour nginx-module-rabbitmqstomp.