跳转至

pubsub: 基于 cosocket API 的 NGINX 模块 Lua Pubsub 客户端驱动

安装

如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。

CentOS/RHEL 7 或 Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub

CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub

要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua

本文档描述了 lua-resty-pubsub v1.5,发布于 2024 年 11 月 13 日。


基于 cosocket API 的 ngx_lua 的 Lua Pubsub 客户端驱动。

lua module lua module License

描述

此 Lua 库是 ngx_lua NGINX 模块的 Pubsub 客户端驱动: http://wiki.nginx.org/HttpLuaModule

此 Lua 库利用 ngx_lua 的 cosocket API,确保 100% 非阻塞行为。该库使用 NGINX 定时器和 HTTP 请求将消息(带属性)推送到 Google Cloud pubsub。

请注意,至少需要 ngx_lua 0.9.3ngx_openresty 1.4.3.7,并且不幸的是,仅支持 LuaJIT (--with-luajit)。

概要

    server {
        location = /publish {
            resolver 8.8.8.8 ipv6=off;

            content_by_lua_block {
                local cjson = require "cjson"
                local pubsub_producer = require "resty.pubsub.producer"
                local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- 也可以提供不同的字典

                -- 如果消息成功发送,将接收消息的回调
                -- 消息和 err 的类型是一个表
                local success_callback = function (topic, err, messages)
                    ngx.log(ngx.INFO, "Messages: ", cjson.encode(messages), " successfully pushed to topic: ", topic)
                end

                -- 如果消息发送失败,将接收消息的回调
                -- 消息和 err 的类型是一个表
                local error_callback = function (topic, err, messages)
                    for _, message in ipairs(messages) do
                        ngx.log(ngx.ERR, "Failed to send Message : ", cjson.encode(message), " with err: ", cjson.encode(err))
                    end
                end

                local publish = function()

                    -- Pubsub 生产者配置
                    local pubsub_config = {
                        project_id = "demo-project",
                        topic = "demo-topic",
                        pubsub_base_domain = "pubsub.googleapis.com",
                        pubsub_base_port = 443,
                        is_emulator = false,
                        producer_config = {
                            max_batch_size = 200, -- 数据包数量
                            max_buffering = 5000, -- 缓冲区中最大数据包数量
                            timer_interval = 10000, -- 毫秒
                            last_flush_interval = 5000, -- 毫秒
                            http_timeout = 6000, -- 毫秒
                            keepalive_max_idle_timeout = 2000, -- 毫秒
                            keepalive_pool_size = 50
                        },
                        oauth_config = {
                            service_account_key_path = "/etc/key.json", -- 请替换为您自己的密钥路径
                            oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
                            oauth_scopes = {
                                "https://www.googleapis.com/auth/pubsub"
                            },
                            oauth_token_dict = OAUTH_TOKEN
                        },
                        success_callback = success_callback,
                        error_callback = error_callback
                    }

                    -- 创建生产者对象
                    -- 无论您调用 new 几次,生产者实例始终会为每个主题每个工作进程生成一次
                    local producer, err = pubsub_producer:new(pubsub_config)

                    -- 还要检查初始化生产者时是否有任何错误
                    if err ~= nil then
                        ngx.log(ngx.ERR, "Unable to create pubsub producer ", err)
                        return
                    end

                    -- 最后发送带属性的消息。
                    local ok, send_err = producer:send("Some Random Text", {
                        attr1 = "Test1",
                        attr2 = "Test2"
                    }, "optional_ordering_key")

                    -- 还要检查发送消息时是否有任何错误
                    if send_err ~= nil then
                        ngx.log(ngx.ERR, "Unable to send data to pubsub: ", send_err)
                        return
                    end

                end

                -- 发布消息
                publish()
            }

        }
    }

配置

生产者配置

属性 数据类型 描述 默认值
project_id string 指定您的 Pub/Sub 项目的项目 ID 字符串 none (必填)
topic string 指定需要发送数据的主题 none (必填)
pubsub_base_domain string 指定通过其建立 HTTP 连接的基础域。 pubsub.googleapis.com
pubsub_base_port number 指定通过其建立 HTTP 连接的基础域端口。 443
is_emulator boolean 指定布尔值。如果您正在与之通信,则为 true。 false
producer_config.max_batch_size number 指定将推送到 pubsub 的最大批量大小。 200
producer_config.max_buffering number 指定将保持数据的缓冲区的最大大小,持续特定时间。 5000
producer_config.timer_interval number (milliseconds) 指定检查缓冲区中陈旧消息以进行发布的时间间隔。 10000
producer_config.last_flush_interval number (milliseconds) 指定最后一次刷新时间和当前时间之间的最大间隔。当缓冲区中的数据包少于批量大小时,长时间使用。 10000
producer_config.http_timeout number (milliseconds) 设置后续操作的超时保护,包括连接方法。 5000
producer_config.keepalive_max_idle_timeout number (milliseconds) 用于 httpc:set_keepalive,尝试将当前连接放入 ngx_lua cosocket 连接池。 2000
producer_config.keepalive_pool_size number 用于 httpc:set_keepalive,尝试将当前连接放入 ngx_lua cosocket 连接池。 50
oauth_config.service_account_key_path string 指定用于对 pub/sub 项目进行身份验证的服务帐户密钥的路径。 none (必填)
oauth_config.oauth_base_uri string 指定请求发送到 Google 授权服务器以获取将在后续请求中使用的令牌的基础 URI。 https://www.googleapis.com/oauth2/v4/token
oauth_config.oauth_scopes list of string 指定一个表,包含您可能需要请求以访问 Google API 的 OAuth 2.0 范围,具体取决于您需要的访问级别。 {"https://www.googleapis.com/auth/pubsub"}
oauth_config.oauth_token_dict lua_shared_dict 指定一个跨工作进程的共享内存区域,用作 oauth 令牌的存储。 ngx.shared.OAUTH_TOKEN
success_handler function 这是一个回调函数,将提供所有成功推送到 pubsub 的消息及其属性。 none (可选)
error_handler function 这是一个回调函数,当批处理失败时将执行。 none (可选)

模块

resty.pubsub.producer

要加载此模块,只需执行以下操作

    local producer = require "resty.pubsub.producer"

方法

new

语法: local p, err = producer:new(pubsub_config)

send

语法: p:send(message, attributes[, ordering_key])

  • 需要一个类型为字符串的消息,一个类型为表的属性和一个可选的类型为字符串的 ordering_key

另见

GitHub

您可以在 nginx-module-pubsub 的 GitHub 仓库 中找到此模块的其他配置提示和文档。