kafka: driver cliente Lua kafka para nginx-module-lua baseado na API cosocket
Instalação
Se você ainda não configurou a assinatura do repositório RPM, inscreva-se. Então você pode prosseguir com os seguintes passos.
CentOS/RHEL 7 ou Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
Para usar esta biblioteca Lua com NGINX, certifique-se de que o nginx-module-lua está instalado.
Este documento descreve lua-resty-kafka v0.23 lançado em 03 de novembro de 2023.
Esta biblioteca Lua é um driver cliente Kafka para o módulo ngx_lua do nginx:
http://wiki.nginx.org/HttpLuaModule
Esta biblioteca Lua aproveita a API cosocket do ngx_lua, que garante um comportamento 100% não bloqueante.
Observe que pelo menos ngx_lua 0.9.3 ou openresty 1.4.3.7 é necessário, e infelizmente apenas o LuaJIT é suportado (--with-luajit).
Nota para conexões ssl: pelo menos ngx_lua 0.9.11 ou openresty 1.7.4.1 é necessário, e infelizmente apenas o LuaJIT é suportado (--with-luajit).
Sinopse
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- autenticação opcional
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- geralmente não usamos esta biblioteca diretamente
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata falhou, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- produtor tipo sync
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send sucesso, offset: ", tonumber(offset))
-- este é um produtor tipo async e bp será reutilizado em todo o worker do nginx
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send sucesso, ok:", ok)
';
}
}
Módulos
resty.kafka.client
Para carregar este módulo, basta fazer isso
local client = require "resty.kafka.client"
Métodos
new
syntax: c = client:new(broker_list, client_config)
O broker_list é uma lista de brokers, como abaixo
[
{
"host": "127.0.0.1",
"port": 9092,
// autenticação opcional
"sasl_config": {
// mecanismos suportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
mecanismos suportados: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
aviso: SCRAM-SHA-256, SCRAM-SHA-512 precisam instalar lua-resty-jit-uuid e lua-resty-openssl
Uma tabela opcional client_config pode ser especificada. As seguintes opções são as seguintes:
configuração do cliente
-
socket_timeoutEspecifica o limite de tempo de rede em milissegundos. DEVE ser maior que o
request_timeout. -
keepalive_timeoutEspecifica o tempo máximo ocioso (em milissegundos) para a conexão keepalive.
-
keepalive_sizeEspecifica o número máximo de conexões permitidas no pool de conexões para cada worker do Nginx.
-
refresh_intervalEspecifica o tempo para atualizar automaticamente os metadados em milissegundos. Os metadados não serão atualizados automaticamente se forem nil.
-
sslEspecifica se o cliente deve usar conexão ssl. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyEspecifica se o cliente deve realizar a verificação SSL. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolverEspecifica uma função para resolução de host, que retorna uma string de IP ou
nil, para substituir o resolvedor de host padrão do sistema. Padrãonil, nenhuma resolução realizada. Exemplofunction(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
syntax: brokers, partitions = c:fetch_metadata(topic)
Em caso de sucesso, retorna todos os brokers e partições do topic.
Em caso de erro, retorna nil com uma string descrevendo o erro.
refresh
syntax: brokers, partitions = c:refresh()
Isso atualizará os metadados de todos os tópicos que foram buscados por fetch_metadata.
Em caso de sucesso, retorna todos os brokers e todas as partições de todos os tópicos.
Em caso de erro, retorna nil com uma string descrevendo o erro.
choose_api_version
syntax: api_version = c:choose_api_version(api_key, min_version, max_version)
Isso ajuda o cliente a selecionar a versão correta do api_key correspondente à API.
Quando min_version e max_version são fornecidos, eles atuarão como um limite e as versões selecionadas no valor de retorno não excederão seus limites, não importa quão alta ou baixa a versão da API que o broker suporta. Quando não são fornecidos, seguirá a faixa de versões suportadas pelo broker.
Dica: A estratégia de seleção de versão é escolher a versão máxima dentro da faixa permitida.
resty.kafka.producer
Para carregar este módulo, basta fazer isso
local producer = require "resty.kafka.producer"
Métodos
new
syntax: p = producer:new(broker_list, producer_config?, cluster_name?)
É recomendável usar o tipo de produtor async.
broker_list é o mesmo que em client
Uma tabela de opções opcional pode ser especificada. As seguintes opções são as seguintes:
socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify são as mesmas que em client_config
configuração do produtor, mais como em http://kafka.apache.org/documentation.html#producerconfigs
-
producer_typeEspecifica o
producer.type. "async" ou "sync" -
request_timeoutEspecifica o
request.timeout.ms. Padrão2000 ms -
required_acksEspecifica o
request.required.acks, NÃO DEVE ser zero. Padrão1. -
max_retryEspecifica o
message.send.max.retries. Padrão3. -
retry_backoffEspecifica o
retry.backoff.ms. Padrão100. -
api_versionEspecifica a versão da API de produção. Padrão
0. Se você usar Kafka 0.10.0.0 ou superior,api_versionpode usar0,1ou2. Se você usar Kafka 0.9.x,api_versiondeve ser0ou1. Se você usar Kafka 0.8.x,api_versiondeve ser0. -
partitionerEspecifica o particionador que escolhe a partição a partir da chave e do número de partições.
syntax: partitioner = function (key, partition_num, correlation_id) end, o correlation_id é um id auto-incremental no produtor. O particionador padrão é:local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id é contínuo e começa de 0 return id % num end
configuração do buffer (funciona apenas com producer_type = "async")
-
flush_timeEspecifica o
queue.buffering.max.ms. Padrão1000. -
batch_numEspecifica o
batch.num.messages. Padrão200. -
batch_sizeEspecifica o
send.buffer.bytes. Padrão1M(pode chegar a 2M). Cuidado, DEVE ser menor que a configuraçãosocket.request.max.bytes / 2 - 10kno servidor kafka. -
max_bufferingEspecifica o
queue.buffering.max.messages. Padrão50.000. -
error_handleEspecifica o tratamento de erro, trata dados quando o buffer envia para kafka erro.
syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, as mensagens falhadas na message_queue são como{ key1, msg1, key2, msg2 },keyna message_queue é uma string vazia""mesmo se a origem fornil.indexé o comprimento da message_queue, não deve usar#message_queue. quandoretryableétrue, isso significa que o servidor kafka certamente não cometeu essas mensagens, você pode tentar enviar novamente com segurança; e caso contrário, significa que talvez, recomenda-se registrar em algum lugar. -
wait_on_buffer_fullEspecifica se deve esperar quando a fila do buffer estiver cheia, Padrão
false. Quando a fila do buffer estiver cheia, se a opção passada fortrue, usará a função de espera do semáforo para bloquear a coroutine até o tempo limite ou a fila do buffer ter diminuído, Caso contrário, retornará um erro "buffer overflow" comfalse. Observe que não pode ser usado em fases que não suportam yields, ou seja, fase de log. -
wait_buffer_timeoutEspecifica o tempo máximo de espera quando o buffer está cheio, Padrão
5segundos.
Não suporta compressão agora.
O terceiro cluster_name opcional especifica o nome do cluster, padrão 1 (sim, é um número). Você pode especificar nomes diferentes quando tiver dois ou mais clusters kafka. E isso só funciona com async producer_type.
send
syntax: ok, err = p:send(topic, key, message)
-
No modelo sync
Em caso de sucesso, retorna o offset ( cdata: LL ) do broker e partição atuais. Em caso de erro, retorna
nilcom uma string descrevendo o erro. -
No modelo async
A
messageserá escrita no buffer primeiro. Ela será enviada para o servidor kafka quando o buffer exceder obatch_num, ou a cadaflush_timepara limpar o buffer.Em caso de sucesso, retorna
true. Em caso de erro, retornanilcom uma string descrevendo o erro (buffer overflow).
offset
syntax: sum, details = p:offset()
Retorna a soma de todos os offsets de tópico-partição (retornados pela API ProduceRequest);
e os detalhes de cada tópico-partição.
flush
syntax: ok = p:flush()
Sempre retorna true.
resty.kafka.basic-consumer
Para carregar este módulo, basta fazer isso
local bconsumer = require "resty.kafka.basic-consumer"
Este módulo é uma implementação minimalista de um consumidor, fornecendo a API list_offset para consulta por tempo ou obtenção do offset inicial e final e a API fetch para obter mensagens em um tópico.
Em uma única chamada, apenas a informação de uma única partição em um único tópico pode ser buscada, e a busca em lote não é suportada por enquanto. O consumidor básico não suporta a API relacionada ao grupo de consumidores, então você precisa buscar a mensagem após obter o offset através da API list_offset, ou seu serviço pode gerenciar o offset por conta própria.
Métodos
new
syntax: c = bconsumer:new(broker_list, client_config)
O broker_list é uma lista de brokers, como abaixo
[
{
"host": "127.0.0.1",
"port": 9092,
// autenticação opcional
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
Uma tabela opcional client_config pode ser especificada. As seguintes opções são as seguintes:
configuração do cliente
-
socket_timeoutEspecifica o limite de tempo de rede em milissegundos. DEVE ser maior que o
request_timeout. -
keepalive_timeoutEspecifica o tempo máximo ocioso (em milissegundos) para a conexão keepalive.
-
keepalive_sizeEspecifica o número máximo de conexões permitidas no pool de conexões para cada worker do Nginx.
-
refresh_intervalEspecifica o tempo para atualizar automaticamente os metadados em milissegundos. Os metadados não serão atualizados automaticamente se forem nil.
-
sslEspecifica se o cliente deve usar conexão ssl. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyEspecifica se o cliente deve realizar a verificação SSL. O padrão é falso. Veja: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_levelEsta configuração controla a visibilidade dos registros transacionais. Veja: https://kafka.apache.org/protocol.html -
client_rackID do rack do consumidor que faz esta solicitação. Veja: https://kafka.apache.org/protocol.html
list_offset
syntax: offset, err = c:list_offset(topic, partition, timestamp)
O parâmetro timestamp pode ser um timestamp UNIX ou uma constante definida em resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, usada para obter os offsets iniciais e mais recentes, etc., semântica com a API ListOffsets no Apache Kafka. Veja: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
Em caso de sucesso, retorna o offset do caso especificado.
Em caso de erro, retorna nil com uma string descrevendo o erro.
fetch
syntax: result, err = c:fetch(topic, partition, offset)
Em caso de sucesso, retorna o seguinte result do caso especificado.
Em caso de erro, retorna nil com uma string descrevendo o erro.
O result conterá mais informações, como as mensagens:
-
recordsA tabela contendo o conteúdo da mensagem.
-
errcodeO código de erro da API Fetch. Veja: https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkO high watermark da API Fetch. Veja: https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetO último offset estável da API Fetch. O conteúdo depende da versão da API, pode ser nil. Veja: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a versão da API acima de v4
-
log_start_offsetO offset de início do log da API Fetch. O conteúdo depende da versão da API, pode ser nil. Veja: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a versão da API acima de v5
-
aborted_transactionsAs transações abortadas da API Fetch. O conteúdo depende da versão da API, pode ser nil. Veja: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a versão da API acima de v4
-
preferred_read_replicaA réplica de leitura preferida da API Fetch. O conteúdo depende da versão da API, pode ser nil. Veja: https://kafka.apache.org/protocol.html#The_Messages_Fetch que responde a versão da API acima de v11
Erros
Quando você chama os módulos fornecidos nesta biblioteca, pode receber alguns erros. Dependendo da fonte, eles podem ser divididos nas seguintes categorias.
-
Erros de rede: como conexão rejeitada, tempo de conexão excedido, etc. Você precisa verificar o status da conexão de cada serviço em seu ambiente.
-
Erros relacionados a metadados: como metadados ou dados ApiVersion não podem ser recuperados corretamente; o tópico ou partição especificados não existem, etc. Você precisa verificar a configuração do Broker Kafka e do cliente.
-
Erro retornado pelo Kafka: às vezes o Kafka incluirá dados err_code nos dados de resposta. Quando esse problema ocorre, o
errno valor de retorno se parece com issoOFFSET_OUT_OF_RANGE, todos os caracteres em maiúsculas e separados por sublinhados, e na biblioteca atual fornecemos uma lista de erros de mapeamento correspondente às descrições textuais. Para saber mais sobre esses erros, veja as descrições na documentação do Kafka.
Veja Também
- o módulo ngx_lua: http://wiki.nginx.org/HttpLuaModule
- o protocolo kafka: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- a biblioteca lua-resty-redis
- a biblioteca lua-resty-logger-socket
- o sarama
GitHub
Você pode encontrar dicas adicionais de configuração e documentação para este módulo no repositório GitHub do nginx-module-kafka.