Aller au contenu

kafka: pilote client Lua kafka pour nginx-module-lua basé sur l'API cosocket

Installation

Si vous n'avez pas configuré l'abonnement au dépôt RPM, inscrivez-vous. Ensuite, vous pouvez procéder avec les étapes suivantes.

CentOS/RHEL 7 ou Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

Pour utiliser cette bibliothèque Lua avec NGINX, assurez-vous que nginx-module-lua est installé.

Ce document décrit lua-resty-kafka v0.23 publié le 03 novembre 2023.


Cette bibliothèque Lua est un pilote client Kafka pour le module ngx_lua de nginx :

http://wiki.nginx.org/HttpLuaModule

Cette bibliothèque Lua tire parti de l'API cosocket de ngx_lua, qui garantit un comportement 100 % non-bloquant.

Notez qu'au moins ngx_lua 0.9.3 ou openresty 1.4.3.7 est requis, et malheureusement, seul LuaJIT est pris en charge (--with-luajit).

Note pour les connexions ssl : au moins ngx_lua 0.9.11 ou openresty 1.7.4.1 est requis, et malheureusement, seul LuaJIT est pris en charge (--with-luajit).

Synopsis

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- auth optionnelle
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- généralement, nous n'utilisons pas cette bibliothèque directement
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata échoué, err:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))


                -- producteur de type sync
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("erreur d'envoi:", err)
                    return
                end
                ngx.say("envoi réussi, offset: ", tonumber(offset))

                -- ceci est un producteur de type async et bp sera réutilisé dans tout le worker nginx
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("erreur d'envoi:", err)
                    return
                end

                ngx.say("envoi réussi, ok:", ok)
            ';
        }
    }

Modules

resty.kafka.client

Pour charger ce module, il suffit de faire ceci

    local client = require "resty.kafka.client"

Méthodes

new

syntax: c = client:new(broker_list, client_config)

