pubsub: Драйвер клиента Lua Pubsub для nginx-module-lua на основе API cosocket
Установка
Если вы еще не подписались на репозиторий RPM, зарегистрируйтесь. Затем вы можете продолжить с следующими шагами.
CentOS/RHEL 7 или Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub
Чтобы использовать эту библиотеку Lua с NGINX, убедитесь, что установлен nginx-module-lua.
Этот документ описывает lua-resty-pubsub v1.5, выпущенный 13 ноября 2024 года.
Драйвер клиента Lua Pubsub для ngx_lua, основанный на API cosocket.
Описание
Эта библиотека Lua является драйвером клиента Pubsub для модуля ngx_lua nginx: http://wiki.nginx.org/HttpLuaModule
Эта библиотека Lua использует API cosocket ngx_lua, который обеспечивает 100% неблокирующее поведение. Эта библиотека отправляет сообщения (с атрибутами) в Google Cloud pubsub, используя таймеры nginx и http-запросы.
Обратите внимание, что требуется как минимум ngx_lua 0.9.3 или ngx_openresty 1.4.3.7, и, к сожалению, поддерживается только LuaJIT (--with-luajit).
Синопсис
server {
location = /publish {
resolver 8.8.8.8 ipv6=off;
content_by_lua_block {
local cjson = require "cjson"
local pubsub_producer = require "resty.pubsub.producer"
local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- Также можно предоставить другой словарь
-- Колбек, который будет получать сообщения, если они успешно отправлены
-- Типы сообщений и err - это таблица
local success_callback = function (topic, err, messages)
ngx.log(ngx.INFO, "Сообщения: ", cjson.encode(messages), " успешно отправлены в тему: ", topic)
end
-- Колбек, который будет получать сообщения, если отправка не удалась
-- Типы сообщений и err - это таблица
local error_callback = function (topic, err, messages)
for _, message in ipairs(messages) do
ngx.log(ngx.ERR, "Не удалось отправить сообщение: ", cjson.encode(message), " с ошибкой: ", cjson.encode(err))
end
end
local publish = function()
-- Конфигурация Pubsub Producer
local pubsub_config = {
project_id = "demo-project",
topic = "demo-topic",
pubsub_base_domain = "pubsub.googleapis.com",
pubsub_base_port = 443,
is_emulator = false,
producer_config = {
max_batch_size = 200, -- количество пакетов
max_buffering = 5000, -- максимальное количество пакетов в буфере
timer_interval = 10000, -- в миллисекундах
last_flush_interval = 5000, -- в миллисекундах
http_timeout = 6000, -- в миллисекундах
keepalive_max_idle_timeout = 2000, -- в миллисекундах
keepalive_pool_size = 50
},
oauth_config = {
service_account_key_path = "/etc/key.json", -- Замените это на путь к вашему ключу
oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
oauth_scopes = {
"https://www.googleapis.com/auth/pubsub"
},
oauth_token_dict = OAUTH_TOKEN
},
success_callback = success_callback,
error_callback = error_callback
}
-- Создание объекта producer
-- Независимо от того, сколько раз вы вызываете new, экземпляр producer будет создан только один раз для каждой темы на каждый рабочий процесс
local producer, err = pubsub_producer:new(pubsub_config)
-- Также проверьте, есть ли ошибка при инициализации producer
if err ~= nil then
ngx.log(ngx.ERR, "Не удалось создать pubsub producer ", err)
return
end
-- Наконец, отправьте сообщение с атрибутами.
local ok, send_err = producer:send("Некоторый случайный текст", {
attr1 = "Test1",
attr2 = "Test2"
}, "optional_ordering_key")
-- Также проверьте, есть ли ошибка при отправке сообщения
if send_err ~= nil then
ngx.log(ngx.ERR, "Не удалось отправить данные в pubsub: ", send_err)
return
end
end
-- Опубликовать сообщение
publish()
}
}
}
Конфигурации
Конфигурации Producer
| Свойство | Тип данных | Описание | Значение по умолчанию |
|---|---|---|---|
| project_id | string | Указывает идентификатор проекта в виде строки вашего проекта Pub/Sub | none (Обязательно) |
| topic | string | Указывает тему, в которую необходимо отправить данные | none (Обязательно) |
| pubsub_base_domain | string | Указывает базовый домен, через который устанавливается http-соединение. | pubsub.googleapis.com |
| pubsub_base_port | number | Указывает порт базового домена, через который устанавливается http-соединение. | 443 |
| is_emulator | boolean | Указывает логическое значение. true, если вы общаетесь с эмулятором. | false |
| producer_config.max_batch_size | number | Указывает максимальный размер партии, который будет отправлен в pubsub. | 200 |
| producer_config.max_buffering | number | Указывает максимальный размер буфера, который будет хранить данные в течение определенного времени. | 5000 |
| producer_config.timer_interval | number (milliseconds) | Указывает временной интервал, в течение которого проверяются устаревшие сообщения в буфере для публикации. | 10000 |
| producer_config.last_flush_interval | number (milliseconds) | Указывает максимальный интервал между последним временем сброса и текущим временем. Используется, когда пакеты в буфере меньше, чем размер партии, в течение длительного времени. | 10000 |
| producer_config.http_timeout | number (milliseconds) | Устанавливает защиту тайм-аута для последующих операций, включая метод connect. | 5000 |
| producer_config.keepalive_max_idle_timeout | number (milliseconds) | Используется в httpc:set_keepalive, который пытается поместить текущее соединение в пул соединений ngx_lua cosocket. | 2000 |
| producer_config.keepalive_pool_size | number | Используется в httpc:set_keepalive, который пытается поместить текущее соединение в пул соединений ngx_lua cosocket. | 50 |
| oauth_config.service_account_key_path | string | Указывает путь к ключу сервисного аккаунта, который используется для аутентификации в проекте pub/sub. | none (Обязательно) |
| oauth_config.oauth_base_uri | string | Указывает базовый uri, к которому отправляется запрос на сервер авторизации Google для получения токена, который будет использоваться в последующих запросах. | https://www.googleapis.com/oauth2/v4/token |
| oauth_config.oauth_scopes | list of string | Указывает таблицу, состоящую из областей OAuth 2.0, которые вам могут понадобиться для доступа к API Google, в зависимости от уровня доступа, который вам нужен. | {"https://www.googleapis.com/auth/pubsub"} |
| oauth_config.oauth_token_dict | lua_shared_dict | Указывает общую область памяти между рабочими процессами, чтобы служить хранилищем для oauth токена. | ngx.shared.OAUTH_TOKEN |
| success_handler | function | Это колбек-функция, которая будет предоставлена со всеми сообщениями с их атрибутами, которые были успешно отправлены в pubsub. | none (Необязательно) |
| error_handler | function | Это колбек-функция, которая будет выполнена, когда пакет не удастся. | none (Необязательно) |
Модули
resty.pubsub.producer
Чтобы загрузить этот модуль, просто выполните следующее
local producer = require "resty.pubsub.producer"
Методы
new
синтаксис: local p, err = producer:new(pubsub_config)
send
синтаксис: p:send(message, attributes[, ordering_key])
- Требуется сообщение типа string, атрибуты типа table и необязательный ordering_key типа string
См. также
- модуль ngx_lua: http://wiki.nginx.org/HttpLuaModule
GitHub
Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-pubsub.