Перейти к содержанию

rabbitmqstomp: Модульная библиотека клиента RabbitMQ на Lua для приложений nginx-module-lua, основанная на API cosocket

Установка

Если вы еще не настроили подписку на репозиторий RPM, зарегистрируйтесь. Затем вы можете продолжить с следующими шагами.

CentOS/RHEL 7 или Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-rabbitmqstomp

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-rabbitmqstomp

Чтобы использовать эту библиотеку Lua с NGINX, убедитесь, что nginx-module-lua установлен.

Этот документ описывает lua-resty-rabbitmqstomp v0.1, выпущенную 1 июня 2013 года.


lua-resty-rabbitmqstomp - библиотека клиента RabbitMQ на Lua, которая использует API cosocket для связи по STOMP 1.2 с брокером RabbitMQ, у которого включен плагин STOMP.

Ограничения

Эта библиотека имеет определенные предположения и ограничения, которые могут быть устранены в будущем;

  • Сервер RabbitMQ должен иметь включенный адаптер STOMP, поддерживающий STOMP v1.2
  • Предположение, что пользователи, vhost, обмены, очереди и привязки уже настроены

Реализация клиента STOMP v1.2

Эта библиотека использует STOMP 1.2 для связи с брокером RabbitMQ и реализует расширения и ограничения плагина RabbitMQ Stomp.

Внутри RabbitMQ используется AMQP для дальнейшей связи. Таким образом, библиотека позволяет реализовывать потребителей и производителей, которые общаются с брокером RabbitMQ через STOMP, через AMQP. Протокол основан на кадрах и имеет команду, заголовки и тело, заканчивающееся EOL (^@), которое состоит из \r (013) и обязательного \n (010) в TCP потоке:

COMMAND
header1:value1
header2: value2

COMMAND следует за EOL, затем EOL-разделенные заголовки в формате ключ:значение и затем пустая строка, с которой начинается BODY, а кадр завершается ^@ EOL. COMMAND и заголовки закодированы в UTF-8.

Подключение

Чтобы подключиться, мы создаем и отправляем кадр CONNECT через TCP-сокет, предоставленный API cosocket, подключаясь к IP брокера, поддерживаются как IPv4, так и IPv6. В кадре мы используем логин, пароль для аутентификации, accept-version для обеспечения поддержки версии STOMP клиентом и host для выбора VHOST брокера.

CONNECT
accept-version:1.2
login:guest
passcode:guest
host:/devnode
heart-beat:optional

^@

В случае ошибки возвращается кадр ERROR, например:

ERROR
message:Bad CONNECT
content-type:text/plain
version:1.0,1.1,1.2
content-length:32

Access refused for user 'admin'^@

При успешном подключении брокер возвращает кадр CONNECTED, например:

CONNECTED
session:session-sGF0vjCKH1bLhFr6w9QwuQ
heart-beat:0,0
server:RabbitMQ/3.0.4
version:1.2

Для создания соединения необходимо предоставить имя пользователя, пароль, vhost, heartbeat, хост брокера и порт.

Публикация

Мы можем публиковать сообщения в обмен с ключом маршрутизации, режимом сохранения, режимом доставки и другими заголовками, используя команду SEND:

SEND
destination:/exchange/exchange_name/routing_key
app-id: luaresty
delivery-mode:2
persistent:true
content-type:json/application
content-length:5

hello^@

Обратите внимание, что content-length включает сообщение и байт EOL.

Методы

new

syntax: rabbit, err = rabbitmqstomp:new()

Создает объект RabbitMQ. В случае неудачи возвращает nil и строку, описывающую ошибку.

set_timeout

syntax: rabbit:set_timeout(time)

Устанавливает тайм-аут (в мс) для последующих операций, включая метод connect. Обратите внимание, что тайм-аут должен быть установлен перед вызовом любого другого метода после создания объекта.

connect

syntax: ok, err = red:connect{host=host, port=port, username=username, password=password, vhost=vhost}

Пытается подключиться к брокеру STOMP с адаптером RabbitMQ на указанном хосте и порту.

