kafka: controlador cliente de Lua kafka para nginx-module-lua basado en la API de cosocket
Instalación
Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.
CentOS/RHEL 7 o Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.
Este documento describe lua-resty-kafka v0.23 lanzado el 03 de noviembre de 2023.
Esta biblioteca Lua es un controlador cliente de Kafka para el módulo ngx_lua de nginx:
http://wiki.nginx.org/HttpLuaModule
Esta biblioteca Lua aprovecha la API de cosocket de ngx_lua, que garantiza un comportamiento 100% no bloqueante.
Ten en cuenta que se requiere al menos ngx_lua 0.9.3 o openresty 1.4.3.7, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).
Nota para conexiones ssl: se requiere al menos ngx_lua 0.9.11 o openresty 1.7.4.1, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).
Sinopsis
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- autenticación opcional
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- generalmente no usamos esta biblioteca directamente
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata falló, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- productor tipo sync
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("error al enviar:", err)
return
end
ngx.say("envío exitoso, offset: ", tonumber(offset))
-- este es productor tipo async y bp será reutilizado en todo el worker de nginx
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("error al enviar:", err)
return
end
ngx.say("envío exitoso, ok:", ok)
';
}
}
Módulos
resty.kafka.client
Para cargar este módulo, simplemente haz esto
local client = require "resty.kafka.client"
Métodos
new
syntax: c = client:new(broker_list, client_config)
La broker_list es una lista de brokers, como la siguiente
[
{
"host": "127.0.0.1",
"port": 9092,
// autenticación opcional
"sasl_config": {
// mecanismos soportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
mecanismos soportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
advertencia: SCRAM-SHA-256, SCRAM-SHA-512 necesitan instalar lua-resty-jit-uuid y lua-resty-openssl.
Se puede especificar una tabla opcional client_config. Las siguientes opciones son las siguientes:
configuración del cliente
-
socket_timeoutEspecifica el umbral de tiempo de espera de red en milisegundos. DEBE ser mayor que el
request_timeout. -
keepalive_timeoutEspecifica el tiempo máximo de inactividad (en milisegundos) para la conexión keepalive.
-
keepalive_sizeEspecifica el número máximo de conexiones permitidas en el grupo de conexiones para cada worker de Nginx.
-
refresh_intervalEspecifica el tiempo para auto refrescar los metadatos en milisegundos. Los metadatos no se refrescarán automáticamente si es nil.
-
sslEspecifica si el cliente debe usar conexión ssl. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyEspecifica si el cliente debe realizar verificación SSL. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolverEspecifica una función para la resolución de hosts, que devuelve una cadena de IP o
nil, para anular el resolvedor de hosts predeterminado del sistema. Por defectonil, no se realiza resolución. Ejemplofunction(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
syntax: brokers, partitions = c:fetch_metadata(topic)
En caso de éxito, devuelve todos los brokers y particiones del topic.
En caso de errores, devuelve nil con una cadena que describe el error.
refresh
syntax: brokers, partitions = c:refresh()
Esto refrescará los metadatos de todos los temas que han sido obtenidos por fetch_metadata.
En caso de éxito, devuelve todos los brokers y todas las particiones de todos los temas.
En caso de errores, devuelve nil con una cadena que describe el error.
choose_api_version
syntax: api_version = c:choose_api_version(api_key, min_version, max_version)
Esto ayuda al cliente a seleccionar la versión correcta del api_key correspondiente a la API.
Cuando se proporcionan min_version y max_version, actuará como un límite y las versiones seleccionadas en el valor de retorno no excederán sus límites sin importar cuán alta o baja sea la versión de API que soporte el broker. Cuando no se proporcionan, seguirá el rango de versiones soportadas por el broker.
Consejo: La estrategia de selección de versiones es elegir la versión máxima dentro del rango permitido.
resty.kafka.producer
Para cargar este módulo, simplemente haz esto
local producer = require "resty.kafka.producer"
Métodos
new
syntax: p = producer:new(broker_list, producer_config?, cluster_name?)
Se recomienda usar productor tipo async.
broker_list es el mismo que en client.
Se puede especificar una tabla de opciones opcional. Las siguientes opciones son las siguientes:
socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify son las mismas que en client_config.
configuración del productor, similar a http://kafka.apache.org/documentation.html#producerconfigs
-
producer_typeEspecifica el
producer.type. "async" o "sync". -
request_timeoutEspecifica el
request.timeout.ms. Por defecto2000 ms. -
required_acksEspecifica el
request.required.acks, NO DEBE ser cero. Por defecto1. -
max_retryEspecifica el
message.send.max.retries. Por defecto3. -
retry_backoffEspecifica el
retry.backoff.ms. Por defecto100. -
api_versionEspecifica la versión de la API de producción. Por defecto
0. Si usas Kafka 0.10.0.0 o superior,api_versionpuede usar0,1o2. Si usas Kafka 0.9.x,api_versiondebe ser0o1. Si usas Kafka 0.8.x,api_versiondebe ser0. -
partitionerEspecifica el particionador que elige la partición a partir de la clave y el número de particiones.
syntax: partitioner = function (key, partition_num, correlation_id) end, el correlation_id es un id de auto incremento en el productor. El particionador predeterminado es:local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id es continuo y comienza desde 0 return id % num end
configuración del buffer (solo funciona producer_type = "async")
-
flush_timeEspecifica el
queue.buffering.max.ms. Por defecto1000. -
batch_numEspecifica el
batch.num.messages. Por defecto200. -
batch_sizeEspecifica el
send.buffer.bytes. Por defecto1M(puede alcanzar 2M). Ten cuidado, DEBE ser menor que la configuraciónsocket.request.max.bytes / 2 - 10ken el servidor kafka. -
max_bufferingEspecifica el
queue.buffering.max.messages. Por defecto50,000. -
error_handleEspecifica el manejo de errores, maneja datos cuando el buffer envía un error a kafka.
syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, los mensajes fallidos en el message_queue son como{ key1, msg1, key2, msg2 },keyen el message_queue es una cadena vacía""incluso si el origen esnil.indexes la longitud del message_queue, no se debe usar#message_queue. cuandoretryableestrue, significa que el servidor kafka seguramente no ha confirmado estos mensajes, puedes reintentar enviar de forma segura; y si no, significa que puede que sí, se recomienda registrar en algún lugar. -
wait_on_buffer_fullEspecifica si se debe esperar cuando la cola del buffer está llena, por defecto
false. Cuando la cola del buffer está llena, si la opción se pasa comotrue, se utilizará una función de espera de semáforo para bloquear la coroutine hasta que se agote el tiempo o la cola del buffer se haya reducido, de lo contrario, devuelve un error de "desbordamiento de buffer" confalse. Nota, no se puede usar en aquellas fases que no soportan yields, es decir, fase de registro. -
wait_buffer_timeoutEspecifica el tiempo máximo de espera cuando el buffer está lleno, por defecto
5segundos.
No se soporta compresión ahora.
El tercer cluster_name opcional especifica el nombre del clúster, por defecto 1 (sí, es un número). Puedes especificar diferentes nombres cuando tengas dos o más clústeres de kafka. Y esto solo funciona con async producer_type.
send
syntax: ok, err = p:send(topic, key, message)
-
En modelo sync
En caso de éxito, devuelve el offset ( cdata: LL ) del broker y partición actuales. En caso de errores, devuelve
nilcon una cadena que describe el error. -
En modelo async
El
messagese escribirá primero en el buffer. Se enviará al servidor kafka cuando el buffer exceda elbatch_num, o cadaflush_timese vaciará el buffer.En caso de éxito, devuelve
true. En caso de errores, devuelvenilcon una cadena que describe el error (desbordamiento de buffer).
offset
syntax: sum, details = p:offset()
Devuelve la suma de todos los offsets de tema-partición (devueltos por la API ProduceRequest);
y los detalles de cada tema-partición.
flush
syntax: ok = p:flush()
Siempre devuelve true.
resty.kafka.basic-consumer
Para cargar este módulo, simplemente haz esto
local bconsumer = require "resty.kafka.basic-consumer"
Este módulo es una implementación minimalista de un consumidor, proporcionando la API list_offset para consultar por tiempo o obtener el offset de inicio y fin y la API fetch para obtener mensajes en un tema.
En una sola llamada, solo se puede obtener la información de una sola partición en un solo tema, y la obtención por lotes no está soportada por ahora. El consumidor básico no soporta la API relacionada con grupos de consumidores, por lo que necesitas obtener el mensaje después de obtener el offset a través de la API list_offset, o tu servicio puede gestionar el offset por sí mismo.
Métodos
new
syntax: c = bconsumer:new(broker_list, client_config)
La broker_list es una lista de brokers, como la siguiente
[
{
"host": "127.0.0.1",
"port": 9092,
// autenticación opcional
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
Se puede especificar una tabla opcional client_config. Las siguientes opciones son las siguientes:
configuración del cliente
-
socket_timeoutEspecifica el umbral de tiempo de espera de red en milisegundos. DEBE ser mayor que el
request_timeout. -
keepalive_timeoutEspecifica el tiempo máximo de inactividad (en milisegundos) para la conexión keepalive.
-
keepalive_sizeEspecifica el número máximo de conexiones permitidas en el grupo de conexiones para cada worker de Nginx.
-
refresh_intervalEspecifica el tiempo para auto refrescar los metadatos en milisegundos. Los metadatos no se refrescarán automáticamente si es nil.
-
sslEspecifica si el cliente debe usar conexión ssl. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyEspecifica si el cliente debe realizar verificación SSL. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_levelEsta configuración controla la visibilidad de los registros transaccionales. Ver: https://kafka.apache.org/protocol.html -
client_rackID de rack del consumidor que realiza esta solicitud. Ver: https://kafka.apache.org/protocol.html
list_offset
syntax: offset, err = c:list_offset(topic, partition, timestamp)
El parámetro timestamp puede ser un timestamp UNIX o una constante definida en resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, utilizada para obtener los offsets iniciales y más recientes, etc., semántica con la API ListOffsets en Apache Kafka. Ver: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
En caso de éxito, devuelve el offset del caso especificado.
En caso de errores, devuelve nil con una cadena que describe el error.
fetch
syntax: result, err = c:fetch(topic, partition, offset)
En caso de éxito, devuelve el siguiente result del caso especificado.
En caso de errores, devuelve nil con una cadena que describe el error.
El result contendrá más información como los mensajes:
-
recordsLa tabla que contiene el contenido del mensaje.
-
errcodeEl código de error de la API Fetch. Ver: https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkLa marca de agua alta de la API Fetch. Ver: https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetEl último offset estable de la API Fetch. El contenido depende de la versión de la API, puede ser nil. Ver: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a la versión de API superior a v4
-
log_start_offsetEl offset de inicio del log de la API Fetch. El contenido depende de la versión de la API, puede ser nil. Ver: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a la versión de API superior a v5
-
aborted_transactionsLas transacciones abortadas de la API Fetch. El contenido depende de la versión de la API, puede ser nil. Ver: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a la versión de API superior a v4
-
preferred_read_replicaLa réplica de lectura preferida de la API Fetch. El contenido depende de la versión de la API, puede ser nil. Ver: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a la versión de API superior a v11
Errores
Cuando llamas a los módulos proporcionados en esta biblioteca, puedes obtener algunos errores. Dependiendo de la fuente, se pueden dividir en las siguientes categorías.
-
Errores de red: como conexión rechazada, tiempo de espera de conexión, etc. Necesitas verificar el estado de conexión de cada servicio en tu entorno.
-
Errores relacionados con metadatos: como que los metadatos o los datos de ApiVersion no se pueden recuperar correctamente; el tema o partición especificados no existen, etc. Necesitas verificar la configuración del Broker de Kafka y del cliente.
-
Error devuelto por Kafka: a veces Kafka incluirá datos de err_code en los datos de respuesta. Cuando ocurre este problema, el
erren el valor de retorno se verá asíOFFSET_OUT_OF_RANGE, todos los caracteres en mayúsculas y separados por guiones bajos, y en la biblioteca actual proporcionamos una lista de errores de mapeo correspondiente a las descripciones textuales. Para obtener más información sobre estos errores, consulta las descripciones en la documentación de Kafka.
Ver También
- el módulo ngx_lua: http://wiki.nginx.org/HttpLuaModule
- el protocolo kafka: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- la biblioteca lua-resty-redis
- la biblioteca lua-resty-logger-socket
- el sarama
GitHub
Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-kafka.