Перейти к содержанию

pubsub: Драйвер клиента Lua Pubsub для nginx-module-lua на основе API cosocket

Установка

Если вы еще не подписались на репозиторий RPM, зарегистрируйтесь. Затем вы можете продолжить с следующими шагами.

CentOS/RHEL 7 или Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub

Чтобы использовать эту библиотеку Lua с NGINX, убедитесь, что установлен nginx-module-lua.

Этот документ описывает lua-resty-pubsub v1.5, выпущенный 13 ноября 2024 года.


Драйвер клиента Lua Pubsub для ngx_lua, основанный на API cosocket.

lua module lua module License

Описание

Эта библиотека Lua является драйвером клиента Pubsub для модуля ngx_lua nginx: http://wiki.nginx.org/HttpLuaModule

Эта библиотека Lua использует API cosocket ngx_lua, который обеспечивает 100% неблокирующее поведение. Эта библиотека отправляет сообщения (с атрибутами) в Google Cloud pubsub, используя таймеры nginx и http-запросы.

Обратите внимание, что требуется как минимум ngx_lua 0.9.3 или ngx_openresty 1.4.3.7, и, к сожалению, поддерживается только LuaJIT (--with-luajit).

Синопсис

    server {
        location = /publish {
            resolver 8.8.8.8 ipv6=off;

            content_by_lua_block {
                local cjson = require "cjson"
                local pubsub_producer = require "resty.pubsub.producer"
                local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- Также можно предоставить другой словарь

                -- Колбек, который будет получать сообщения, если они успешно отправлены
                -- Типы сообщений и err - это таблица
                local success_callback = function (topic, err, messages)
                    ngx.log(ngx.INFO, "Сообщения: ", cjson.encode(messages), " успешно отправлены в тему: ", topic)
                end

                -- Колбек, который будет получать сообщения, если отправка не удалась
                -- Типы сообщений и err - это таблица
                local error_callback = function (topic, err, messages)
                    for _, message in ipairs(messages) do
                        ngx.log(ngx.ERR, "Не удалось отправить сообщение: ", cjson.encode(message), " с ошибкой: ", cjson.encode(err))
                    end
                end

                local publish = function()

                    -- Конфигурация Pubsub Producer
                    local pubsub_config = {
                        project_id = "demo-project",
                        topic = "demo-topic",
                        pubsub_base_domain = "pubsub.googleapis.com",
                        pubsub_base_port = 443,
                        is_emulator = false,
                        producer_config = {
                            max_batch_size = 200, -- количество пакетов
                            max_buffering = 5000, -- максимальное количество пакетов в буфере
                            timer_interval = 10000, -- в миллисекундах
                            last_flush_interval = 5000, -- в миллисекундах
                            http_timeout = 6000, -- в миллисекундах
                            keepalive_max_idle_timeout = 2000, -- в миллисекундах
                            keepalive_pool_size = 50
                        },
                        oauth_config = {
                            service_account_key_path = "/etc/key.json", -- Замените это на путь к вашему ключу
                            oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
                            oauth_scopes = {
                                "https://www.googleapis.com/auth/pubsub"
                            },
                            oauth_token_dict = OAUTH_TOKEN
                        },
                        success_callback = success_callback,
                        error_callback = error_callback
                    }

                    -- Создание объекта producer
                    -- Независимо от того, сколько раз вы вызываете new, экземпляр producer будет создан только один раз для каждой темы на каждый рабочий процесс
                    local producer, err = pubsub_producer:new(pubsub_config)

                    -- Также проверьте, есть ли ошибка при инициализации producer
                    if err ~= nil then
                        ngx.log(ngx.ERR, "Не удалось создать pubsub producer ", err)
                        return
                    end

                    -- Наконец, отправьте сообщение с атрибутами.
                    local ok, send_err = producer:send("Некоторый случайный текст", {
                        attr1 = "Test1",
                        attr2 = "Test2"
                    }, "optional_ordering_key")

                    -- Также проверьте, есть ли ошибка при отправке сообщения
                    if send_err ~= nil then
                        ngx.log(ngx.ERR, "Не удалось отправить данные в pubsub: ", send_err)
                        return
                    end

                end

                -- Опубликовать сообщение
                publish()
            }

        }
    }

Конфигурации

Конфигурации Producer

Свойство Тип данных Описание Значение по умолчанию
project_id string Указывает идентификатор проекта в виде строки вашего проекта Pub/Sub none (Обязательно)
topic string Указывает тему, в которую необходимо отправить данные none (Обязательно)
pubsub_base_domain string Указывает базовый домен, через который устанавливается http-соединение. pubsub.googleapis.com
pubsub_base_port number Указывает порт базового домена, через который устанавливается http-соединение. 443
is_emulator boolean Указывает логическое значение. true, если вы общаетесь с эмулятором. false
producer_config.max_batch_size number Указывает максимальный размер партии, который будет отправлен в pubsub. 200
producer_config.max_buffering number Указывает максимальный размер буфера, который будет хранить данные в течение определенного времени. 5000
producer_config.timer_interval number (milliseconds) Указывает временной интервал, в течение которого проверяются устаревшие сообщения в буфере для публикации. 10000
producer_config.last_flush_interval number (milliseconds) Указывает максимальный интервал между последним временем сброса и текущим временем. Используется, когда пакеты в буфере меньше, чем размер партии, в течение длительного времени. 10000
producer_config.http_timeout number (milliseconds) Устанавливает защиту тайм-аута для последующих операций, включая метод connect. 5000
producer_config.keepalive_max_idle_timeout number (milliseconds) Используется в httpc:set_keepalive, который пытается поместить текущее соединение в пул соединений ngx_lua cosocket. 2000
producer_config.keepalive_pool_size number Используется в httpc:set_keepalive, который пытается поместить текущее соединение в пул соединений ngx_lua cosocket. 50
oauth_config.service_account_key_path string Указывает путь к ключу сервисного аккаунта, который используется для аутентификации в проекте pub/sub. none (Обязательно)
oauth_config.oauth_base_uri string Указывает базовый uri, к которому отправляется запрос на сервер авторизации Google для получения токена, который будет использоваться в последующих запросах. https://www.googleapis.com/oauth2/v4/token
oauth_config.oauth_scopes list of string Указывает таблицу, состоящую из областей OAuth 2.0, которые вам могут понадобиться для доступа к API Google, в зависимости от уровня доступа, который вам нужен. {"https://www.googleapis.com/auth/pubsub"}
oauth_config.oauth_token_dict lua_shared_dict Указывает общую область памяти между рабочими процессами, чтобы служить хранилищем для oauth токена. ngx.shared.OAUTH_TOKEN
success_handler function Это колбек-функция, которая будет предоставлена со всеми сообщениями с их атрибутами, которые были успешно отправлены в pubsub. none (Необязательно)
error_handler function Это колбек-функция, которая будет выполнена, когда пакет не удастся. none (Необязательно)

Модули

resty.pubsub.producer

Чтобы загрузить этот модуль, просто выполните следующее

    local producer = require "resty.pubsub.producer"

Методы

new

синтаксис: local p, err = producer:new(pubsub_config)

send

синтаксис: p:send(message, attributes[, ordering_key])

  • Требуется сообщение типа string, атрибуты типа table и необязательный ordering_key типа string

См. также

GitHub

Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-pubsub.