Zum Inhalt

rabbitmqstomp: Meinungsstarke Lua RabbitMQ-Clientbibliothek für nginx-module-lua-Apps basierend auf der Cosocket-API

Installation

Wenn Sie das RPM-Repository-Abonnement noch nicht eingerichtet haben, melden Sie sich an. Dann können Sie mit den folgenden Schritten fortfahren.

CentOS/RHEL 7 oder Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

Um diese Lua-Bibliothek mit NGINX zu verwenden, stellen Sie sicher, dass nginx-module-lua installiert ist.

Dieses Dokument beschreibt lua-resty-rabbitmqstomp v0.1, das am 01. Juni 2013 veröffentlicht wurde.


lua-resty-rabbitmqstomp - Lua RabbitMQ-Clientbibliothek, die die Cosocket-API für die Kommunikation über STOMP 1.2 mit einem RabbitMQ-Broker verwendet, der das STOMP-Plugin hat.

Einschränkungen

Diese Bibliothek ist meinungsstark und hat bestimmte Annahmen und Einschränkungen, die möglicherweise in Zukunft angesprochen werden;

  • Der RabbitMQ-Server sollte den STOMP-Adapter aktiviert haben, der STOMP v1.2 unterstützt
  • Annahme, dass Benutzer, vhost, Exchanges, Queues und Bindungen bereits eingerichtet sind

STOMP v1.2 Client-Implementierung

Diese Bibliothek verwendet STOMP 1.2 für die Kommunikation mit dem RabbitMQ-Broker und implementiert Erweiterungen und Einschränkungen des RabbitMQ STOMP-Plugins.

Intern verwendet RabbitMQ AMQP für die weitere Kommunikation. Auf diese Weise ermöglicht die Bibliothek die Implementierung von Verbrauchern und Produzenten, die über STOMP und AMQP mit dem RabbitMQ-Broker kommunizieren. Das Protokoll ist frame-basiert und hat einen Befehl, Header und einen Körper, der durch ein EOL (^@) terminiert wird, das aus \r (013) und dem erforderlichen \n (010) über einen TCP-Stream besteht:

COMMAND
header1:value1
header2: value2

BODY^@

COMMAND wird von EOL gefolgt, dann EOL-getrennte Header im key:value-Paarformat und dann eine leere Zeile, an der der BODY beginnt und der Frame durch ^@ EOL terminiert wird. COMMAND und Header sind UTF-8-kodiert.

Verbindung

Um eine Verbindung herzustellen, erstellen und senden wir einen CONNECT-Frame über einen TCP-Socket, der von der Cosocket-API bereitgestellt wird, und verbinden uns mit der Broker-IP, wobei sowohl IPv4 als auch IPv6 unterstützt werden. Im Frame verwenden wir login, passcode zur Authentifizierung, accept-version, um die Unterstützung der Client-STOMP-Version durchzusetzen, und host, um den VHOST des Brokers auszuwählen.

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

Im Fehlerfall wird ein ERROR-Frame zurückgegeben, zum Beispiel:

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

Bei erfolgreicher Verbindung erhalten wir einen CONNECTED-Frame vom Broker, zum Beispiel:

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

Für die Erstellung einer Verbindung sollten Benutzername, Passwort, vhost, Heartbeat, Broker-Host und Port angegeben werden.

Veröffentlichen

Wir können Nachrichten an ein Exchange mit einem Routing-Schlüssel, Persistenzmodus, Liefermodus und anderen Headern mit dem SEND-Befehl veröffentlichen:

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

hello^@

Beachten Sie, dass die content-length die Nachricht und das EOL-Byte umfasst.

Methoden

new

syntax: rabbit, err = rabbitmqstomp:new()

Erstellt ein RabbitMQ-Objekt. Im Falle von Fehlern gibt es nil und einen String zurück, der den Fehler beschreibt.

set_timeout

syntax: rabbit:set_timeout(time)

Setzt den Timeout (in ms) für nachfolgende Operationen, einschließlich der connect-Methode. Beachten Sie, dass der Timeout vor dem Aufruf einer anderen Methode nach der Erstellung des Objekts festgelegt werden sollte.

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

