Перейти к содержанию

kafka: Lua клиент для Kafka для nginx-module-lua на основе API cosocket

Установка

Если вы еще не настроили подписку на репозиторий RPM, зарегистрируйтесь. После этого вы можете продолжить с следующими шагами.

CentOS/RHEL 7 или Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

Чтобы использовать эту Lua библиотеку с NGINX, убедитесь, что nginx-module-lua установлен.

Этот документ описывает lua-resty-kafka v0.23, выпущенный 3 ноября 2023 года.


Эта Lua библиотека является драйвером клиента Kafka для модуля ngx_lua nginx:

http://wiki.nginx.org/HttpLuaModule

Эта Lua библиотека использует API cosocket ngx_lua, который обеспечивает 100% неблокирующее поведение.

Обратите внимание, что требуется как минимум ngx_lua 0.9.3 или openresty 1.4.3.7, и, к сожалению, поддерживается только LuaJIT (--with-luajit).

Обратите внимание, что для соединений ssl требуется как минимум ngx_lua 0.9.11 или openresty 1.7.4.1, и, к сожалению, поддерживается только LuaJIT (--with-luajit).

Синопсис

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- необязательная аутентификация
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- обычно мы не используем эту библиотеку напрямую
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata не удалась, ошибка:", partitions)
                end
                ngx.say("брокеры: ", cjson.encode(brokers), "; разделы: ", cjson.encode(partitions))

                -- синхронный producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("ошибка отправки:", err)
                    return
                end
                ngx.say("успешно отправлено, смещение: ", tonumber(offset))

                -- это асинхронный producer_type и bp будет повторно использоваться во всем рабочем процессе nginx
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("ошибка отправки:", err)
                    return
                end

                ngx.say("успешно отправлено, ok:", ok)
            ';
        }
    }

Модули

resty.kafka.client

Чтобы загрузить этот модуль, просто выполните

    local client = require "resty.kafka.client"

Методы

new

синтаксис: c = client:new(broker_list, client_config)

