Aller au contenu

rabbitmqstomp: Bibliothèque cliente Lua RabbitMQ opinionnée pour les applications nginx-module-lua basée sur l'API cosocket

Installation

Si vous n'avez pas encore configuré l'abonnement au dépôt RPM, inscrivez-vous. Ensuite, vous pouvez procéder avec les étapes suivantes.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

Pour utiliser cette bibliothèque Lua avec NGINX, assurez-vous que nginx-module-lua est installé.

Ce document décrit lua-resty-rabbitmqstomp v0.1 publié le 01 juin 2013.


lua-resty-rabbitmqstomp - Bibliothèque cliente Lua RabbitMQ qui utilise l'API cosocket pour la communication via STOMP 1.2 avec un courtier RabbitMQ qui a le plugin STOMP.

Limitations

Cette bibliothèque est opinionnée et a certaines hypothèses et limitations qui peuvent être abordées dans le futur ;

  • Le serveur RabbitMQ doit avoir l'adaptateur STOMP activé qui prend en charge STOMP v1.2
  • Hypothèse que les utilisateurs, vhost, échanges, files d'attente et liaisons sont déjà configurés

Implémentation du client STOMP v1.2

Cette bibliothèque utilise STOMP 1.2 pour la communication avec le courtier RabbitMQ et implémente les extensions et restrictions du plugin Stomp de RabbitMQ.

En interne, RabbitMQ utilise AMQP pour communiquer davantage. De cette manière, la bibliothèque permet l'implémentation de consommateurs et de producteurs qui communiquent avec le courtier RabbitMQ via STOMP, via AMQP. Le protocole est basé sur des trames et a une commande, des en-têtes et un corps terminés par un EOL (^@) qui consiste en \r (013) et le \n (010) requis sur un flux TCP :

COMMAND
header1:value1
header2: value2

BODY^@

COMMAND est suivi d'un EOL, puis des en-têtes séparés par EOL au format clé:valeur et ensuite une ligne vide où commence le BODY et la trame est terminée par ^@ EOL. COMMAND et les en-têtes sont encodés en UTF-8.

Connexion

Pour se connecter, nous créons et envoyons une trame CONNECT via un socket TCP fourni par l'API cosocket se connectant à l'IP du courtier, les deux IPv4 et IPv6 sont pris en charge. Dans la trame, nous utilisons login, passcode pour l'authentification, accept-version pour imposer la prise en charge de la version STOMP du client et host pour sélectionner le VHOST du courtier.

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

En cas d'erreur, une trame ERROR est renvoyée par exemple :

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

En cas de connexion réussie, une trame CONNECTED est renvoyée par le courtier, par exemple :

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

Pour créer une connexion, le nom d'utilisateur, le mot de passe, le vhost, le heartbeat, l'hôte du courtier et le port doivent être fournis.

Publication

Nous pouvons publier des messages dans un échange avec une clé de routage, un mode de persistance, un mode de livraison et d'autres en-têtes en utilisant la commande SEND :

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

hello^@

Notez que le content-length inclut le message et le byte EOL.

Méthodes

new

syntax: rabbit, err = rabbitmqstomp:new()

Crée un objet RabbitMQ. En cas d'échec, retourne nil et une chaîne décrivant l'erreur.

set_timeout

syntax: rabbit:set_timeout(time)

Définit le délai d'attente (en ms) pour les opérations suivantes, y compris la méthode de connexion. Notez que le délai d'attente doit être défini avant d'appeler toute autre méthode après la création de l'objet.

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

Tente de se connecter à un courtier stomp avec l'adaptateur RabbitMQ STOMP sur un hôte, le port sur lequel il écoute.

Si aucune des valeurs n'est fournie, des valeurs par défaut sont supposées :

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool peut être donné pour être utilisé pour un nom personnalisé pour le pool de connexions utilisé.

send

syntax: rabbit:send(msg, headers)

Publie un message avec un ensemble d'en-têtes.

Certaines valeurs d'en-tête qui peuvent être définies :

destination: Destination du message, par exemple /exchange/name/binding persistent: Pour livrer un message persistant, la valeur doit être "true" si déclarée receipt: Réception pour une livraison confirmée content-type: Type de message, par exemple application/json

Pour la liste des en-têtes pris en charge, voir la page des extensions et restrictions du protocole STOMP : https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

S'abonne à une file d'attente en utilisant headers. Il doit avoir un id lorsque persistent est vrai. En cas d'abonnement réussi, des trames MESSAGE sont envoyées par le courtier.

unsubscribe

syntax: rabbit:unsubscribe(headers)

Se désabonne d'une file d'attente en utilisant headers. En cas de désabonnement réussi, les trames MESSAGE cesseront d'arriver du courtier.

receive

syntax: rabbit:receive())

Tente de lire toutes les trames MESSAGE reçues et retourne le message. Essayer de recevoir sans un abonnement valide entraînera des erreurs.

get_reused_times

syntax: times, err = rabbit:get_reused_times()

Cette méthode retourne le nombre de fois (avec succès) réutilisées pour la connexion actuelle. En cas d'erreur, elle retourne nil et une chaîne décrivant l'erreur.

Si la connexion actuelle ne provient pas du pool de connexions intégré, alors cette méthode retourne toujours 0, c'est-à-dire que la connexion n'a jamais été réutilisée (encore). Si la connexion provient du pool de connexions, alors la valeur de retour est toujours non nulle. Donc, cette méthode peut également être utilisée pour déterminer si la connexion actuelle provient du pool.

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

Met la connexion RabbitMQ actuelle immédiatement dans le pool de connexions cosocket ngx_lua.

Vous pouvez spécifier le délai d'inactivité maximum (en ms) lorsque la connexion est dans le pool et la taille maximale du pool pour chaque processus de travail nginx.

En cas de succès, retourne 1. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

N'appelez cette méthode que là où vous auriez appelé la méthode close à la place. Appeler cette méthode mettra immédiatement l'objet redis actuel dans l'état fermé. Toute opération subséquente autre que connect() sur l'objet actuel retournera l'erreur fermée.

close

syntax: ok, err = rabbit:close()

Ferme la connexion RabbitMQ actuelle gracieusement en envoyant un DISCONNECT au courtier RabbitMQ STOMP et retourne le statut.

En cas de succès, retourne 1. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

Exemple

Un producteur simple qui peut envoyer un message persistant fiable à un échange avec quelques liaisons :

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Publié : " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consommé : " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

Voir Aussi

GitHub

Vous pouvez trouver des conseils de configuration supplémentaires et de la documentation pour ce module dans le dépôt GitHub pour nginx-module-rabbitmqstomp.