pubsub: Controlador de cliente Lua Pubsub para nginx-module-lua basado en la API de cosocket
Instalación
Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.
CentOS/RHEL 7 o Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-pubsub
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-pubsub
Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.
Este documento describe lua-resty-pubsub v1.5 lanzado el 13 de noviembre de 2024.
Controlador de cliente Lua Pubsub para el ngx_lua basado en la API de cosocket.
Descripción
Esta biblioteca Lua es un controlador de cliente Pubsub para el módulo nginx ngx_lua: http://wiki.nginx.org/HttpLuaModule
Esta biblioteca Lua aprovecha la API de cosocket de ngx_lua, que garantiza un comportamiento 100% no bloqueante. Esta biblioteca envía mensajes (con atributos) a Google Cloud pubsub utilizando temporizadores de nginx y solicitudes http.
Ten en cuenta que se requiere al menos ngx_lua 0.9.3 o ngx_openresty 1.4.3.7, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).
Sinopsis
server {
location = /publish {
resolver 8.8.8.8 ipv6=off;
content_by_lua_block {
local cjson = require "cjson"
local pubsub_producer = require "resty.pubsub.producer"
local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- También se puede proporcionar un diccionario diferente
-- Un callback que recibirá mensajes si se envían con éxito
-- Los tipos de mensajes y err son una tabla
local success_callback = function (topic, err, messages)
ngx.log(ngx.INFO, "Mensajes: ", cjson.encode(messages), " enviados con éxito al tema: ", topic)
end
-- Un callback que recibirá mensajes si falla el envío
-- Los tipos de mensajes y err son una tabla
local error_callback = function (topic, err, messages)
for _, message in ipairs(messages) do
ngx.log(ngx.ERR, "Error al enviar el mensaje: ", cjson.encode(message), " con error: ", cjson.encode(err))
end
end
local publish = function()
-- Configuración del Productor Pubsub
local pubsub_config = {
project_id = "demo-project",
topic = "demo-topic",
pubsub_base_domain = "pubsub.googleapis.com",
pubsub_base_port = 443,
is_emulator = false,
producer_config = {
max_batch_size = 200, -- número de paquetes
max_buffering = 5000, -- número máximo de paquetes en el búfer
timer_interval = 10000, -- en milisegundos
last_flush_interval = 5000, -- en milisegundos
http_timeout = 6000, -- en milisegundos
keepalive_max_idle_timeout = 2000, -- en milisegundos
keepalive_pool_size = 50
},
oauth_config = {
service_account_key_path = "/etc/key.json", -- Reemplaza esto con tu propia ruta de clave
oauth_base_uri = "https://www.googleapis.com/oauth2/v4/token",
oauth_scopes = {
"https://www.googleapis.com/auth/pubsub"
},
oauth_token_dict = OAUTH_TOKEN
},
success_callback = success_callback,
error_callback = error_callback
}
-- Crear el objeto productor
-- No importa cuántas veces llames a new, la instancia del productor siempre se generará una vez por tema por proceso de trabajo
local producer, err = pubsub_producer:new(pubsub_config)
-- También verifica si hay algún error al inicializar el productor
if err ~= nil then
ngx.log(ngx.ERR, "No se puede crear el productor pubsub ", err)
return
end
-- Finalmente, envía el mensaje con atributos.
local ok, send_err = producer:send("Some Random Text", {
attr1 = "Test1",
attr2 = "Test2"
}, "optional_ordering_key")
-- También verifica si hay algún error al enviar el mensaje
if send_err ~= nil then
ngx.log(ngx.ERR, "No se puede enviar datos a pubsub: ", send_err)
return
end
end
-- Publicar Mensaje
publish()
}
}
}
Configuraciones
Configuraciones del Productor
| Propiedad | Tipo de Dato | Descripción | Valor por Defecto |
|---|---|---|---|
| project_id | string | Especifica el id del proyecto como una cadena de tu proyecto Pub/Sub | ninguno (Requerido) |
| topic | string | Especifica el tema en el que se deben enviar los datos | ninguno (Requerido) |
| pubsub_base_domain | string | Especifica el dominio base a través del cual se realiza la conexión http. | pubsub.googleapis.com |
| pubsub_base_port | number | Especifica el puerto del dominio base a través del cual se realiza la conexión http. | 443 |
| is_emulator | boolean | Especifica un valor booleano. verdadero si estás comunicándote con. | falso |
| producer_config.max_batch_size | number | Especifica el tamaño máximo del lote que se enviará a pubsub. | 200 |
| producer_config.max_buffering | number | Especifica el tamaño máximo del búfer que contendrá los datos durante un período específico de tiempo. | 5000 |
| producer_config.timer_interval | number (milisegundos) | Especifica el intervalo de tiempo en el que se revisan los mensajes obsoletos en el búfer para su publicación. | 10000 |
| producer_config.last_flush_interval | number (milisegundos) | Especifica el intervalo máximo entre el último tiempo de vaciado y el tiempo actual. Se utiliza cuando los paquetes en el búfer son menos que el tamaño del lote durante un período de tiempo más largo. | 10000 |
| producer_config.http_timeout | number (milisegundos) | Establece la protección de tiempo de espera para operaciones posteriores, incluido el método connect. | 5000 |
| producer_config.keepalive_max_idle_timeout | number (milisegundos) | Se utiliza en httpc:set_keepalive que intenta poner la conexión actual en el grupo de conexiones de cosocket de ngx_lua. | 2000 |
| producer_config.keepalive_pool_size | number | Se utiliza en httpc:set_keepalive que intenta poner la conexión actual en el grupo de conexiones de cosocket de ngx_lua. | 50 |
| oauth_config.service_account_key_path | string | Especifica la ruta para la clave de cuenta de servicio que se utiliza para autenticarse en el proyecto pub/sub. | ninguno (Requerido) |
| oauth_config.oauth_base_uri | string | Especifica la URI base a la que se realiza la solicitud al servidor de autorización de Google para un token que se utilizará en solicitudes posteriores. | https://www.googleapis.com/oauth2/v4/token |
| oauth_config.oauth_scopes | lista de string | Especifica una tabla que comprende los alcances de OAuth 2.0 que podrías necesitar solicitar para acceder a las API de Google, dependiendo del nivel de acceso que necesites. | {"https://www.googleapis.com/auth/pubsub"} |
| oauth_config.oauth_token_dict | lua_shared_dict | Especifica una zona de memoria compartida entre trabajadores, para servir como almacenamiento para el token oauth. | ngx.shared.OAUTH_TOKEN |
| success_handler | function | Esta es una función de callback que se proporcionará con todos los mensajes y sus atributos que se envían con éxito a pubsub. | ninguno (Opcional) |
| error_handler | function | Esta es una función de callback que se ejecutará cuando falle un lote. | ninguno (Opcional) |
Módulos
resty.pubsub.producer
Para cargar este módulo, simplemente haz esto
local producer = require "resty.pubsub.producer"
Métodos
new
syntax: local p, err = producer:new(pubsub_config)
send
syntax: p:send(message, attributes[, ordering_key])
- Requiere un mensaje de tipo string, atributos de tipo table y un optional ordering_key de tipo string
Ver También
- el módulo ngx_lua: http://wiki.nginx.org/HttpLuaModule
GitHub
Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-pubsub.