Pular para conteúdo

kafka: driver cliente Lua kafka para nginx-module-lua baseado na API cosocket

Instalação

Se você ainda não configurou a assinatura do repositório RPM, inscreva-se. Então você pode prosseguir com os seguintes passos.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

Para usar esta biblioteca Lua com NGINX, certifique-se de que o nginx-module-lua está instalado.

Este documento descreve lua-resty-kafka v0.23 lançado em 03 de novembro de 2023.


Esta biblioteca Lua é um driver cliente Kafka para o módulo ngx_lua do nginx:

http://wiki.nginx.org/HttpLuaModule

Esta biblioteca Lua aproveita a API cosocket do ngx_lua, que garante um comportamento 100% não bloqueante.

Observe que pelo menos ngx_lua 0.9.3 ou openresty 1.4.3.7 é necessário, e infelizmente apenas o LuaJIT é suportado (--with-luajit).

Nota para conexões ssl: pelo menos ngx_lua 0.9.11 ou openresty 1.7.4.1 é necessário, e infelizmente apenas o LuaJIT é suportado (--with-luajit).

Sinopse

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- autenticação opcional
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- geralmente não usamos esta biblioteca diretamente
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata falhou, err:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

                -- produtor tipo sync
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send sucesso, offset: ", tonumber(offset))

                -- este é um produtor tipo async e bp será reutilizado em todo o worker do nginx
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("send err:", err)
                    return
                end

                ngx.say("send sucesso, ok:", ok)
            ';
        }
    }

Módulos

resty.kafka.client

Para carregar este módulo, basta fazer isso

    local client = require "resty.kafka.client"

Métodos

new

syntax: c = client:new(broker_list, client_config)

