Zum Inhalt

kafka: Lua Kafka-Client-Treiber für nginx-module-lua basierend auf der cosocket API

Installation

Wenn Sie das RPM-Repository-Abonnement noch nicht eingerichtet haben, melden Sie sich an. Dann können Sie mit den folgenden Schritten fortfahren.

CentOS/RHEL 7 oder Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

Um diese Lua-Bibliothek mit NGINX zu verwenden, stellen Sie sicher, dass nginx-module-lua installiert ist.

Dieses Dokument beschreibt lua-resty-kafka v0.23, veröffentlicht am 03. Nov 2023.


Diese Lua-Bibliothek ist ein Kafka-Client-Treiber für das ngx_lua NGINX-Modul:

http://wiki.nginx.org/HttpLuaModule

Diese Lua-Bibliothek nutzt die cosocket API von ngx_lua, die 100% nicht-blockierendes Verhalten gewährleistet.

Bitte beachten Sie, dass mindestens ngx_lua 0.9.3 oder openresty 1.4.3.7 erforderlich ist, und leider wird nur LuaJIT unterstützt (--with-luajit).

Hinweis für ssl-Verbindungen: Mindestens ngx_lua 0.9.11 oder openresty 1.7.4.1 ist erforderlich, und leider wird nur LuaJIT unterstützt (--with-luajit).

Synopsis

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- optionale Authentifizierung
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- normalerweise verwenden wir diese Bibliothek nicht direkt
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata fehlgeschlagen, Fehler:", partitions)
                end
                ngx.say("Brokers: ", cjson.encode(brokers), "; Partitionen: ", cjson.encode(partitions))


                -- sync producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("send Fehler:", err)
                    return
                end
                ngx.say("send erfolgreich, offset: ", tonumber(offset))

                -- dies ist der asynchrone producer_type und bp wird im gesamten NGINX-Worker wiederverwendet
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("send Fehler:", err)
                    return
                end

                ngx.say("send erfolgreich, ok:", ok)
            ';
        }
    }

Module

resty.kafka.client

Um dieses Modul zu laden, tun Sie einfach dies

    local client = require "resty.kafka.client"

Methoden

new

syntax: c = client:new(broker_list, client_config)

