Saltar a contenido

pubsub: Controlador de cliente Lua Pubsub para nginx-module-lua basado en la API de cosocket

Instalación

Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.

CentOS/RHEL 7 o Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub

Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.

Este documento describe lua-resty-pubsub v1.5 lanzado el 13 de noviembre de 2024.


Controlador de cliente Lua Pubsub para el ngx_lua basado en la API de cosocket.

lua module lua module License

Descripción

Esta biblioteca Lua es un controlador de cliente Pubsub para el módulo nginx ngx_lua: http://wiki.nginx.org/HttpLuaModule

Esta biblioteca Lua aprovecha la API de cosocket de ngx_lua, que garantiza un comportamiento 100% no bloqueante. Esta biblioteca envía mensajes (con atributos) a Google Cloud pubsub utilizando temporizadores de nginx y solicitudes http.

Ten en cuenta que se requiere al menos ngx_lua 0.9.3 o ngx_openresty 1.4.3.7, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).

Sinopsis

    server {
        location = /publish {
            resolver 8.8.8.8 ipv6=off;

            content_by_lua_block {
                local cjson = require "cjson"
                local pubsub_producer = require "resty.pubsub.producer"
                local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- También se puede proporcionar un diccionario diferente

                -- Un callback que recibirá mensajes si se envían con éxito
                -- Los tipos de mensajes y err son una tabla
                local success_callback = function (topic, err, messages)
                    ngx.log(ngx.INFO, "Mensajes: ", cjson.encode(messages), " enviados con éxito al tema: ", topic)
                end

                -- Un callback que recibirá mensajes si falla el envío
                -- Los tipos de mensajes y err son una tabla
                local error_callback = function (topic, err, messages)
                    for _, message in ipairs(messages) do
                        ngx.log(ngx.ERR, "Error al enviar el mensaje: ", cjson.encode(message), " con error: ", cjson.encode(err))
                    end
                end

                local publish = function()

                    -- Configuración del Productor Pubsub
                    local pubsub_config = {
                        project_id = "demo-project",
                        topic = "demo-topic",
                        pubsub_base_domain = "pubsub.googleapis.com",
                        pubsub_base_port = 443,
                        is_emulator = false,
                        producer_config = {
                            max_batch_size = 200, -- número de paquetes
                            max_buffering = 5000, -- número máximo de paquetes en el búfer
                            timer_interval = 10000, -- en milisegundos
                            last_flush_interval = 5000, -- en milisegundos
                            http_timeout = 6000, -- en milisegundos
                            keepalive_max_idle_timeout = 2000, -- en milisegundos
                            keepalive_pool_size = 50
                        },
                        oauth_config = {
                            service_account_key_path = "/etc/key.json", -- Reemplaza esto con tu propia ruta de clave
                            oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
                            oauth_scopes = {
                                "https://www.googleapis.com/auth/pubsub"
                            },
                            oauth_token_dict = OAUTH_TOKEN
                        },
                        success_callback = success_callback,
                        error_callback = error_callback
                    }

                    -- Crear el objeto productor
                    -- No importa cuántas veces llames a new, la instancia del productor siempre se generará una vez por tema por proceso de trabajo
                    local producer, err = pubsub_producer:new(pubsub_config)

                    -- También verifica si hay algún error al inicializar el productor
                    if err ~= nil then
                        ngx.log(ngx.ERR, "No se puede crear el productor pubsub ", err)
                        return
                    end

                    -- Finalmente, envía el mensaje con atributos.
                    local ok, send_err = producer:send("Some Random Text", {
                        attr1 = "Test1",
                        attr2 = "Test2"
                    }, "optional_ordering_key")

                    -- También verifica si hay algún error al enviar el mensaje
                    if send_err ~= nil then
                        ngx.log(ngx.ERR, "No se puede enviar datos a pubsub: ", send_err)
                        return
                    end

                end

                -- Publicar Mensaje
                publish()
            }

        }
    }

Configuraciones

Configuraciones del Productor

Propiedad Tipo de Dato Descripción Valor por Defecto
project_id string Especifica el id del proyecto como una cadena de tu proyecto Pub/Sub ninguno (Requerido)
topic string Especifica el tema en el que se deben enviar los datos ninguno (Requerido)
pubsub_base_domain string Especifica el dominio base a través del cual se realiza la conexión http. pubsub.googleapis.com
pubsub_base_port number Especifica el puerto del dominio base a través del cual se realiza la conexión http. 443
is_emulator boolean Especifica un valor booleano. verdadero si estás comunicándote con. falso
producer_config.max_batch_size number Especifica el tamaño máximo del lote que se enviará a pubsub. 200
producer_config.max_buffering number Especifica el tamaño máximo del búfer que contendrá los datos durante un período específico de tiempo. 5000
producer_config.timer_interval number (milisegundos) Especifica el intervalo de tiempo en el que se revisan los mensajes obsoletos en el búfer para su publicación. 10000
producer_config.last_flush_interval number (milisegundos) Especifica el intervalo máximo entre el último tiempo de vaciado y el tiempo actual. Se utiliza cuando los paquetes en el búfer son menos que el tamaño del lote durante un período de tiempo más largo. 10000
producer_config.http_timeout number (milisegundos) Establece la protección de tiempo de espera para operaciones posteriores, incluido el método connect. 5000
producer_config.keepalive_max_idle_timeout number (milisegundos) Se utiliza en httpc:set_keepalive que intenta poner la conexión actual en el grupo de conexiones de cosocket de ngx_lua. 2000
producer_config.keepalive_pool_size number Se utiliza en httpc:set_keepalive que intenta poner la conexión actual en el grupo de conexiones de cosocket de ngx_lua. 50
oauth_config.service_account_key_path string Especifica la ruta para la clave de cuenta de servicio que se utiliza para autenticarse en el proyecto pub/sub. ninguno (Requerido)
oauth_config.oauth_base_uri string Especifica la URI base a la que se realiza la solicitud al servidor de autorización de Google para un token que se utilizará en solicitudes posteriores. https://www.googleapis.com/oauth2/v4/token
oauth_config.oauth_scopes lista de string Especifica una tabla que comprende los alcances de OAuth 2.0 que podrías necesitar solicitar para acceder a las API de Google, dependiendo del nivel de acceso que necesites. {"https://www.googleapis.com/auth/pubsub"}
oauth_config.oauth_token_dict lua_shared_dict Especifica una zona de memoria compartida entre trabajadores, para servir como almacenamiento para el token oauth. ngx.shared.OAUTH_TOKEN
success_handler function Esta es una función de callback que se proporcionará con todos los mensajes y sus atributos que se envían con éxito a pubsub. ninguno (Opcional)
error_handler function Esta es una función de callback que se ejecutará cuando falle un lote. ninguno (Opcional)

Módulos

resty.pubsub.producer

Para cargar este módulo, simplemente haz esto

    local producer = require "resty.pubsub.producer"

Métodos

new

syntax: local p, err = producer:new(pubsub_config)

send

syntax: p:send(message, attributes[, ordering_key])

  • Requiere un mensaje de tipo string, atributos de tipo table y un optional ordering_key de tipo string

Ver También

GitHub

Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-pubsub.