Le broker_list est une liste de brokers, comme ci-dessous

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // auth optionnelle
        "sasl_config": {
            // mécanisme de support : PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

mécanisme de support : PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

avertir : SCRAM-SHA-256, SCRAM-SHA-512 nécessitent l'installation de lua-resty-jit-uuid et lua-resty-openssl

Un tableau client_config optionnel peut être spécifié. Les options suivantes sont disponibles :

configuration du client

  • socket_timeout

    Spécifie le seuil de délai d'attente réseau en millisecondes. DOIT être supérieur au request_timeout.

  • keepalive_timeout

    Spécifie le délai d'attente maximal inactif (en millisecondes) pour la connexion keepalive.

  • keepalive_size

    Spécifie le nombre maximal de connexions autorisées dans le pool de connexions pour chaque worker Nginx.

  • refresh_interval

    Spécifie le temps pour actualiser automatiquement les métadonnées en millisecondes. Les métadonnées ne seront pas actualisées automatiquement si elles sont nil.

  • ssl

    Spécifie si le client doit utiliser une connexion ssl. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Spécifie si le client doit effectuer une vérification SSL. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Spécifie une fonction pour la résolution d'hôte, qui retourne une chaîne d'IP ou nil, pour remplacer le résolveur d'hôte par défaut du système. Par défaut nil, aucune résolution effectuée. Exemple function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

En cas de succès, retourne tous les brokers et partitions du topic. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

refresh

syntax: brokers, partitions = c:refresh()

Cela actualisera les métadonnées de tous les topics qui ont été récupérés par fetch_metadata. En cas de succès, retourne tous les brokers et toutes les partitions de tous les topics. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

choose_api_version

syntax: api_version = c:choose_api_version(api_key, min_version, max_version)

Cela aide le client à sélectionner la version correcte de l'api_key correspondant à l'API.

Lorsque min_version et max_version sont fournis, cela agira comme une limite et les versions sélectionnées dans la valeur de retour ne dépasseront pas leurs limites, peu importe à quel point le broker prend en charge la version de l'API. Lorsqu'ils ne sont pas fournis, cela suivra la plage de versions prises en charge par le broker.

Astuce : La stratégie de sélection de version consiste à choisir la version maximale dans la plage autorisée.

resty.kafka.producer

Pour charger ce module, il suffit de faire ceci

    local producer = require "resty.kafka.producer"

Méthodes

new

syntax: p = producer:new(broker_list, producer_config?, cluster_name?)

Il est recommandé d'utiliser le type de producteur async.

Le broker_list est le même que dans client

Un tableau d'options optionnel peut être spécifié. Les options suivantes sont disponibles :

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify sont les mêmes que dans client_config

configuration du producteur, similaire à http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    Spécifie le producer.type. "async" ou "sync"

  • request_timeout

    Spécifie le request.timeout.ms. Par défaut 2000 ms

  • required_acks

    Spécifie le request.required.acks, NE DOIT PAS être zéro. Par défaut 1.

  • max_retry

    Spécifie le message.send.max.retries. Par défaut 3.

  • retry_backoff

    Spécifie le retry.backoff.ms. Par défaut 100.

  • api_version

    Spécifie la version de l'API de production. Par défaut 0. Si vous utilisez Kafka 0.10.0.0 ou supérieur, api_version peut utiliser 0, 1 ou 2. Si vous utilisez Kafka 0.9.x, api_version doit être 0 ou 1. Si vous utilisez Kafka 0.8.x, api_version doit être 0.

  • partitioner

    Spécifie le partitionneur qui choisit la partition à partir de la clé et du nombre de partitions. syntax: partitioner = function (key, partition_num, correlation_id) end, le correlation_id est un identifiant auto-incrémenté dans le producteur. Le partitionneur par défaut est :

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id est continu et commence à 0
        return id % num
    end
    

configuration du buffer (ne fonctionne que si producer_type = "async")

  • flush_time

    Spécifie le queue.buffering.max.ms. Par défaut 1000.

  • batch_num

    Spécifie le batch.num.messages. Par défaut 200.

  • batch_size

    Spécifie le send.buffer.bytes. Par défaut 1M (peut atteindre 2M). Faites attention, DOIT être plus petit que la configuration socket.request.max.bytes / 2 - 10k dans le serveur kafka.

  • max_buffering

    Spécifie le queue.buffering.max.messages. Par défaut 50,000.

  • error_handle

    Spécifie le gestionnaire d'erreurs, gère les données lorsque le buffer envoie une erreur à kafka. syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, les messages échoués dans le message_queue ressemblent à { key1, msg1, key2, msg2 }, key dans le message_queue est une chaîne vide "" même si l'origine est nil. index est la longueur du message_queue, ne doit pas utiliser #message_queue. lorsque retryable est true, cela signifie que le serveur kafka n'a certainement pas validé ces messages, vous pouvez réessayer d'envoyer en toute sécurité ; et sinon, cela signifie peut-être, recommandé de journaliser quelque part.

  • wait_on_buffer_full

    Spécifie s'il faut attendre lorsque la queue de buffer est pleine, par défaut false. Lorsque la queue de buffer est pleine, si l'option passée est true, utilisera une fonction d'attente de sémaphore pour bloquer la coroutine jusqu'à ce que le délai d'attente ou que la queue de buffer ait diminué, Sinon, retourne une erreur "buffer overflow" avec false. Remarque, cela ne peut pas être utilisé dans les phases qui ne prennent pas en charge les yields, c'est-à-dire la phase de journalisation.

  • wait_buffer_timeout

    Spécifie le temps d'attente maximal lorsque le buffer est plein, par défaut 5 secondes.

Ne prend pas en charge la compression pour l'instant.

Le troisième cluster_name optionnel spécifie le nom du cluster, par défaut 1 (oui, c'est un nombre). Vous pouvez spécifier des noms différents lorsque vous avez deux clusters kafka ou plus. Et cela ne fonctionne qu'avec le type de producteur async.

send

syntax: ok, err = p:send(topic, key, message)

  1. Dans le modèle sync

    En cas de succès, retourne l'offset ( cdata: LL ) du broker et de la partition actuels. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

  2. Dans le modèle async

    Le message sera d'abord écrit dans le buffer. Il sera envoyé au serveur kafka lorsque le buffer dépasse le batch_num, ou chaque flush_time pour vider le buffer.

    En cas de succès, retourne true. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur (buffer overflow).

offset

syntax: sum, details = p:offset()

Retourne la somme de tous les offsets topic-partition (retournés par l'API ProduceRequest) ;
et les détails de chaque topic-partition

flush

syntax: ok = p:flush()

Retourne toujours true.

resty.kafka.basic-consumer

Pour charger ce module, il suffit de faire ceci

    local bconsumer = require "resty.kafka.basic-consumer"

Ce module est une implémentation minimaliste d'un consommateur, fournissant l'API list_offset pour interroger par temps ou obtenir l'offset de début et de fin et l'API fetch pour obtenir des messages dans un topic.

Dans un seul appel, seules les informations d'une seule partition dans un seul topic peuvent être récupérées, et la récupération par lot n'est pas encore prise en charge. Le consommateur de base ne prend pas en charge l'API liée au groupe de consommateurs, donc vous devez récupérer le message après avoir obtenu l'offset via l'API list_offset, ou votre service peut gérer l'offset lui-même.

Méthodes

new

syntax: c = bconsumer:new(broker_list, client_config)

Le broker_list est une liste de brokers, comme ci-dessous

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // auth optionnelle
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

Un tableau client_config optionnel peut être spécifié. Les options suivantes sont disponibles :

configuration du client

  • socket_timeout

    Spécifie le seuil de délai d'attente réseau en millisecondes. DOIT être supérieur au request_timeout.

  • keepalive_timeout

    Spécifie le délai d'attente maximal inactif (en millisecondes) pour la connexion keepalive.

  • keepalive_size

    Spécifie le nombre maximal de connexions autorisées dans le pool de connexions pour chaque worker Nginx.

  • refresh_interval

    Spécifie le temps pour actualiser automatiquement les métadonnées en millisecondes. Les métadonnées ne seront pas actualisées automatiquement si elles sont nil.

  • ssl

    Spécifie si le client doit utiliser une connexion ssl. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Spécifie si le client doit effectuer une vérification SSL. Par défaut, c'est faux. Voir : https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level Cette configuration contrôle la visibilité des enregistrements transactionnels. Voir : https://kafka.apache.org/protocol.html

  • client_rack

    ID de rack du consommateur effectuant cette demande. Voir : https://kafka.apache.org/protocol.html

list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

Le paramètre timestamp peut être un timestamp UNIX ou une constante définie dans resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, utilisée pour obtenir les offsets initiaux et les plus récents, etc., sémantique avec l'API ListOffsets dans Apache Kafka. Voir : https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

En cas de succès, retourne l'offset du cas spécifié. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

fetch

syntax: result, err = c:fetch(topic, partition, offset)

En cas de succès, retourne le result suivant du cas spécifié. En cas d'erreurs, retourne nil avec une chaîne décrivant l'erreur.

Le result contiendra plus d'informations telles que les messages :

Erreurs

Lorsque vous appelez les modules fournis dans cette bibliothèque, vous pouvez rencontrer certaines erreurs. Selon la source, elles peuvent être divisées en plusieurs catégories.

  • Erreurs réseau : telles que connexion rejetée, délai d'attente de connexion, etc. Vous devez vérifier l'état de connexion de chaque service dans votre environnement.

  • Erreurs liées aux métadonnées : telles que les métadonnées ou les données ApiVersion ne peuvent pas être récupérées correctement ; le topic ou la partition spécifié n'existe pas, etc. Vous devez vérifier la configuration du Broker Kafka et du client.

  • Erreur retournée par Kafka : parfois, Kafka inclura des données err_code dans les données de réponse. Lorsque ce problème se produit, le err dans la valeur de retour ressemble à ceci OFFSET_OUT_OF_RANGE, tous les caractères en majuscules, et séparés par des underscores, et dans la bibliothèque actuelle, nous fournissons une liste d'erreurs de mappage correspondant aux descriptions textuelles. Pour en savoir plus sur ces erreurs, consultez les descriptions dans la documentation Kafka.

Voir Aussi

GitHub

Vous pouvez trouver des conseils de configuration supplémentaires et de la documentation pour ce module dans le dépôt GitHub pour nginx-module-kafka.