kafka: pilote client Lua kafka pour nginx-module-lua basé sur l'API cosocket
Installation
Si vous n'avez pas configuré l'abonnement au dépôt RPM, inscrivez-vous. Ensuite, vous pouvez procéder avec les étapes suivantes.
CentOS/RHEL 7 ou Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
Pour utiliser cette bibliothèque Lua avec NGINX, assurez-vous que nginx-module-lua est installé.
Ce document décrit lua-resty-kafka v0.23 publié le 03 novembre 2023.
Cette bibliothèque Lua est un pilote client Kafka pour le module ngx_lua de nginx :
http://wiki.nginx.org/HttpLuaModule
Cette bibliothèque Lua tire parti de l'API cosocket de ngx_lua, qui garantit un comportement 100 % non-bloquant.
Notez qu'au moins ngx_lua 0.9.3 ou openresty 1.4.3.7 est requis, et malheureusement, seul LuaJIT est pris en charge (--with-luajit).
Note pour les connexions ssl : au moins ngx_lua 0.9.11 ou openresty 1.7.4.1 est requis, et malheureusement, seul LuaJIT est pris en charge (--with-luajit).
Synopsis
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- auth optionnelle
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- généralement, nous n'utilisons pas cette bibliothèque directement
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata échoué, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- producteur de type sync
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("erreur d'envoi:", err)
return
end
ngx.say("envoi réussi, offset: ", tonumber(offset))
-- ceci est un producteur de type async et bp sera réutilisé dans tout le worker nginx
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("erreur d'envoi:", err)
return
end
ngx.say("envoi réussi, ok:", ok)
';
}
}
Modules
resty.kafka.client
Pour charger ce module, il suffit de faire ceci
local client = require "resty.kafka.client"
Méthodes
new
syntax: c = client:new(broker_list, client_config)
Le broker_list est une liste de brokers, comme ci-dessous
[
{
"host": "127.0.0.1",
"port": 9092,
// auth optionnelle
"sasl_config": {
// mécanisme de support : PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
mécanisme de support : PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
avertir : SCRAM-SHA-256, SCRAM-SHA-512 nécessitent l'installation de lua-resty-jit-uuid et lua-resty-openssl
Un tableau client_config optionnel peut être spécifié. Les options suivantes sont disponibles :
configuration du client
-
socket_timeoutSpécifie le seuil de délai d'attente réseau en millisecondes. DOIT être supérieur au
request_timeout. -
keepalive_timeoutSpécifie le délai d'attente maximal inactif (en millisecondes) pour la connexion keepalive.
-
keepalive_sizeSpécifie le nombre maximal de connexions autorisées dans le pool de connexions pour chaque worker Nginx.
-
refresh_intervalSpécifie le temps pour actualiser automatiquement les métadonnées en millisecondes. Les métadonnées ne seront pas actualisées automatiquement si elles sont nil.
-
sslSpécifie si le client doit utiliser une connexion ssl. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifySpécifie si le client doit effectuer une vérification SSL. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolverSpécifie une fonction pour la résolution d'hôte, qui retourne une chaîne d'IP ou
nil, pour remplacer le résolveur d'hôte par défaut du système. Par défautnil, aucune résolution effectuée. Exemplefunction(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
syntax: brokers, partitions = c:fetch_metadata(topic)
En cas de succès, retourne tous les brokers et partitions du topic.
En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
refresh
syntax: brokers, partitions = c:refresh()
Cela actualisera les métadonnées de tous les topics qui ont été récupérés par fetch_metadata.
En cas de succès, retourne tous les brokers et toutes les partitions de tous les topics.
En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
choose_api_version
syntax: api_version = c:choose_api_version(api_key, min_version, max_version)
Cela aide le client à sélectionner la version correcte de l'api_key correspondant à l'API.
Lorsque min_version et max_version sont fournis, cela agira comme une limite et les versions sélectionnées dans la valeur de retour ne dépasseront pas leurs limites, peu importe à quel point le broker prend en charge la version de l'API. Lorsqu'ils ne sont pas fournis, cela suivra la plage de versions prises en charge par le broker.
Astuce : La stratégie de sélection de version consiste à choisir la version maximale dans la plage autorisée.
resty.kafka.producer
Pour charger ce module, il suffit de faire ceci
local producer = require "resty.kafka.producer"
Méthodes
new
syntax: p = producer:new(broker_list, producer_config?, cluster_name?)
Il est recommandé d'utiliser le type de producteur async.
Le broker_list est le même que dans client
Un tableau d'options optionnel peut être spécifié. Les options suivantes sont disponibles :
socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify sont les mêmes que dans client_config
configuration du producteur, similaire à http://kafka.apache.org/documentation.html#producerconfigs
-
producer_typeSpécifie le
producer.type. "async" ou "sync" -
request_timeoutSpécifie le
request.timeout.ms. Par défaut2000 ms -
required_acksSpécifie le
request.required.acks, NE DOIT PAS être zéro. Par défaut1. -
max_retrySpécifie le
message.send.max.retries. Par défaut3. -
retry_backoffSpécifie le
retry.backoff.ms. Par défaut100. -
api_versionSpécifie la version de l'API de production. Par défaut
0. Si vous utilisez Kafka 0.10.0.0 ou supérieur,api_versionpeut utiliser0,1ou2. Si vous utilisez Kafka 0.9.x,api_versiondoit être0ou1. Si vous utilisez Kafka 0.8.x,api_versiondoit être0. -
partitionerSpécifie le partitionneur qui choisit la partition à partir de la clé et du nombre de partitions.
syntax: partitioner = function (key, partition_num, correlation_id) end, le correlation_id est un identifiant auto-incrémenté dans le producteur. Le partitionneur par défaut est :local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id est continu et commence à 0 return id % num end
configuration du buffer (ne fonctionne que si producer_type = "async")
-
flush_timeSpécifie le
queue.buffering.max.ms. Par défaut1000. -
batch_numSpécifie le
batch.num.messages. Par défaut200. -
batch_sizeSpécifie le
send.buffer.bytes. Par défaut1M(peut atteindre 2M). Faites attention, DOIT être plus petit que la configurationsocket.request.max.bytes / 2 - 10kdans le serveur kafka. -
max_bufferingSpécifie le
queue.buffering.max.messages. Par défaut50,000. -
error_handleSpécifie le gestionnaire d'erreurs, gère les données lorsque le buffer envoie une erreur à kafka.
syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, les messages échoués dans le message_queue ressemblent à{ key1, msg1, key2, msg2 },keydans le message_queue est une chaîne vide""même si l'origine estnil.indexest la longueur du message_queue, ne doit pas utiliser#message_queue. lorsqueretryableesttrue, cela signifie que le serveur kafka n'a certainement pas validé ces messages, vous pouvez réessayer d'envoyer en toute sécurité ; et sinon, cela signifie peut-être, recommandé de journaliser quelque part. -
wait_on_buffer_fullSpécifie s'il faut attendre lorsque la queue de buffer est pleine, par défaut
false. Lorsque la queue de buffer est pleine, si l'option passée esttrue, utilisera une fonction d'attente de sémaphore pour bloquer la coroutine jusqu'à ce que le délai d'attente ou que la queue de buffer ait diminué, Sinon, retourne une erreur "buffer overflow" avecfalse. Remarque, cela ne peut pas être utilisé dans les phases qui ne prennent pas en charge les yields, c'est-à-dire la phase de journalisation. -
wait_buffer_timeoutSpécifie le temps d'attente maximal lorsque le buffer est plein, par défaut
5secondes.
Ne prend pas en charge la compression pour l'instant.
Le troisième cluster_name optionnel spécifie le nom du cluster, par défaut 1 (oui, c'est un nombre). Vous pouvez spécifier des noms différents lorsque vous avez deux clusters kafka ou plus. Et cela ne fonctionne qu'avec le type de producteur async.
send
syntax: ok, err = p:send(topic, key, message)
-
Dans le modèle sync
En cas de succès, retourne l'offset ( cdata: LL ) du broker et de la partition actuels. En cas d'erreurs, retourne
nilavec une chaîne décrivant l'erreur. -
Dans le modèle async
Le
messagesera d'abord écrit dans le buffer. Il sera envoyé au serveur kafka lorsque le buffer dépasse lebatch_num, ou chaqueflush_timepour vider le buffer.En cas de succès, retourne
true. En cas d'erreurs, retournenilavec une chaîne décrivant l'erreur (buffer overflow).
offset
syntax: sum, details = p:offset()
Retourne la somme de tous les offsets topic-partition (retournés par l'API ProduceRequest) ;
et les détails de chaque topic-partition
flush
syntax: ok = p:flush()
Retourne toujours true.
resty.kafka.basic-consumer
Pour charger ce module, il suffit de faire ceci
local bconsumer = require "resty.kafka.basic-consumer"
Ce module est une implémentation minimaliste d'un consommateur, fournissant l'API list_offset pour interroger par temps ou obtenir l'offset de début et de fin et l'API fetch pour obtenir des messages dans un topic.
Dans un seul appel, seules les informations d'une seule partition dans un seul topic peuvent être récupérées, et la récupération par lot n'est pas encore prise en charge. Le consommateur de base ne prend pas en charge l'API liée au groupe de consommateurs, donc vous devez récupérer le message après avoir obtenu l'offset via l'API list_offset, ou votre service peut gérer l'offset lui-même.
Méthodes
new
syntax: c = bconsumer:new(broker_list, client_config)
Le broker_list est une liste de brokers, comme ci-dessous
[
{
"host": "127.0.0.1",
"port": 9092,
// auth optionnelle
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
Un tableau client_config optionnel peut être spécifié. Les options suivantes sont disponibles :
configuration du client
-
socket_timeoutSpécifie le seuil de délai d'attente réseau en millisecondes. DOIT être supérieur au
request_timeout. -
keepalive_timeoutSpécifie le délai d'attente maximal inactif (en millisecondes) pour la connexion keepalive.
-
keepalive_sizeSpécifie le nombre maximal de connexions autorisées dans le pool de connexions pour chaque worker Nginx.
-
refresh_intervalSpécifie le temps pour actualiser automatiquement les métadonnées en millisecondes. Les métadonnées ne seront pas actualisées automatiquement si elles sont nil.
-
sslSpécifie si le client doit utiliser une connexion ssl. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifySpécifie si le client doit effectuer une vérification SSL. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_levelCette configuration contrôle la visibilité des enregistrements transactionnels. Voir : https://kafka.apache.org/protocol.html -
client_rackID de rack du consommateur effectuant cette demande. Voir : https://kafka.apache.org/protocol.html
list_offset
syntax: offset, err = c:list_offset(topic, partition, timestamp)
Le paramètre timestamp peut être un timestamp UNIX ou une constante définie dans resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, utilisée pour obtenir les offsets initiaux et les plus récents, etc., sémantique avec l'API ListOffsets dans Apache Kafka. Voir : https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
En cas de succès, retourne l'offset du cas spécifié.
En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
fetch
syntax: result, err = c:fetch(topic, partition, offset)
En cas de succès, retourne le result suivant du cas spécifié.
En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.
Le result contiendra plus d'informations telles que les messages :
-
recordsLe tableau contenant le contenu du message.
-
errcodeLe code d'erreur de l'API Fetch. Voir : https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkLe high watermark de l'API Fetch. Voir : https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetLe dernier offset stable de l'API Fetch. Le contenu dépend de la version de l'API, peut être nil. Voir : https://kafka.apache.org/protocol.html#The_Messages_Fetch qui répond à la version de l'API au-dessus de v4
-
log_start_offsetL'offset de début de journal de l'API Fetch. Le contenu dépend de la version de l'API, peut être nil. Voir : https://kafka.apache.org/protocol.html#The_Messages_Fetch qui répond à la version de l'API au-dessus de v5
-
aborted_transactionsLes transactions avortées de l'API Fetch. Le contenu dépend de la version de l'API, peut être nil. Voir : https://kafka.apache.org/protocol.html#The_Messages_Fetch qui répond à la version de l'API au-dessus de v4
-
preferred_read_replicaLa réplique de lecture préférée de l'API Fetch. Le contenu dépend de la version de l'API, peut être nil. Voir : https://kafka.apache.org/protocol.html#The_Messages_Fetch qui répond à la version de l'API au-dessus de v11
Erreurs
Lorsque vous appelez les modules fournis dans cette bibliothèque, vous pouvez rencontrer certaines erreurs. Selon la source, elles peuvent être divisées en plusieurs catégories.
-
Erreurs réseau : telles que connexion rejetée, délai d'attente de connexion, etc. Vous devez vérifier l'état de connexion de chaque service dans votre environnement.
-
Erreurs liées aux métadonnées : telles que les métadonnées ou les données ApiVersion ne peuvent pas être récupérées correctement ; le topic ou la partition spécifié n'existe pas, etc. Vous devez vérifier la configuration du Broker Kafka et du client.
-
Erreur retournée par Kafka : parfois, Kafka inclura des données err_code dans les données de réponse. Lorsque ce problème se produit, le
errdans la valeur de retour ressemble à ceciOFFSET_OUT_OF_RANGE, tous les caractères en majuscules, et séparés par des underscores, et dans la bibliothèque actuelle, nous fournissons une liste d'erreurs de mappage correspondant aux descriptions textuelles. Pour en savoir plus sur ces erreurs, consultez les descriptions dans la documentation Kafka.
Voir Aussi
- le module ngx_lua : http://wiki.nginx.org/HttpLuaModule
- le protocole kafka : https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- la bibliothèque lua-resty-redis
- la bibliothèque lua-resty-logger-socket
- le sarama
GitHub
Vous pouvez trouver des conseils de configuration supplémentaires et de la documentation pour ce module dans le dépôt GitHub pour nginx-module-kafka.