broker_list — это список брокеров, например, ниже

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // необязательная аутентификация
        "sasl_config": {
            // поддерживаемые механизмы: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

поддерживаемые механизмы: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.

предупреждение: SCRAM-SHA-256, SCRAM-SHA-512 необходимо установить lua-resty-jit-uuid и lua-resty-openssl.

Можно указать необязательную таблицу client_config. Следующие параметры:

конфигурация клиента

  • socket_timeout

    Указывает сетевой тайм-аут в миллисекундах. ДОЛЖЕН быть больше, чем request_timeout.

  • keepalive_timeout

    Указывает максимальный тайм-аут простоя (в миллисекундах) для соединения keepalive.

  • keepalive_size

    Указывает максимальное количество соединений, разрешенных в пуле соединений для каждого рабочего процесса Nginx.

  • refresh_interval

    Указывает время для автоматического обновления метаданных в миллисекундах. Метаданные не будут автоматически обновляться, если равны nil.

  • ssl

    Указывает, должен ли клиент использовать ssl-соединение. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Указывает, должен ли клиент выполнять проверку SSL. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    Указывает функцию для разрешения хоста, которая возвращает строку IP или nil, чтобы переопределить системный стандартный разрешатель хостов. По умолчанию nil, разрешение не выполняется. Пример function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

синтаксис: brokers, partitions = c:fetch_metadata(topic)

В случае успеха возвращает всех брокеров и разделы topic. В случае ошибок возвращает nil с описанием ошибки.

refresh

синтаксис: brokers, partitions = c:refresh()

Это обновит метаданные всех тем, которые были получены с помощью fetch_metadata. В случае успеха возвращает всех брокеров и все разделы всех тем. В случае ошибок возвращает nil с описанием ошибки.

choose_api_version

синтаксис: api_version = c:choose_api_version(api_key, min_version, max_version)

Это помогает клиенту выбрать правильную версию api_key, соответствующую API.

Когда min_version и max_version указаны, они будут действовать как ограничение, и выбранные версии в возвращаемом значении не будут превышать их пределы, независимо от того, насколько высоко или низко брокер поддерживает версию API. Когда они не указаны, будет следовать диапазону версий, поддерживаемых брокером.

Совет: Стратегия выбора версии заключается в том, чтобы выбрать максимальную версию в пределах допустимого диапазона.

resty.kafka.producer

Чтобы загрузить этот модуль, просто выполните

    local producer = require "resty.kafka.producer"

Методы

new

синтаксис: p = producer:new(broker_list, producer_config?, cluster_name?)

Рекомендуется использовать асинхронный producer_type.

broker_list такой же, как в client

Можно указать необязательную таблицу параметров. Следующие параметры:

socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify такие же, как в client_config

конфигурация продюсера, в основном как в http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    Указывает producer.type. "async" или "sync"

  • request_timeout

    Указывает request.timeout.ms. По умолчанию 2000 ms

  • required_acks

    Указывает request.required.acks, НЕ ДОЛЖЕН быть равным нулю. По умолчанию 1.

  • max_retry

    Указывает message.send.max.retries. По умолчанию 3.

  • retry_backoff

    Указывает retry.backoff.ms. По умолчанию 100.

  • api_version

    Указывает версию API продюсера. По умолчанию 0. Если вы используете Kafka 0.10.0.0 или выше, api_version может быть 0, 1 или 2. Если вы используете Kafka 0.9.x, api_version должен быть 0 или 1. Если вы используете Kafka 0.8.x, api_version должен быть 0.

  • partitioner

    Указывает партиционер, который выбирает раздел из ключа и номера раздела. синтаксис: partitioner = function (key, partition_num, correlation_id) end, correlation_id — это автоматически увеличиваемый идентификатор в продюсере. По умолчанию партиционер:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id непрерывный и начинается с 0
        return id % num
    end
    

конфигурация буфера (работает только для producer_type = "async")

  • flush_time

    Указывает queue.buffering.max.ms. По умолчанию 1000.

  • batch_num

    Указывает batch.num.messages. По умолчанию 200.

  • batch_size

    Указывает send.buffer.bytes. По умолчанию 1M (может достигать 2M). Будьте осторожны, ДОЛЖЕН быть меньше, чем socket.request.max.bytes / 2 - 10k в конфигурации сервера Kafka.

  • max_buffering

    Указывает queue.buffering.max.messages. По умолчанию 50,000.

  • error_handle

    Указывает обработчик ошибок, обрабатывает данные, когда буфер отправки в Kafka вызывает ошибку. синтаксис: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, неудачные сообщения в message_queue выглядят как { key1, msg1, key2, msg2 }, key в message_queue является пустой строкой "", даже если оригинал nil. index — это длина message_queue, не следует использовать #message_queue. когда retryable равно true, это означает, что сервер Kafka определенно не зафиксировал эти сообщения, вы можете безопасно повторить отправку; а в противном случае это может означать, рекомендуется записать это где-нибудь.

  • wait_on_buffer_full

    Указывает, следует ли ждать, когда очередь буфера заполнена, По умолчанию false. Когда очередь буфера заполнена, если параметр передан true, будет использоваться функция ожидания семафора для блокировки корутины до истечения времени ожидания или уменьшения очереди буфера, В противном случае вернется ошибка "переполнение буфера" с false. Обратите внимание, что это не может быть использовано в тех фазах, которые не поддерживают ожидания, т.е. в фазе логирования.

  • wait_buffer_timeout

    Указывает максимальное время ожидания, когда буфер заполнен, По умолчанию 5 секунд.

Сжатие в настоящее время не поддерживается.

Третий необязательный cluster_name указывает имя кластера, по умолчанию 1 (да, это число). Вы можете указать разные имена, когда у вас два или более кластеров Kafka. И это работает только с async producer_type.

send

синтаксис: ok, err = p:send(topic, key, message)

  1. В синхронной модели

    В случае успеха возвращает смещение ( cdata: LL ) текущего брокера и раздела. В случае ошибок возвращает nil с описанием ошибки.

  2. В асинхронной модели

    Сообщение сначала будет записано в буфер. Оно будет отправлено на сервер Kafka, когда буфер превысит batch_num, или каждый flush_time будет очищать буфер.

    В случае успеха возвращает true. В случае ошибок возвращает nil с описанием ошибки (переполнение буфера).

offset

синтаксис: sum, details = p:offset()

Возвращает сумму всех смещений topic-partition (возвращается API ProduceRequest);
и детали каждого topic-partition.

flush

синтаксис: ok = p:flush()

Всегда возвращает true.

resty.kafka.basic-consumer

Чтобы загрузить этот модуль, просто выполните

    local bconsumer = require "resty.kafka.basic-consumer"

Этот модуль является минималистичной реализацией потребителя, предоставляя API list_offset для запросов по времени или получения начального и конечного смещения и API fetch для получения сообщений в теме.

В одном вызове можно получить только информацию о одном разделе в одной теме, и пакетная выборка в настоящее время не поддерживается. Базовый потребитель не поддерживает API, связанные с группой потребителей, поэтому вам нужно получать сообщение после получения смещения через API list_offset, или ваша служба может управлять смещением самостоятельно.

Методы

new

синтаксис: c = bconsumer:new(broker_list, client_config)

broker_list — это список брокеров, например, ниже

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // необязательная аутентификация
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

Можно указать необязательную таблицу client_config. Следующие параметры:

конфигурация клиента

  • socket_timeout

    Указывает сетевой тайм-аут в миллисекундах. ДОЛЖЕН быть больше, чем request_timeout.

  • keepalive_timeout

    Указывает максимальный тайм-аут простоя (в миллисекундах) для соединения keepalive.

  • keepalive_size

    Указывает максимальное количество соединений, разрешенных в пуле соединений для каждого рабочего процесса Nginx.

  • refresh_interval

    Указывает время для автоматического обновления метаданных в миллисекундах. Метаданные не будут автоматически обновляться, если равны nil.

  • ssl

    Указывает, должен ли клиент использовать ssl-соединение. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    Указывает, должен ли клиент выполнять проверку SSL. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • isolation_level Этот параметр управляет видимостью транзакционных записей. См.: https://kafka.apache.org/protocol.html

  • client_rack

    Идентификатор стойки потребителя, делающего этот запрос. См.: https://kafka.apache.org/protocol.html

list_offset

синтаксис: offset, err = c:list_offset(topic, partition, timestamp)

Параметр timestamp может быть временной меткой UNIX или константой, определенной в resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, используемой для получения начальных и последних смещений и т.д., семантика с API ListOffsets в Apache Kafka. См.: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

В случае успеха возвращает смещение указанного случая. В случае ошибок возвращает nil с описанием ошибки.

fetch

синтаксис: result, err = c:fetch(topic, partition, offset)

В случае успеха возвращает следующий result указанного случая. В случае ошибок возвращает nil с описанием ошибки.

result будет содержать больше информации, такой как сообщения:

Ошибки

Когда вы вызываете модули, предоставленные в этой библиотеке, вы можете получить некоторые ошибки. В зависимости от источника их можно разделить на следующие категории.

  • Сетевые ошибки: такие как отказ в соединении, тайм-аут соединения и т.д. Вам нужно проверить состояние соединения каждого сервиса в вашей среде.

  • Ошибки, связанные с метаданными: такие как метаданные или данные ApiVersion не могут быть правильно получены; указанная тема или раздел не существуют и т.д. Вам нужно проверить конфигурацию брокера Kafka и клиента.

  • Ошибка, возвращаемая Kafka: иногда Kafka будет включать данные err_code в ответные данные. Когда эта проблема возникает, err в возвращаемом значении выглядит так OFFSET_OUT_OF_RANGE, все символы в верхнем регистре и разделены подчеркиваниями, и в текущей библиотеке мы предоставляем список ошибок соответствий с текстовыми описаниями. Чтобы узнать больше об этих ошибках, смотрите описания в документации Kafka.

Смотрите также

GitHub

Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-kafka.