Saltar a contenido

kafka: controlador cliente de Lua kafka para nginx-module-lua basado en la API de cosocket

Instalación

Si no has configurado la suscripción al repositorio RPM, regístrate. Luego puedes proceder con los siguientes pasos.

CentOS/RHEL 7 o Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

Para usar esta biblioteca Lua con NGINX, asegúrate de que nginx-module-lua esté instalado.

Este documento describe lua-resty-kafka v0.23 lanzado el 03 de noviembre de 2023.


Esta biblioteca Lua es un controlador cliente de Kafka para el módulo ngx_lua de nginx:

http://wiki.nginx.org/HttpLuaModule

Esta biblioteca Lua aprovecha la API de cosocket de ngx_lua, que garantiza un comportamiento 100% no bloqueante.

Ten en cuenta que se requiere al menos ngx_lua 0.9.3 o openresty 1.4.3.7, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).

Nota para conexiones ssl: se requiere al menos ngx_lua 0.9.11 o openresty 1.7.4.1, y desafortunadamente solo es compatible con LuaJIT (--with-luajit).

Sinopsis

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- autenticación opcional
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- generalmente no usamos esta biblioteca directamente
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata falló, err:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))


                -- productor tipo sync
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("error al enviar:", err)
                    return
                end
                ngx.say("envío exitoso, offset: ", tonumber(offset))

                -- este es productor tipo async y bp será reutilizado en todo el worker de nginx
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("error al enviar:", err)
                    return
                end

                ngx.say("envío exitoso, ok:", ok)
            ';
        }
    }

Módulos

resty.kafka.client

Para cargar este módulo, simplemente haz esto

    local client = require "resty.kafka.client"

Métodos

new

syntax: c = client:new(broker_list, client_config)