O broker_list é uma lista de brokers, como abaixo

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // autenticação opcional
        "sasl_config": {
            // mecanismos suportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

mecanismos suportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

aviso: SCRAM-SHA-256, SCRAM-SHA-512 precisam instalar lua-resty-jit-uuid e lua-resty-openssl

Uma tabela opcional client_config pode ser especificada. As seguintes opções são as seguintes:

configuração do cliente

  • socket_timeout

    Especifica o limite de tempo de rede em milissegundos. DEVE ser maior que o request_timeout.

  • keepalive_timeout

    Especifica o tempo máximo ocioso (em milissegundos) para a conexão keepalive.

  • keepalive_size

    Especifica o número máximo de conexões permitidas no pool de conexões para cada worker do Nginx.

  • refresh_interval

    Especifica o tempo para atualizar automaticamente os metadados em milissegundos. Os metadados não serão atualizados automaticamente se forem nil.

  • ssl

    Especifica se o cliente deve usar conexão ssl. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Especifica se o cliente deve realizar a verificação SSL. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Especifica uma função para resolução de host, que retorna uma string de IP ou nil, para substituir o resolvedor de host padrão do sistema. Padrão nil, nenhuma resolução realizada. Exemplo function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

Em caso de sucesso, retorna todos os brokers e partições do topic. Em caso de erro, retorna nil com uma string descrevendo o erro.

refresh

syntax: brokers, partitions = c:refresh()

Isso atualizará os metadados de todos os tópicos que foram buscados por fetch_metadata. Em caso de sucesso, retorna todos os brokers e todas as partições de todos os tópicos. Em caso de erro, retorna nil com uma string descrevendo o erro.

choose_api_version

syntax: api_version = c:choose_api_version(api_key, min_version, max_version)

Isso ajuda o cliente a selecionar a versão correta do api_key correspondente à API.

Quando min_version e max_version são fornecidos, eles atuarão como um limite e as versões selecionadas no valor de retorno não excederão seus limites, não importa quão alta ou baixa a versão da API que o broker suporta. Quando não são fornecidos, seguirá a faixa de versões suportadas pelo broker.

Dica: A estratégia de seleção de versão é escolher a versão máxima dentro da faixa permitida.

resty.kafka.producer

Para carregar este módulo, basta fazer isso

    local producer = require "resty.kafka.producer"

Métodos

new

syntax: p = producer:new(broker_list, producer_config?, cluster_name?)

É recomendável usar o tipo de produtor async.

broker_list é o mesmo que em client

Uma tabela de opções opcional pode ser especificada. As seguintes opções são as seguintes:

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify são as mesmas que em client_config

configuração do produtor, mais como em http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    Especifica o producer.type. "async" ou "sync"

  • request_timeout

    Especifica o request.timeout.ms. Padrão 2000 ms

  • required_acks

    Especifica o request.required.acks, NÃO DEVE ser zero. Padrão 1.

  • max_retry

    Especifica o message.send.max.retries. Padrão 3.

  • retry_backoff

    Especifica o retry.backoff.ms. Padrão 100.

  • api_version

    Especifica a versão da API de produção. Padrão 0. Se você usar Kafka 0.10.0.0 ou superior, api_version pode usar 0, 1 ou 2. Se você usar Kafka 0.9.x, api_version deve ser 0 ou 1. Se você usar Kafka 0.8.x, api_version deve ser 0.

  • partitioner

    Especifica o particionador que escolhe a partição a partir da chave e do número de partições. syntax: partitioner = function (key, partition_num, correlation_id) end, o correlation_id é um id auto-incremental no produtor. O particionador padrão é:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id é contínuo e começa de 0
        return id % num
    end
    

configuração do buffer (funciona apenas com producer_type = "async")

  • flush_time

    Especifica o queue.buffering.max.ms. Padrão 1000.

  • batch_num

    Especifica o batch.num.messages. Padrão 200.

  • batch_size

    Especifica o send.buffer.bytes. Padrão 1M (pode chegar a 2M). Cuidado, DEVE ser menor que a configuração socket.request.max.bytes / 2 - 10k no servidor kafka.

  • max_buffering

    Especifica o queue.buffering.max.messages. Padrão 50.000.

  • error_handle

    Especifica o tratamento de erro, trata dados quando o buffer envia para kafka erro. syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, as mensagens falhadas na message_queue são como { key1, msg1, key2, msg2 }, key na message_queue é uma string vazia "" mesmo se a origem for nil. index é o comprimento da message_queue, não deve usar #message_queue. quando retryable é true, isso significa que o servidor kafka certamente não cometeu essas mensagens, você pode tentar enviar novamente com segurança; e caso contrário, significa que talvez, recomenda-se registrar em algum lugar.

  • wait_on_buffer_full

    Especifica se deve esperar quando a fila do buffer estiver cheia, Padrão false. Quando a fila do buffer estiver cheia, se a opção passada for true, usará a função de espera do semáforo para bloquear a coroutine até o tempo limite ou a fila do buffer ter diminuído, Caso contrário, retornará um erro "buffer overflow" com false. Observe que não pode ser usado em fases que não suportam yields, ou seja, fase de log.

  • wait_buffer_timeout

    Especifica o tempo máximo de espera quando o buffer está cheio, Padrão 5 segundos.

Não suporta compressão agora.

O terceiro cluster_name opcional especifica o nome do cluster, padrão 1 (sim, é um número). Você pode especificar nomes diferentes quando tiver dois ou mais clusters kafka. E isso só funciona com async producer_type.

send

syntax: ok, err = p:send(topic, key, message)

  1. No modelo sync

    Em caso de sucesso, retorna o offset ( cdata: LL ) do broker e partição atuais. Em caso de erro, retorna nil com uma string descrevendo o erro.

  2. No modelo async

    A message será escrita no buffer primeiro. Ela será enviada para o servidor kafka quando o buffer exceder o batch_num, ou a cada flush_time para limpar o buffer.

    Em caso de sucesso, retorna true. Em caso de erro, retorna nil com uma string descrevendo o erro (buffer overflow).

offset

syntax: sum, details = p:offset()

Retorna a soma de todos os offsets de tópico-partição (retornados pela API ProduceRequest);
e os detalhes de cada tópico-partição.

flush

syntax: ok = p:flush()

Sempre retorna true.

resty.kafka.basic-consumer

Para carregar este módulo, basta fazer isso

    local bconsumer = require "resty.kafka.basic-consumer"

Este módulo é uma implementação minimalista de um consumidor, fornecendo a API list_offset para consulta por tempo ou obtenção do offset inicial e final e a API fetch para obter mensagens em um tópico.

Em uma única chamada, apenas a informação de uma única partição em um único tópico pode ser buscada, e a busca em lote não é suportada por enquanto. O consumidor básico não suporta a API relacionada ao grupo de consumidores, então você precisa buscar a mensagem após obter o offset através da API list_offset, ou seu serviço pode gerenciar o offset por conta própria.

Métodos

new

syntax: c = bconsumer:new(broker_list, client_config)

O broker_list é uma lista de brokers, como abaixo

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // autenticação opcional
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

Uma tabela opcional client_config pode ser especificada. As seguintes opções são as seguintes:

configuração do cliente

  • socket_timeout

    Especifica o limite de tempo de rede em milissegundos. DEVE ser maior que o request_timeout.

  • keepalive_timeout

    Especifica o tempo máximo ocioso (em milissegundos) para a conexão keepalive.

  • keepalive_size

    Especifica o número máximo de conexões permitidas no pool de conexões para cada worker do Nginx.

  • refresh_interval

    Especifica o tempo para atualizar automaticamente os metadados em milissegundos. Os metadados não serão atualizados automaticamente se forem nil.

  • ssl

    Especifica se o cliente deve usar conexão ssl. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Especifica se o cliente deve realizar a verificação SSL. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level Esta configuração controla a visibilidade dos registros transacionais. Veja: https://kafka.apache.org/protocol.html

  • client_rack

    ID do rack do consumidor que faz esta solicitação. Veja: https://kafka.apache.org/protocol.html

list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

O parâmetro timestamp pode ser um timestamp UNIX ou uma constante definida em resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, usada para obter os offsets iniciais e mais recentes, etc., semântica com a API ListOffsets no Apache Kafka. Veja: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

Em caso de sucesso, retorna o offset do caso especificado. Em caso de erro, retorna nil com uma string descrevendo o erro.

fetch

syntax: result, err = c:fetch(topic, partition, offset)

Em caso de sucesso, retorna o seguinte result do caso especificado. Em caso de erro, retorna nil com uma string descrevendo o erro.

O result conterá mais informações, como as mensagens:

Erros

Quando você chama os módulos fornecidos nesta biblioteca, pode receber alguns erros. Dependendo da fonte, eles podem ser divididos nas seguintes categorias.

  • Erros de rede: como conexão rejeitada, tempo de conexão excedido, etc. Você precisa verificar o status da conexão de cada serviço em seu ambiente.

  • Erros relacionados a metadados: como metadados ou dados ApiVersion não podem ser recuperados corretamente; o tópico ou partição especificados não existem, etc. Você precisa verificar a configuração do Broker Kafka e do cliente.

  • Erro retornado pelo Kafka: às vezes o Kafka incluirá dados err_code nos dados de resposta. Quando esse problema ocorre, o err no valor de retorno se parece com isso OFFSET_OUT_OF_RANGE, todos os caracteres em maiúsculas e separados por sublinhados, e na biblioteca atual fornecemos uma lista de erros de mapeamento correspondente às descrições textuais. Para saber mais sobre esses erros, veja as descrições na documentação do Kafka.

Veja Também

GitHub

Você pode encontrar dicas adicionais de configuração e documentação para este módulo no repositório GitHub do nginx-module-kafka.