Versucht, sich mit einem STOMP-Broker über den RabbitMQ STOMP-Adapter auf einem Host zu verbinden, auf dem der Port lauscht.

Wenn keine der Werte angegeben ist, werden Standardwerte angenommen:

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool kann angegeben werden, um einen benutzerdefinierten Namen für den verwendeten Verbindungs-Pool zu verwenden.

send

syntax: rabbit:send(msg, headers)

Veröffentlicht eine Nachricht mit einer Reihe von Headern.

Einige Headerwerte, die gesetzt werden können:

destination: Ziel der Nachricht, zum Beispiel /exchange/name/binding persistent: Um eine persistente Nachricht zu liefern, sollte der Wert "true" sein, wenn deklariert receipt: Quittung für die bestätigte Lieferung content-type: Typ der Nachricht, zum Beispiel application/json

Für eine Liste der unterstützten Header siehe die STOMP-Protokollerweiterungen und Einschränkungsseite: https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

Abonnieren Sie eine Queue mit headers. Es sollte eine ID haben, wenn persistent true ist. Bei erfolgreichem Abonnement werden MESSAGE-Frames vom Broker gesendet.

unsubscribe

syntax: rabbit:unsubscribe(headers)

Meldet sich von einer Queue mit headers ab. Bei erfolgreicher Abmeldung werden keine MESSAGE-Frames mehr vom Broker empfangen.

receive

syntax: rabbit:receive())

Versucht, alle empfangenen MESSAGE-Frames zu lesen und gibt die Nachricht zurück. Der Versuch, ohne ein gültiges Abonnement zu empfangen, führt zu Fehlern.

get_reused_times

syntax: times, err = rabbit:get_reused_times()

Diese Methode gibt die (erfolgreich) wiederverwendeten Zeiten für die aktuelle Verbindung zurück. Im Fehlerfall gibt sie nil und einen String zurück, der den Fehler beschreibt.

Wenn die aktuelle Verbindung nicht aus dem integrierten Verbindungs-Pool stammt, gibt diese Methode immer 0 zurück, das heißt, die Verbindung wurde noch nie wiederverwendet. Wenn die Verbindung aus dem Verbindungs-Pool stammt, ist der Rückgabewert immer ungleich null. Diese Methode kann also auch verwendet werden, um festzustellen, ob die aktuelle Verbindung aus dem Pool stammt.

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

Bringt die aktuelle RabbitMQ-Verbindung sofort in den ngx_lua-Cosocket-Verbindungspool.

Sie können das maximale Leerlauf-Timeout (in ms) angeben, wenn die Verbindung im Pool ist, und die maximale Größe des Pools für jeden NGINX-Arbeitsprozess.

Im Erfolgsfall gibt es 1 zurück. Im Fehlerfall gibt es nil mit einem String zurück, der den Fehler beschreibt.

Rufen Sie diese Methode nur an der Stelle auf, an der Sie stattdessen die close-Methode aufgerufen hätten. Das Aufrufen dieser Methode versetzt das aktuelle Redis-Objekt sofort in den geschlossenen Zustand. Alle nachfolgenden Operationen, die nicht connect() auf dem aktuellen Objekt betreffen, geben den geschlossenen Fehler zurück.

close

syntax: ok, err = rabbit:close()

Schließt die aktuelle RabbitMQ-Verbindung ordnungsgemäß, indem ein DISCONNECT an den RabbitMQ STOMP-Broker gesendet wird, und gibt den Status zurück.

Im Erfolgsfall gibt es 1 zurück. Im Fehlerfall gibt es nil mit einem String zurück, der den Fehler beschreibt.

Beispiel

Ein einfacher Producer, der zuverlässige persistente Nachrichten an ein Exchange mit einer Bindung senden kann:

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Published: " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consumed: " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

Siehe auch

GitHub

Sie finden möglicherweise zusätzliche Konfigurationstipps und Dokumentationen für dieses Modul im GitHub-Repository für nginx-module-rabbitmqstomp.