La broker_list es una lista de brokers, como la siguiente

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // autenticación opcional
        "sasl_config": {
            // mecanismos soportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

mecanismos soportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

advertencia: SCRAM-SHA-256, SCRAM-SHA-512 necesitan instalar lua-resty-jit-uuid y lua-resty-openssl.

Se puede especificar una tabla opcional client_config. Las siguientes opciones son las siguientes:

configuración del cliente

  • socket_timeout

    Especifica el umbral de tiempo de espera de red en milisegundos. DEBE ser mayor que el request_timeout.

  • keepalive_timeout

    Especifica el tiempo máximo de inactividad (en milisegundos) para la conexión keepalive.

  • keepalive_size

    Especifica el número máximo de conexiones permitidas en el grupo de conexiones para cada worker de Nginx.

  • refresh_interval

    Especifica el tiempo para auto refrescar los metadatos en milisegundos. Los metadatos no se refrescarán automáticamente si es nil.

  • ssl

    Especifica si el cliente debe usar conexión ssl. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Especifica si el cliente debe realizar verificación SSL. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Especifica una función para la resolución de hosts, que devuelve una cadena de IP o nil, para anular el resolvedor de hosts predeterminado del sistema. Por defecto nil, no se realiza resolución. Ejemplo function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

En caso de éxito, devuelve todos los brokers y particiones del topic. En caso de errores, devuelve nil con una cadena que describe el error.

refresh

syntax: brokers, partitions = c:refresh()

Esto refrescará los metadatos de todos los temas que han sido obtenidos por fetch_metadata. En caso de éxito, devuelve todos los brokers y todas las particiones de todos los temas. En caso de errores, devuelve nil con una cadena que describe el error.

choose_api_version

syntax: api_version = c:choose_api_version(api_key, min_version, max_version)

Esto ayuda al cliente a seleccionar la versión correcta del api_key correspondiente a la API.

Cuando se proporcionan min_version y max_version, actuará como un límite y las versiones seleccionadas en el valor de retorno no excederán sus límites sin importar cuán alta o baja sea la versión de API que soporte el broker. Cuando no se proporcionan, seguirá el rango de versiones soportadas por el broker.

Consejo: La estrategia de selección de versiones es elegir la versión máxima dentro del rango permitido.

resty.kafka.producer

Para cargar este módulo, simplemente haz esto

    local producer = require "resty.kafka.producer"

Métodos

new

syntax: p = producer:new(broker_list, producer_config?, cluster_name?)

Se recomienda usar productor tipo async.

broker_list es el mismo que en client.

Se puede especificar una tabla de opciones opcional. Las siguientes opciones son las siguientes:

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify son las mismas que en client_config.

configuración del productor, similar a http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    Especifica el producer.type. "async" o "sync".

  • request_timeout

    Especifica el request.timeout.ms. Por defecto 2000 ms.

  • required_acks

    Especifica el request.required.acks, NO DEBE ser cero. Por defecto 1.

  • max_retry

    Especifica el message.send.max.retries. Por defecto 3.

  • retry_backoff

    Especifica el retry.backoff.ms. Por defecto 100.

  • api_version

    Especifica la versión de la API de producción. Por defecto 0. Si usas Kafka 0.10.0.0 o superior, api_version puede usar 0, 1 o 2. Si usas Kafka 0.9.x, api_version debe ser 0 o 1. Si usas Kafka 0.8.x, api_version debe ser 0.

  • partitioner

    Especifica el particionador que elige la partición a partir de la clave y el número de particiones. syntax: partitioner = function (key, partition_num, correlation_id) end, el correlation_id es un id de auto incremento en el productor. El particionador predeterminado es:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id es continuo y comienza desde 0
        return id % num
    end
    

configuración del buffer (solo funciona producer_type = "async")

  • flush_time

    Especifica el queue.buffering.max.ms. Por defecto 1000.

  • batch_num

    Especifica el batch.num.messages. Por defecto 200.

  • batch_size

    Especifica el send.buffer.bytes. Por defecto 1M (puede alcanzar 2M). Ten cuidado, DEBE ser menor que la configuración socket.request.max.bytes / 2 - 10k en el servidor kafka.

  • max_buffering

    Especifica el queue.buffering.max.messages. Por defecto 50,000.

  • error_handle

    Especifica el manejo de errores, maneja datos cuando el buffer envía un error a kafka. syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, los mensajes fallidos en el message_queue son como { key1, msg1, key2, msg2 }, key en el message_queue es una cadena vacía "" incluso si el origen es nil. index es la longitud del message_queue, no se debe usar #message_queue. cuando retryable es true, significa que el servidor kafka seguramente no ha confirmado estos mensajes, puedes reintentar enviar de forma segura; y si no, significa que puede que sí, se recomienda registrar en algún lugar.

  • wait_on_buffer_full

    Especifica si se debe esperar cuando la cola del buffer está llena, por defecto false. Cuando la cola del buffer está llena, si la opción se pasa como true, se utilizará una función de espera de semáforo para bloquear la coroutine hasta que se agote el tiempo o la cola del buffer se haya reducido, de lo contrario, devuelve un error de "desbordamiento de buffer" con false. Nota, no se puede usar en aquellas fases que no soportan yields, es decir, fase de registro.

  • wait_buffer_timeout

    Especifica el tiempo máximo de espera cuando el buffer está lleno, por defecto 5 segundos.

No se soporta compresión ahora.

El tercer cluster_name opcional especifica el nombre del clúster, por defecto 1 (sí, es un número). Puedes especificar diferentes nombres cuando tengas dos o más clústeres de kafka. Y esto solo funciona con async producer_type.

send

syntax: ok, err = p:send(topic, key, message)

  1. En modelo sync

    En caso de éxito, devuelve el offset ( cdata: LL ) del broker y partición actuales. En caso de errores, devuelve nil con una cadena que describe el error.

  2. En modelo async

    El message se escribirá primero en el buffer. Se enviará al servidor kafka cuando el buffer exceda el batch_num, o cada flush_time se vaciará el buffer.

    En caso de éxito, devuelve true. En caso de errores, devuelve nil con una cadena que describe el error (desbordamiento de buffer).

offset

syntax: sum, details = p:offset()

Devuelve la suma de todos los offsets de tema-partición (devueltos por la API ProduceRequest);
y los detalles de cada tema-partición.

flush

syntax: ok = p:flush()

Siempre devuelve true.

resty.kafka.basic-consumer

Para cargar este módulo, simplemente haz esto

    local bconsumer = require "resty.kafka.basic-consumer"

Este módulo es una implementación minimalista de un consumidor, proporcionando la API list_offset para consultar por tiempo o obtener el offset de inicio y fin y la API fetch para obtener mensajes en un tema.

En una sola llamada, solo se puede obtener la información de una sola partición en un solo tema, y la obtención por lotes no está soportada por ahora. El consumidor básico no soporta la API relacionada con grupos de consumidores, por lo que necesitas obtener el mensaje después de obtener el offset a través de la API list_offset, o tu servicio puede gestionar el offset por sí mismo.

Métodos

new

syntax: c = bconsumer:new(broker_list, client_config)

La broker_list es una lista de brokers, como la siguiente

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // autenticación opcional
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

Se puede especificar una tabla opcional client_config. Las siguientes opciones son las siguientes:

configuración del cliente

  • socket_timeout

    Especifica el umbral de tiempo de espera de red en milisegundos. DEBE ser mayor que el request_timeout.

  • keepalive_timeout

    Especifica el tiempo máximo de inactividad (en milisegundos) para la conexión keepalive.

  • keepalive_size

    Especifica el número máximo de conexiones permitidas en el grupo de conexiones para cada worker de Nginx.

  • refresh_interval

    Especifica el tiempo para auto refrescar los metadatos en milisegundos. Los metadatos no se refrescarán automáticamente si es nil.

  • ssl

    Especifica si el cliente debe usar conexión ssl. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Especifica si el cliente debe realizar verificación SSL. Por defecto es falso. Ver: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level Esta configuración controla la visibilidad de los registros transaccionales. Ver: https://kafka.apache.org/protocol.html

  • client_rack

    ID de rack del consumidor que realiza esta solicitud. Ver: https://kafka.apache.org/protocol.html

list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

El parámetro timestamp puede ser un timestamp UNIX o una constante definida en resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, utilizada para obtener los offsets iniciales y más recientes, etc., semántica con la API ListOffsets en Apache Kafka. Ver: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

En caso de éxito, devuelve el offset del caso especificado. En caso de errores, devuelve nil con una cadena que describe el error.

fetch

syntax: result, err = c:fetch(topic, partition, offset)

En caso de éxito, devuelve el siguiente result del caso especificado. En caso de errores, devuelve nil con una cadena que describe el error.

El result contendrá más información como los mensajes:

Errores

Cuando llamas a los módulos proporcionados en esta biblioteca, puedes obtener algunos errores. Dependiendo de la fuente, se pueden dividir en las siguientes categorías.

  • Errores de red: como conexión rechazada, tiempo de espera de conexión, etc. Necesitas verificar el estado de conexión de cada servicio en tu entorno.

  • Errores relacionados con metadatos: como que los metadatos o los datos de ApiVersion no se pueden recuperar correctamente; el tema o partición especificados no existen, etc. Necesitas verificar la configuración del Broker de Kafka y del cliente.

  • Error devuelto por Kafka: a veces Kafka incluirá datos de err_code en los datos de respuesta. Cuando ocurre este problema, el err en el valor de retorno se verá así OFFSET_OUT_OF_RANGE, todos los caracteres en mayúsculas y separados por guiones bajos, y en la biblioteca actual proporcionamos una lista de errores de mapeo correspondiente a las descripciones textuales. Para obtener más información sobre estos errores, consulta las descripciones en la documentación de Kafka.

Ver También

GitHub

Puedes encontrar consejos de configuración adicionales y documentación para este módulo en el repositorio de GitHub para nginx-module-kafka.