Die broker_list ist eine Liste von Brokern, wie unten

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // optionale Authentifizierung
        "sasl_config": {
            // unterstützte Mechanismen: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

unterstützte Mechanismen: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

Warnung: SCRAM-SHA-256, SCRAM-SHA-512 benötigen die Installation von lua-resty-jit-uuid und lua-resty-openssl.

Eine optionale client_config-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:

Client-Konfiguration

  • socket_timeout

    Gibt die Netzwerk-Timeout-Schwelle in Millisekunden an. Sollte größer sein als das request_timeout.

  • keepalive_timeout

    Gibt das maximale Leerlauf-Timeout (in Millisekunden) für die Keepalive-Verbindung an.

  • keepalive_size

    Gibt die maximale Anzahl von Verbindungen an, die im Verbindungs-Pool für jeden NGINX-Worker erlaubt sind.

  • refresh_interval

    Gibt die Zeit an, um die Metadaten automatisch in Millisekunden zu aktualisieren. Dann werden die Metadaten nicht automatisch aktualisiert, wenn sie nil sind.

  • ssl

    Gibt an, ob der Client eine SSL-Verbindung verwenden soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Gibt an, ob der Client eine SSL-Überprüfung durchführen soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Gibt eine Funktion zur Hostauflösung an, die eine IP-Adresse oder nil zurückgibt, um den systemeigenen Host-Resolver zu überschreiben. Standard nil, keine Auflösung durchgeführt. Beispiel function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

syntax: brokers, partitions = c:fetch_metadata(topic)

Im Erfolgsfall gibt es alle Broker und Partitionen des topic zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.

refresh

syntax: brokers, partitions = c:refresh()

Dies wird die Metadaten aller Themen aktualisieren, die durch fetch_metadata abgerufen wurden. Im Erfolgsfall gibt es alle Broker und alle Partitionen aller Themen zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.

choose_api_version

syntax: api_version = c:choose_api_version(api_key, min_version, max_version)

Dies hilft dem Client, die korrekte Version des api_key auszuwählen, die der API entspricht.

Wenn min_version und max_version angegeben sind, fungiert dies als Grenze, und die ausgewählten Versionen im Rückgabewert überschreiten ihre Grenzen nicht, egal wie hoch oder niedrig der Broker die API-Version unterstützt. Wenn sie nicht angegeben sind, folgt es dem Bereich der vom Broker unterstützten Versionen.

Tipp: Die Versionsauswahlstrategie besteht darin, die maximale Version innerhalb des erlaubten Bereichs auszuwählen.

resty.kafka.producer

Um dieses Modul zu laden, tun Sie einfach dies

    local producer = require "resty.kafka.producer"

Methoden

new

syntax: p = producer:new(broker_list, producer_config?, cluster_name?)

Es wird empfohlen, den asynchronen producer_type zu verwenden.

broker_list ist dasselbe wie in client.

Eine optionale Optionen-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify sind dieselben wie in client_config.

Produzenten-Konfiguration, ähnlich wie in http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    Gibt den producer.type an. "async" oder "sync".

  • request_timeout

    Gibt das request.timeout.ms an. Standard 2000 ms.

  • required_acks

    Gibt die request.required.acks an, sollte nicht null sein. Standard 1.

  • max_retry

    Gibt die message.send.max.retries an. Standard 3.

  • retry_backoff

    Gibt die retry.backoff.ms an. Standard 100.

  • api_version

    Gibt die Produzenten-API-Version an. Standard 0. Wenn Sie Kafka 0.10.0.0 oder höher verwenden, kann api_version 0, 1 oder 2 verwenden. Wenn Sie Kafka 0.9.x verwenden, sollte api_version 0 oder 1 sein. Wenn Sie Kafka 0.8.x verwenden, sollte api_version 0 sein.

  • partitioner

    Gibt den Partitioner an, der die Partition aus dem Schlüssel und der Partitionierungsnummer auswählt. syntax: partitioner = function (key, partition_num, correlation_id) end, die correlation_id ist eine automatisch inkrementierende ID im Produzenten. Der Standard-Partitioner ist:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id ist kontinuierlich und beginnt bei 0
        return id % num
    end
    

Pufferkonfiguration (funktioniert nur producer_type = "async")

  • flush_time

    Gibt das queue.buffering.max.ms an. Standard 1000.

  • batch_num

    Gibt die batch.num.messages an. Standard 200.

  • batch_size

    Gibt die send.buffer.bytes an. Standard 1M (kann 2M erreichen). Seien Sie vorsichtig, sollte kleiner sein als die Konfiguration socket.request.max.bytes / 2 - 10k im Kafka-Server.

  • max_buffering

    Gibt die queue.buffering.max.messages an. Standard 50.000.

  • error_handle

    Gibt den Fehlerbehandler an, der Daten verarbeitet, wenn der Puffer an Kafka sendet, fehlschlägt. syntax: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, die fehlgeschlagenen Nachrichten in der message_queue sind wie { key1, msg1, key2, msg2 }, key in der message_queue ist ein leerer String "", auch wenn der Ursprung nil ist. index ist die Länge der message_queue, sollte nicht #message_queue verwendet werden. Wenn retryable true ist, bedeutet das, dass der Kafka-Server diese Nachrichten sicher nicht bestätigt hat, Sie können sicher erneut senden; und andernfalls bedeutet es vielleicht, dass es empfohlen wird, irgendwo zu protokollieren.

  • wait_on_buffer_full

    Gibt an, ob gewartet werden soll, wenn die Pufferwarteschlange voll ist. Standard false. Wenn die Pufferwarteschlange voll ist, wenn die Option true übergeben wird, wird die Semaphore-Wartefunktion verwendet, um die Coroutine zu blockieren, bis das Timeout erreicht ist oder die Pufferwarteschlange verringert wird, andernfalls wird ein "Pufferüberlauf"-Fehler mit false zurückgegeben. Hinweis: Es kann nicht in Phasen verwendet werden, die keine Yield-Unterstützung haben, d.h. Protokollphase.

  • wait_buffer_timeout

    Gibt die maximale Wartezeit an, wenn der Puffer voll ist. Standard 5 Sekunden.

Derzeit wird keine Kompression unterstützt.

Der dritte optionale cluster_name gibt den Namen des Clusters an, Standard 1 (ja, es ist eine Zahl). Sie können unterschiedliche Namen angeben, wenn Sie zwei oder mehr Kafka-Cluster haben. Und dies funktioniert nur mit async producer_type.

send

syntax: ok, err = p:send(topic, key, message)

  1. Im synchronen Modell

    Im Erfolgsfall gibt es den Offset ( cdata: LL ) des aktuellen Brokers und der Partition zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.

  2. Im asynchronen Modell

    Die message wird zuerst in den Puffer geschrieben. Sie wird an den Kafka-Server gesendet, wenn der Puffer die batch_num überschreitet, oder alle flush_time wird der Puffer geleert.

    Im Erfolgsfall gibt es true zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt (Pufferüberlauf).

offset

syntax: sum, details = p:offset()

Gibt die Summe aller Topic-Partition-Offsets zurück (von der ProduceRequest-API zurückgegeben);
und die Details jeder Topic-Partition.

flush

syntax: ok = p:flush()

Gibt immer true zurück.

resty.kafka.basic-consumer

Um dieses Modul zu laden, tun Sie einfach dies

    local bconsumer = require "resty.kafka.basic-consumer"

Dieses Modul ist eine minimalistische Implementierung eines Consumers, der die list_offset API zum Abfragen nach Zeit oder zum Abrufen des Start- und End-Offsets und die fetch API zum Abrufen von Nachrichten in einem Thema bereitstellt.

In einem einzelnen Aufruf kann nur die Information einer einzelnen Partition in einem einzelnen Thema abgerufen werden, und das Batch-Abrufen wird derzeit nicht unterstützt. Der grundlegende Consumer unterstützt nicht die API, die mit der Consumer-Gruppe verbunden ist, sodass Sie die Nachricht abrufen müssen, nachdem Sie den Offset über die list_offset API erhalten haben, oder Ihr Dienst kann den Offset selbst verwalten.

Methoden

new

syntax: c = bconsumer:new(broker_list, client_config)

Die broker_list ist eine Liste von Brokern, wie unten

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // optionale Authentifizierung
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

Eine optionale client_config-Tabelle kann angegeben werden. Die folgenden Optionen sind wie folgt:

Client-Konfiguration

  • socket_timeout

    Gibt die Netzwerk-Timeout-Schwelle in Millisekunden an. Sollte größer sein als das request_timeout.

  • keepalive_timeout

    Gibt das maximale Leerlauf-Timeout (in Millisekunden) für die Keepalive-Verbindung an.

  • keepalive_size

    Gibt die maximale Anzahl von Verbindungen an, die im Verbindungs-Pool für jeden NGINX-Worker erlaubt sind.

  • refresh_interval

    Gibt die Zeit an, um die Metadaten automatisch in Millisekunden zu aktualisieren. Dann werden die Metadaten nicht automatisch aktualisiert, wenn sie nil sind.

  • ssl

    Gibt an, ob der Client eine SSL-Verbindung verwenden soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Gibt an, ob der Client eine SSL-Überprüfung durchführen soll. Standardmäßig false. Siehe: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level Diese Einstellung steuert die Sichtbarkeit von Transaktionsdatensätzen. Siehe: https://kafka.apache.org/protocol.html

  • client_rack

    Rack-ID des Consumers, der diese Anfrage stellt. Siehe: https://kafka.apache.org/protocol.html

list_offset

syntax: offset, err = c:list_offset(topic, partition, timestamp)

Der Parameter timestamp kann ein UNIX-Zeitstempel oder eine Konstante sein, die in resty.kafka.protocol.consumer definiert ist, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, die verwendet wird, um die Anfangs- und neuesten Offsets usw. abzurufen, Semantik mit der ListOffsets-API in Apache Kafka. Siehe: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

Im Erfolgsfall gibt es den Offset des angegebenen Falls zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.

fetch

syntax: result, err = c:fetch(topic, partition, offset)

Im Erfolgsfall gibt es das folgende result des angegebenen Falls zurück. Im Fehlerfall gibt es nil mit einer Zeichenkette zurück, die den Fehler beschreibt.

Das result enthält weitere Informationen wie die Nachrichten:

Fehler

Wenn Sie die in dieser Bibliothek bereitgestellten Module aufrufen, können Sie einige Fehler erhalten. Je nach Quelle können sie in die folgenden Kategorien unterteilt werden.

  • Netzwerkfehler: wie abgelehnte Verbindungen, Verbindungszeitüberschreitungen usw. Sie müssen den Verbindungsstatus jedes Dienstes in Ihrer Umgebung überprüfen.

  • Metadatenbezogene Fehler: wie Metadaten oder ApiVersion-Daten können nicht ordnungsgemäß abgerufen werden; das angegebene Thema oder die Partition existiert nicht usw. Sie müssen den Kafka-Broker und die Client-Konfiguration überprüfen.

  • Fehler, die von Kafka zurückgegeben werden: Manchmal wird Kafka die err_code-Daten in den Antwortdaten einfügen. Wenn dieses Problem auftritt, sieht das err im Rückgabewert so aus: OFFSET_OUT_OF_RANGE, alle Großbuchstaben und durch Unterstriche getrennt, und in der aktuellen Bibliothek bieten wir eine Fehlerliste von Zuordnungen an, die den textlichen Beschreibungen entsprechen. Um mehr über diese Fehler zu erfahren, siehe die Beschreibungen in der Kafka-Dokumentation.

Siehe auch

GitHub

Sie finden möglicherweise zusätzliche Konfigurationstipps und Dokumentationen für dieses Modul im GitHub-Repository für nginx-module-kafka.