Если ни одно из значений не указано, предполагаются значения по умолчанию:

  • host: localhost
  • port: 61613
  • username: guest
  • password: guest
  • vhost: /

pool может быть указан для использования в качестве пользовательского имени для пула соединений.

send

syntax: rabbit:send(msg, headers)

Публикует сообщение с набором заголовков.

Некоторые значения заголовков, которые можно установить:

destination: Назначение сообщения, например /exchange/name/bindingpersistent: Для доставки постоянного сообщения значение должно быть "true", если оно объявленоreceipt: Квитанция для подтвержденной доставкиcontent-type`: Тип сообщения, например application/json

Для списка поддерживаемых заголовков смотрите страницу расширений и ограничений протокола STOMP: https://www.rabbitmq.com/stomp.html

subscribe

syntax: rabbit:subscribe(headers)

Подписывается на очередь, используя headers. Он должен иметь id, когда persistent равно true. При успешной подписке брокер отправляет кадры MESSAGE.

unsubscribe

syntax: rabbit:unsubscribe(headers)

Отписывается от очереди, используя headers. При успешной отписке кадры MESSAGE перестанут поступать от брокера.

receive

syntax: rabbit:receive())

Пытается прочитать любые полученные кадры MESSAGE и возвращает сообщение. Попытка получить данные без действительной подписки приведет к ошибкам.

get_reused_times

syntax: times, err = rabbit:get_reused_times()

Этот метод возвращает (успешно) повторно использованные разы для текущего соединения. В случае ошибки он возвращает nil и строку, описывающую ошибку.

Если текущее соединение не происходит из встроенного пула соединений, то этот метод всегда возвращает 0, то есть соединение никогда не было повторно использовано (пока). Если соединение происходит из пула соединений, то возвращаемое значение всегда будет ненулевым. Таким образом, этот метод также можно использовать для определения, происходит ли текущее соединение из пула.

set_keepalive

syntax: ok, err = rabbit:set_keepalive(max_idle_timeout, pool_size)

Сразу помещает текущее соединение RabbitMQ в пул соединений ngx_lua cosocket.

Вы можете указать максимальный тайм-аут простоя (в мс), когда соединение находится в пуле, и максимальный размер пула для каждого рабочего процесса nginx.

В случае успеха возвращает 1. В случае ошибок возвращает nil с строкой, описывающей ошибку.

Вызывайте этот метод только в том месте, где вы бы вызвали метод close. Вызов этого метода немедленно переведет текущий объект redis в закрытое состояние. Любые последующие операции, кроме connect() на текущем объекте, вернут ошибку закрытия.

close

syntax: ok, err = rabbit:close()

Корректно закрывает текущее соединение RabbitMQ, отправляя DISCONNECT брокеру RabbitMQ STOMP и возвращает статус.

В случае успеха возвращает 1. В случае ошибок возвращает nil с строкой, описывающей ошибку.

Пример

Простой производитель, который может отправлять надежные постоянные сообщения в обмен с некоторой привязкой:

local rabbitmq = require "resty.rabbitmqstomp"
local mq, err = rabbitmq:new()
if not mq then
      return
end

mq:set_timeout(10000)

local ok, err = mq:connect {
                    host = "127.0.0.1",
                    port = 61613,
                    username = "guest",
                    password = "guest",
                    vhost = "/"
                }
if not ok then
    return
end

local strlen =  string.len

local msg = "{'key': 'value'}"
local headers = {}
headers["destination"] = "/exchange/test/binding"
headers["receipt"] = "msg#1"
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"

local ok, err = mq:send(msg, headers)
if not ok then
    return
end
ngx.log(ngx.INFO, "Published: " .. msg)

local headers = {}
headers["destination"] = "/amq/queue/queuename"
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:subscribe(headers)
if not ok then
    return
end

local data, err = mq:receive()
if not ok then
    return
end
ngx.log(ngx.INFO, "Consumed: " .. data)

local headers = {}
headers["persistent"] = "true"
headers["id"] = "123"

local ok, err = mq:unsubscribe(headers)

local ok, err = mq:set_keepalive(10000, 10000)
if not ok then
    return
end

См. также

GitHub

Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-rabbitmqstomp.