rabbitmqstomp: Meinungsstarke Lua RabbitMQ-Clientbibliothek für nginx-module-lua-Apps basierend auf der Cosocket-API
Installation
Wenn Sie das RPM-Repository-Abonnement noch nicht eingerichtet haben, melden Sie sich an. Dann können Sie mit den folgenden Schritten fortfahren.
CentOS/RHEL 7 oder Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp
Um diese Lua-Bibliothek mit NGINX zu verwenden, stellen Sie sicher, dass nginx-module-lua installiert ist.
Dieses Dokument beschreibt lua-resty-rabbitmqstomp v0.1, das am 01. Juni 2013 veröffentlicht wurde.
lua-resty-rabbitmqstomp - Lua RabbitMQ-Clientbibliothek, die die Cosocket-API für die Kommunikation über STOMP 1.2 mit einem RabbitMQ-Broker verwendet, der das STOMP-Plugin hat.
Einschränkungen
Diese Bibliothek ist meinungsstark und hat bestimmte Annahmen und Einschränkungen, die möglicherweise in Zukunft angesprochen werden;
- Der RabbitMQ-Server sollte den STOMP-Adapter aktiviert haben, der STOMP v1.2 unterstützt
- Annahme, dass Benutzer, vhost, Exchanges, Queues und Bindungen bereits eingerichtet sind
STOMP v1.2 Client-Implementierung
Diese Bibliothek verwendet STOMP 1.2 für die Kommunikation mit dem RabbitMQ-Broker und implementiert Erweiterungen und Einschränkungen des RabbitMQ STOMP-Plugins.
Intern verwendet RabbitMQ AMQP für die weitere Kommunikation. Auf diese Weise ermöglicht die Bibliothek die Implementierung von Verbrauchern und Produzenten, die über STOMP und AMQP mit dem RabbitMQ-Broker kommunizieren. Das Protokoll ist frame-basiert und hat einen Befehl, Header und einen Körper, der durch ein EOL (^@) terminiert wird, das aus \r (013) und dem erforderlichen \n (010) über einen TCP-Stream besteht:
COMMAND
header1:value1
header2: value2
BODY^@
COMMAND wird von EOL gefolgt, dann EOL-getrennte Header im key:value-Paarformat und dann eine leere Zeile, an der der BODY beginnt und der Frame durch ^@ EOL terminiert wird. COMMAND und Header sind UTF-8-kodiert.
Verbindung
Um eine Verbindung herzustellen, erstellen und senden wir einen CONNECT-Frame über einen TCP-Socket, der von der Cosocket-API bereitgestellt wird, und verbinden uns mit der Broker-IP, wobei sowohl IPv4 als auch IPv6 unterstützt werden. Im Frame verwenden wir login, passcode zur Authentifizierung, accept-version, um die Unterstützung der Client-STOMP-Version durchzusetzen, und host, um den VHOST des Brokers auszuwählen.
CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional
^@
Im Fehlerfall wird ein ERROR-Frame zurückgegeben, zum Beispiel:
ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32
Access refused for user 'admin'^@
Bei erfolgreicher Verbindung erhalten wir einen CONNECTED-Frame vom Broker, zum Beispiel:
CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2
Für die Erstellung einer Verbindung sollten Benutzername, Passwort, vhost, Heartbeat, Broker-Host und Port angegeben werden.
Veröffentlichen
Wir können Nachrichten an ein Exchange mit einem Routing-Schlüssel, Persistenzmodus, Liefermodus und anderen Headern mit dem SEND-Befehl veröffentlichen:
SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5
hello^@
Beachten Sie, dass die content-length die Nachricht und das EOL-Byte umfasst.
Methoden
new
syntax: rabbit, err = rabbitmqstomp:new()
Erstellt ein RabbitMQ-Objekt. Im Falle von Fehlern gibt es nil und einen String zurück, der den Fehler beschreibt.
set_timeout
syntax: rabbit:set_timeout(time)
Setzt den Timeout (in ms) für nachfolgende Operationen, einschließlich der connect-Methode. Beachten Sie, dass der Timeout vor dem Aufruf einer anderen Methode nach der Erstellung des Objekts festgelegt werden sollte.
connect
syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}
Versucht, sich mit einem STOMP-Broker über den RabbitMQ STOMP-Adapter auf einem Host zu verbinden, auf dem der Port lauscht.
Wenn keine der Werte angegeben ist, werden Standardwerte angenommen:
- host: localhost
- port: 61613
- username: guest
- password: guest
- vhost: /
pool kann angegeben werden, um einen benutzerdefinierten Namen für den verwendeten Verbindungs-Pool zu verwenden.
send
syntax: rabbit:send(msg, headers)
Veröffentlicht eine Nachricht mit einer Reihe von Headern.
Einige Headerwerte, die gesetzt werden können:
destination: Ziel der Nachricht, zum Beispiel /exchange/name/binding
persistent: Um eine persistente Nachricht zu liefern, sollte der Wert "true" sein, wenn deklariert
receipt: Quittung für die bestätigte Lieferung
content-type: Typ der Nachricht, zum Beispiel application/json
Für eine Liste der unterstützten Header siehe die STOMP-Protokollerweiterungen und Einschränkungsseite: https://www.rabbitmq.com/stomp.html
subscribe
syntax: rabbit:subscribe(headers)
Abonnieren Sie eine Queue mit headers. Es sollte eine ID haben, wenn persistent true ist. Bei erfolgreichem Abonnement werden MESSAGE-Frames vom Broker gesendet.
unsubscribe
syntax: rabbit:unsubscribe(headers)
Meldet sich von einer Queue mit headers ab. Bei erfolgreicher Abmeldung werden keine MESSAGE-Frames mehr vom Broker empfangen.
receive
syntax: rabbit:receive())
Versucht, alle empfangenen MESSAGE-Frames zu lesen und gibt die Nachricht zurück. Der Versuch, ohne ein gültiges Abonnement zu empfangen, führt zu Fehlern.
get_reused_times
syntax: times, err = rabbit:get_reused_times()
Diese Methode gibt die (erfolgreich) wiederverwendeten Zeiten für die aktuelle Verbindung zurück. Im Fehlerfall gibt sie nil und einen String zurück, der den Fehler beschreibt.
Wenn die aktuelle Verbindung nicht aus dem integrierten Verbindungs-Pool stammt, gibt diese Methode immer 0 zurück, das heißt, die Verbindung wurde noch nie wiederverwendet. Wenn die Verbindung aus dem Verbindungs-Pool stammt, ist der Rückgabewert immer ungleich null. Diese Methode kann also auch verwendet werden, um festzustellen, ob die aktuelle Verbindung aus dem Pool stammt.
set_keepalive
syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)
Bringt die aktuelle RabbitMQ-Verbindung sofort in den ngx_lua-Cosocket-Verbindungspool.
Sie können das maximale Leerlauf-Timeout (in ms) angeben, wenn die Verbindung im Pool ist, und die maximale Größe des Pools für jeden NGINX-Arbeitsprozess.
Im Erfolgsfall gibt es 1 zurück. Im Fehlerfall gibt es nil mit einem String zurück, der den Fehler beschreibt.
Rufen Sie diese Methode nur an der Stelle auf, an der Sie stattdessen die close-Methode aufgerufen hätten. Das Aufrufen dieser Methode versetzt das aktuelle Redis-Objekt sofort in den geschlossenen Zustand. Alle nachfolgenden Operationen, die nicht connect() auf dem aktuellen Objekt betreffen, geben den geschlossenen Fehler zurück.
close
syntax: ok, err = rabbit:close()
Schließt die aktuelle RabbitMQ-Verbindung ordnungsgemäß, indem ein DISCONNECT an den RabbitMQ STOMP-Broker gesendet wird, und gibt den Status zurück.
Im Erfolgsfall gibt es 1 zurück. Im Fehlerfall gibt es nil mit einem String zurück, der den Fehler beschreibt.
Beispiel
Ein einfacher Producer, der zuverlässige persistente Nachrichten an ein Exchange mit einer Bindung senden kann:
local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
return
end
mq:set_timeout(10000)
local ok, err = mq:connect {
host = "127.0.0.1",
port = 61613,
username = "guest",
password = "guest",
vhost = "/"
}
if not ok then
return
end
local strlen = string.len
local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = mq:send(msg, headers)
if not ok then
return
end
ngx.log(ngx.INFO, "Published: " .. msg)
local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:subscribe(headers)
if not ok then
return
end
local data, err = mq:receive()
if not ok then
return
end
ngx.log(ngx.INFO, "Consumed: " .. data)
local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"
local ok, err = mq:unsubscribe(headers)
local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
return
end
Siehe auch
- STOMP 1.2 Spec
- Die lua-resty-mysql Bibliothek
- Openresty Google-Gruppe
GitHub
Sie finden möglicherweise zusätzliche Konfigurationstipps und Dokumentationen für dieses Modul im GitHub-Repository für nginx-module-rabbitmqstomp.