kafka: Lua Kafka-Client-Treiber für nginx-module-lua basierend auf der cosocket API
Installation
Wenn Sie das RPM-Repository-Abonnement noch nicht eingerichtet haben, melden Sie sich an. Dann können Sie mit den folgenden Schritten fortfahren.
CentOS/RHEL 7 oder Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
Um diese Lua-Bibliothek mit NGINX zu verwenden, stellen Sie sicher, dass nginx-module-lua installiert ist.
Dieses Dokument beschreibt lua-resty-kafka v0.23, veröffentlicht am 03. Nov 2023.
Diese Lua-Bibliothek ist ein Kafka-Client-Treiber für das ngx_lua NGINX-Modul:
http://wiki.nginx.org/HttpLuaModule
Diese Lua-Bibliothek nutzt die cosocket API von ngx_lua, die 100% nicht-blockierendes Verhalten gewährleistet.
Bitte beachten Sie, dass mindestens ngx_lua 0.9.3 oder openresty 1.4.3.7 erforderlich ist, und leider wird nur LuaJIT unterstützt (--with-luajit).
Hinweis für ssl-Verbindungen: Mindestens ngx_lua 0.9.11 oder openresty 1.7.4.1 ist erforderlich, und leider wird nur LuaJIT unterstützt (--with-luajit).
Synopsis
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- optionale Authentifizierung
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- normalerweise verwenden wir diese Bibliothek nicht direkt
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata fehlgeschlagen, Fehler:", partitions)
end
ngx.say("Brokers: ", cjson.encode(brokers), "; Partitionen: ", cjson.encode(partitions))
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send Fehler:", err)
return
end
ngx.say("send erfolgreich, offset: ", tonumber(offset))
-- dies ist der asynchrone producer_type und bp wird im gesamten NGINX-Worker wiederverwendet
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("send Fehler:", err)
return
end
ngx.say("send erfolgreich, ok:", ok)
';
}
}
Module
resty.kafka.client
Um dieses Modul zu laden, tun Sie einfach dies
local client = require "resty.kafka.client"
Methoden
new
syntax: c = client:new(broker_list, client_config)
Die broker_list ist eine Liste von Brokern, wie unten
[
{
"host": "127.0.0.1",
"port": 9092,
// optionale Authentifizierung
"sasl_config": {
// unterstützte Mechanismen: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
unterstützte Mechanismen: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
Warnung: SCRAM-SHA-256, SCRAM-SHA-512 benötigen die Installation von lua-resty-jit-uuid und lua-resty-openssl.
Eine optionale client_config-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:
Client-Konfiguration
-
socket_timeoutGibt die Netzwerk-Timeout-Schwelle in Millisekunden an. Sollte größer sein als das
request_timeout. -
keepalive_timeoutGibt das maximale Leerlauf-Timeout (in Millisekunden) für die Keepalive-Verbindung an.
-
keepalive_sizeGibt die maximale Anzahl von Verbindungen an, die im Verbindungs-Pool für jeden NGINX-Worker erlaubt sind.
-
refresh_intervalGibt die Zeit an, um die Metadaten automatisch in Millisekunden zu aktualisieren. Dann werden die Metadaten nicht automatisch aktualisiert, wenn sie nil sind.
-
sslGibt an, ob der Client eine SSL-Verbindung verwenden soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyGibt an, ob der Client eine SSL-Überprüfung durchführen soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolverGibt eine Funktion zur Hostauflösung an, die eine IP-Adresse oder
nilzurückgibt, um den systemeigenen Host-Resolver zu überschreiben. Standardnil, keine Auflösung durchgeführt. Beispielfunction(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
syntax: brokers, partitions = c:fetch_metadata(topic)
Im Erfolgsfall gibt es alle Broker und Partitionen des topic zurück.
Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.
refresh
syntax: brokers, partitions = c:refresh()
Dies wird die Metadaten aller Themen aktualisieren, die durch fetch_metadata abgerufen wurden.
Im Erfolgsfall gibt es alle Broker und alle Partitionen aller Themen zurück.
Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.
choose_api_version
syntax: api_version = c:choose_api_version(api_key, min_version, max_version)
Dies hilft dem Client, die korrekte Version des api_key auszuwählen, die der API entspricht.
Wenn min_version und max_version angegeben sind, fungiert dies als Grenze, und die ausgewählten Versionen im Rückgabewert überschreiten ihre Grenzen nicht, egal wie hoch oder niedrig der Broker die API-Version unterstützt. Wenn sie nicht angegeben sind, folgt es dem Bereich der vom Broker unterstützten Versionen.
Tipp: Die Versionsauswahlstrategie besteht darin, die maximale Version innerhalb des erlaubten Bereichs auszuwählen.
resty.kafka.producer
Um dieses Modul zu laden, tun Sie einfach dies
local producer = require "resty.kafka.producer"
Methoden
new
syntax: p = producer:new(broker_list, producer_config?, cluster_name?)
Es wird empfohlen, den asynchronen producer_type zu verwenden.
broker_list ist dasselbe wie in client.
Eine optionale Optionen-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:
socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify sind dieselben wie in client_config.
Produzenten-Konfiguration, ähnlich wie in http://kafka.apache.org/documentation.html#producerconfigs
-
producer_typeGibt den
producer.typean. "async" oder "sync". -
request_timeoutGibt das
request.timeout.msan. Standard2000 ms. -
required_acksGibt die
request.required.acksan, sollte nicht null sein. Standard1. -
max_retryGibt die
message.send.max.retriesan. Standard3. -
retry_backoffGibt die
retry.backoff.msan. Standard100. -
api_versionGibt die Produzenten-API-Version an. Standard
0. Wenn Sie Kafka 0.10.0.0 oder höher verwenden, kannapi_version0,1oder2verwenden. Wenn Sie Kafka 0.9.x verwenden, sollteapi_version0oder1sein. Wenn Sie Kafka 0.8.x verwenden, sollteapi_version0sein. -
partitionerGibt den Partitioner an, der die Partition aus dem Schlüssel und der Partitionierungsnummer auswählt.
syntax: partitioner = function (key, partition_num, correlation_id) end, die correlation_id ist eine automatisch inkrementierende ID im Produzenten. Der Standard-Partitioner ist:local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id ist kontinuierlich und beginnt bei 0 return id % num end
Pufferkonfiguration (funktioniert nur producer_type = "async")
-
flush_timeGibt das
queue.buffering.max.msan. Standard1000. -
batch_numGibt die
batch.num.messagesan. Standard200. -
batch_sizeGibt die
send.buffer.bytesan. Standard1M(kann 2M erreichen). Seien Sie vorsichtig, sollte kleiner sein als die Konfigurationsocket.request.max.bytes / 2 - 10kim Kafka-Server. -
max_bufferingGibt die
queue.buffering.max.messagesan. Standard50.000. -
error_handleGibt den Fehlerbehandler an, der Daten verarbeitet, wenn der Puffer an Kafka sendet, fehlschlägt.
syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, die fehlgeschlagenen Nachrichten in der message_queue sind wie{ key1, msg1, key2, msg2 },keyin der message_queue ist ein leerer String"", auch wenn der Ursprungnilist.indexist die Länge der message_queue, sollte nicht#message_queueverwendet werden. Wennretryabletrueist, bedeutet das, dass der Kafka-Server diese Nachrichten sicher nicht bestätigt hat, Sie können sicher erneut senden; und andernfalls bedeutet es vielleicht, dass es empfohlen wird, irgendwo zu protokollieren. -
wait_on_buffer_fullGibt an, ob gewartet werden soll, wenn die Pufferwarteschlange voll ist. Standard
false. Wenn die Pufferwarteschlange voll ist, wenn die Optiontrueübergeben wird, wird die Semaphore-Wartefunktion verwendet, um die Coroutine zu blockieren, bis das Timeout erreicht ist oder die Pufferwarteschlange verringert wird, andernfalls wird ein "Pufferüberlauf"-Fehler mitfalsezurückgegeben. Hinweis: Es kann nicht in Phasen verwendet werden, die keine Yield-Unterstützung haben, d.h. Protokollphase. -
wait_buffer_timeoutGibt die maximale Wartezeit an, wenn der Puffer voll ist. Standard
5Sekunden.
Derzeit wird keine Kompression unterstützt.
Der dritte optionale cluster_name gibt den Namen des Clusters an, Standard 1 (ja, es ist eine Zahl). Sie können unterschiedliche Namen angeben, wenn Sie zwei oder mehr Kafka-Cluster haben. Und dies funktioniert nur mit async producer_type.
send
syntax: ok, err = p:send(topic, key, message)
-
Im synchronen Modell
Im Erfolgsfall gibt es den Offset ( cdata: LL ) des aktuellen Brokers und der Partition zurück. Im Fehlerfall gibt es
nilmit einer Zeichenkette zurück, die den Fehler beschreibt. -
Im asynchronen Modell
Die
messagewird zuerst in den Puffer geschrieben. Sie wird an den Kafka-Server gesendet, wenn der Puffer diebatch_numüberschreitet, oder alleflush_timewird der Puffer geleert.Im Erfolgsfall gibt es
truezurück. Im Fehlerfall gibt esnilmit einer Zeichenkette zurück, die den Fehler beschreibt (Pufferüberlauf).
offset
syntax: sum, details = p:offset()
Gibt die Summe aller Topic-Partition-Offsets zurück (von der ProduceRequest-API zurückgegeben);
und die Details jeder Topic-Partition.
flush
syntax: ok = p:flush()
Gibt immer true zurück.
resty.kafka.basic-consumer
Um dieses Modul zu laden, tun Sie einfach dies
local bconsumer = require "resty.kafka.basic-consumer"
Dieses Modul ist eine minimalistische Implementierung eines Consumers, der die list_offset API zum Abfragen nach Zeit oder zum Abrufen des Start- und End-Offsets und die fetch API zum Abrufen von Nachrichten in einem Thema bereitstellt.
In einem einzelnen Aufruf kann nur die Information einer einzelnen Partition in einem einzelnen Thema abgerufen werden, und das Batch-Abrufen wird derzeit nicht unterstützt. Der grundlegende Consumer unterstützt nicht die API, die mit der Consumer-Gruppe verbunden ist, sodass Sie die Nachricht abrufen müssen, nachdem Sie den Offset über die list_offset API erhalten haben, oder Ihr Dienst kann den Offset selbst verwalten.
Methoden
new
syntax: c = bconsumer:new(broker_list, client_config)
Die broker_list ist eine Liste von Brokern, wie unten
[
{
"host": "127.0.0.1",
"port": 9092,
// optionale Authentifizierung
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
Eine optionale client_config-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:
Client-Konfiguration
-
socket_timeoutGibt die Netzwerk-Timeout-Schwelle in Millisekunden an. Sollte größer sein als das
request_timeout. -
keepalive_timeoutGibt das maximale Leerlauf-Timeout (in Millisekunden) für die Keepalive-Verbindung an.
-
keepalive_sizeGibt die maximale Anzahl von Verbindungen an, die im Verbindungs-Pool für jeden NGINX-Worker erlaubt sind.
-
refresh_intervalGibt die Zeit an, um die Metadaten automatisch in Millisekunden zu aktualisieren. Dann werden die Metadaten nicht automatisch aktualisiert, wenn sie nil sind.
-
sslGibt an, ob der Client eine SSL-Verbindung verwenden soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyGibt an, ob der Client eine SSL-Überprüfung durchführen soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_levelDiese Einstellung steuert die Sichtbarkeit von Transaktionsdatensätzen. Siehe: https://kafka.apache.org/protocol.html -
client_rackRack-ID des Consumers, der diese Anfrage stellt. Siehe: https://kafka.apache.org/protocol.html
list_offset
syntax: offset, err = c:list_offset(topic, partition, timestamp)
Der Parameter timestamp kann ein UNIX-Zeitstempel oder eine Konstante sein, die in resty.kafka.protocol.consumer definiert ist, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, die verwendet wird, um die Anfangs- und neuesten Offsets usw. abzurufen, Semantik mit der ListOffsets-API in Apache Kafka. Siehe: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
Im Erfolgsfall gibt es den Offset des angegebenen Falls zurück.
Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.
fetch
syntax: result, err = c:fetch(topic, partition, offset)
Im Erfolgsfall gibt es das folgende result des angegebenen Falls zurück.
Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.
Das result enthält weitere Informationen wie die Nachrichten:
-
recordsDie Tabelle, die den Inhalt der Nachricht enthält.
-
errcodeDer Fehlercode der Fetch-API. Siehe: https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkDer High-Watermark der Fetch-API. Siehe: https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetDer letzte stabile Offset der Fetch-API. Der Inhalt hängt von der API-Version ab, möglicherweise nil. Siehe: https://kafka.apache.org/protocol.html#The_Messages_Fetch, die API-Version über v4 antwortet.
-
log_start_offsetDer Log-Start-Offset der Fetch-API. Der Inhalt hängt von der API-Version ab, möglicherweise nil. Siehe: https://kafka.apache.org/protocol.html#The_Messages_Fetch, die API-Version über v5 antwortet.
-
aborted_transactionsDie abgebrochenen Transaktionen der Fetch-API. Der Inhalt hängt von der API-Version ab, möglicherweise nil. Siehe: https://kafka.apache.org/protocol.html#The_Messages_Fetch, die API-Version über v4 antwortet.
-
preferred_read_replicaDie bevorzugte Lese-Replik der Fetch-API. Der Inhalt hängt von der API-Version ab, möglicherweise nil. Siehe: https://kafka.apache.org/protocol.html#The_Messages_Fetch, die API-Version über v11 antwortet.
Fehler
Wenn Sie die in dieser Bibliothek bereitgestellten Module aufrufen, können Sie einige Fehler erhalten. Je nach Quelle können sie in die folgenden Kategorien unterteilt werden.
-
Netzwerkfehler: wie abgelehnte Verbindungen, Verbindungszeitüberschreitungen usw. Sie müssen den Verbindungsstatus jedes Dienstes in Ihrer Umgebung überprüfen.
-
Metadatenbezogene Fehler: wie Metadaten oder ApiVersion-Daten können nicht ordnungsgemäß abgerufen werden; das angegebene Thema oder die Partition existiert nicht usw. Sie müssen den Kafka-Broker und die Client-Konfiguration überprüfen.
-
Fehler, die von Kafka zurückgegeben werden: Manchmal wird Kafka die err_code-Daten in den Antwortdaten einfügen. Wenn dieses Problem auftritt, sieht das
errim Rückgabewert so aus:OFFSET_OUT_OF_RANGE, alle Großbuchstaben und durch Unterstriche getrennt, und in der aktuellen Bibliothek bieten wir eine Fehlerliste von Zuordnungen an, die den textlichen Beschreibungen entsprechen. Um mehr über diese Fehler zu erfahren, siehe die Beschreibungen in der Kafka-Dokumentation.
Siehe auch
- das ngx_lua-Modul: http://wiki.nginx.org/HttpLuaModule
- das Kafka-Protokoll: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- die lua-resty-redis Bibliothek
- die lua-resty-logger-socket Bibliothek
- die sarama
GitHub
Sie finden möglicherweise zusätzliche Konfigurationstipps und Dokumentationen für dieses Modul im GitHub-Repository für nginx-module-kafka.