kafka: Lua клиент для Kafka для nginx-module-lua на основе API cosocket
Установка
Если вы еще не настроили подписку на репозиторий RPM, зарегистрируйтесь. После этого вы можете продолжить с следующими шагами.
CentOS/RHEL 7 или Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
Чтобы использовать эту Lua библиотеку с NGINX, убедитесь, что nginx-module-lua установлен.
Этот документ описывает lua-resty-kafka v0.23, выпущенный 3 ноября 2023 года.
Эта Lua библиотека является драйвером клиента Kafka для модуля ngx_lua nginx:
http://wiki.nginx.org/HttpLuaModule
Эта Lua библиотека использует API cosocket ngx_lua, который обеспечивает 100% неблокирующее поведение.
Обратите внимание, что требуется как минимум ngx_lua 0.9.3 или openresty 1.4.3.7, и, к сожалению, поддерживается только LuaJIT (--with-luajit).
Обратите внимание, что для соединений ssl требуется как минимум ngx_lua 0.9.11 или openresty 1.7.4.1, и, к сожалению, поддерживается только LuaJIT (--with-luajit).
Синопсис
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- необязательная аутентификация
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- обычно мы не используем эту библиотеку напрямую
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata не удалась, ошибка:", partitions)
end
ngx.say("брокеры: ", cjson.encode(brokers), "; разделы: ", cjson.encode(partitions))
-- синхронный producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("ошибка отправки:", err)
return
end
ngx.say("успешно отправлено, смещение: ", tonumber(offset))
-- это асинхронный producer_type и bp будет повторно использоваться во всем рабочем процессе nginx
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("ошибка отправки:", err)
return
end
ngx.say("успешно отправлено, ok:", ok)
';
}
}
Модули
resty.kafka.client
Чтобы загрузить этот модуль, просто выполните
local client = require "resty.kafka.client"
Методы
new
синтаксис: c = client:new(broker_list, client_config)
broker_list — это список брокеров, например, ниже
[
{
"host": "127.0.0.1",
"port": 9092,
// необязательная аутентификация
"sasl_config": {
// поддерживаемые механизмы: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
поддерживаемые механизмы: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
предупреждение: SCRAM-SHA-256, SCRAM-SHA-512 необходимо установить lua-resty-jit-uuid и lua-resty-openssl.
Можно указать необязательную таблицу client_config. Следующие параметры:
конфигурация клиента
-
socket_timeoutУказывает сетевой тайм-аут в миллисекундах. ДОЛЖЕН быть больше, чем
request_timeout. -
keepalive_timeoutУказывает максимальный тайм-аут простоя (в миллисекундах) для соединения keepalive.
-
keepalive_sizeУказывает максимальное количество соединений, разрешенных в пуле соединений для каждого рабочего процесса Nginx.
-
refresh_intervalУказывает время для автоматического обновления метаданных в миллисекундах. Метаданные не будут автоматически обновляться, если равны nil.
-
sslУказывает, должен ли клиент использовать ssl-соединение. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyУказывает, должен ли клиент выполнять проверку SSL. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolverУказывает функцию для разрешения хоста, которая возвращает строку IP или
nil, чтобы переопределить системный стандартный разрешатель хостов. По умолчаниюnil, разрешение не выполняется. Примерfunction(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
синтаксис: brokers, partitions = c:fetch_metadata(topic)
В случае успеха возвращает всех брокеров и разделы topic.
В случае ошибок возвращает nil с описанием ошибки.
refresh
синтаксис: brokers, partitions = c:refresh()
Это обновит метаданные всех тем, которые были получены с помощью fetch_metadata.
В случае успеха возвращает всех брокеров и все разделы всех тем.
В случае ошибок возвращает nil с описанием ошибки.
choose_api_version
синтаксис: api_version = c:choose_api_version(api_key, min_version, max_version)
Это помогает клиенту выбрать правильную версию api_key, соответствующую API.
Когда min_version и max_version указаны, они будут действовать как ограничение, и выбранные версии в возвращаемом значении не будут превышать их пределы, независимо от того, насколько высоко или низко брокер поддерживает версию API. Когда они не указаны, будет следовать диапазону версий, поддерживаемых брокером.
Совет: Стратегия выбора версии заключается в том, чтобы выбрать максимальную версию в пределах допустимого диапазона.
resty.kafka.producer
Чтобы загрузить этот модуль, просто выполните
local producer = require "resty.kafka.producer"
Методы
new
синтаксис: p = producer:new(broker_list, producer_config?, cluster_name?)
Рекомендуется использовать асинхронный producer_type.
broker_list такой же, как в client
Можно указать необязательную таблицу параметров. Следующие параметры:
socket_timeout, keepalive_timeout, keepalive_size, refresh_interval, ssl, ssl_verify такие же, как в client_config
конфигурация продюсера, в основном как в http://kafka.apache.org/documentation.html#producerconfigs
-
producer_typeУказывает
producer.type. "async" или "sync" -
request_timeoutУказывает
request.timeout.ms. По умолчанию2000 ms -
required_acksУказывает
request.required.acks, НЕ ДОЛЖЕН быть равным нулю. По умолчанию1. -
max_retryУказывает
message.send.max.retries. По умолчанию3. -
retry_backoffУказывает
retry.backoff.ms. По умолчанию100. -
api_versionУказывает версию API продюсера. По умолчанию
0. Если вы используете Kafka 0.10.0.0 или выше,api_versionможет быть0,1или2. Если вы используете Kafka 0.9.x,api_versionдолжен быть0или1. Если вы используете Kafka 0.8.x,api_versionдолжен быть0. -
partitionerУказывает партиционер, который выбирает раздел из ключа и номера раздела.
синтаксис: partitioner = function (key, partition_num, correlation_id) end, correlation_id — это автоматически увеличиваемый идентификатор в продюсере. По умолчанию партиционер:local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id непрерывный и начинается с 0 return id % num end
конфигурация буфера (работает только для producer_type = "async")
-
flush_timeУказывает
queue.buffering.max.ms. По умолчанию1000. -
batch_numУказывает
batch.num.messages. По умолчанию200. -
batch_sizeУказывает
send.buffer.bytes. По умолчанию1M(может достигать 2M). Будьте осторожны, ДОЛЖЕН быть меньше, чемsocket.request.max.bytes / 2 - 10kв конфигурации сервера Kafka. -
max_bufferingУказывает
queue.buffering.max.messages. По умолчанию50,000. -
error_handleУказывает обработчик ошибок, обрабатывает данные, когда буфер отправки в Kafka вызывает ошибку.
синтаксис: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, неудачные сообщения в message_queue выглядят как{ key1, msg1, key2, msg2 },keyв message_queue является пустой строкой"", даже если оригиналnil.index— это длина message_queue, не следует использовать#message_queue. когдаretryableравноtrue, это означает, что сервер Kafka определенно не зафиксировал эти сообщения, вы можете безопасно повторить отправку; а в противном случае это может означать, рекомендуется записать это где-нибудь. -
wait_on_buffer_fullУказывает, следует ли ждать, когда очередь буфера заполнена, По умолчанию
false. Когда очередь буфера заполнена, если параметр переданtrue, будет использоваться функция ожидания семафора для блокировки корутины до истечения времени ожидания или уменьшения очереди буфера, В противном случае вернется ошибка "переполнение буфера" сfalse. Обратите внимание, что это не может быть использовано в тех фазах, которые не поддерживают ожидания, т.е. в фазе логирования. -
wait_buffer_timeoutУказывает максимальное время ожидания, когда буфер заполнен, По умолчанию
5секунд.
Сжатие в настоящее время не поддерживается.
Третий необязательный cluster_name указывает имя кластера, по умолчанию 1 (да, это число). Вы можете указать разные имена, когда у вас два или более кластеров Kafka. И это работает только с async producer_type.
send
синтаксис: ok, err = p:send(topic, key, message)
-
В синхронной модели
В случае успеха возвращает смещение ( cdata: LL ) текущего брокера и раздела. В случае ошибок возвращает
nilс описанием ошибки. -
В асинхронной модели
Сообщение сначала будет записано в буфер. Оно будет отправлено на сервер Kafka, когда буфер превысит
batch_num, или каждыйflush_timeбудет очищать буфер.В случае успеха возвращает
true. В случае ошибок возвращаетnilс описанием ошибки (переполнение буфера).
offset
синтаксис: sum, details = p:offset()
Возвращает сумму всех смещений topic-partition (возвращается API ProduceRequest);
и детали каждого topic-partition.
flush
синтаксис: ok = p:flush()
Всегда возвращает true.
resty.kafka.basic-consumer
Чтобы загрузить этот модуль, просто выполните
local bconsumer = require "resty.kafka.basic-consumer"
Этот модуль является минималистичной реализацией потребителя, предоставляя API list_offset для запросов по времени или получения начального и конечного смещения и API fetch для получения сообщений в теме.
В одном вызове можно получить только информацию о одном разделе в одной теме, и пакетная выборка в настоящее время не поддерживается. Базовый потребитель не поддерживает API, связанные с группой потребителей, поэтому вам нужно получать сообщение после получения смещения через API list_offset, или ваша служба может управлять смещением самостоятельно.
Методы
new
синтаксис: c = bconsumer:new(broker_list, client_config)
broker_list — это список брокеров, например, ниже
[
{
"host": "127.0.0.1",
"port": 9092,
// необязательная аутентификация
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
Можно указать необязательную таблицу client_config. Следующие параметры:
конфигурация клиента
-
socket_timeoutУказывает сетевой тайм-аут в миллисекундах. ДОЛЖЕН быть больше, чем
request_timeout. -
keepalive_timeoutУказывает максимальный тайм-аут простоя (в миллисекундах) для соединения keepalive.
-
keepalive_sizeУказывает максимальное количество соединений, разрешенных в пуле соединений для каждого рабочего процесса Nginx.
-
refresh_intervalУказывает время для автоматического обновления метаданных в миллисекундах. Метаданные не будут автоматически обновляться, если равны nil.
-
sslУказывает, должен ли клиент использовать ssl-соединение. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verifyУказывает, должен ли клиент выполнять проверку SSL. По умолчанию false. См.: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_levelЭтот параметр управляет видимостью транзакционных записей. См.: https://kafka.apache.org/protocol.html -
client_rackИдентификатор стойки потребителя, делающего этот запрос. См.: https://kafka.apache.org/protocol.html
list_offset
синтаксис: offset, err = c:list_offset(topic, partition, timestamp)
Параметр timestamp может быть временной меткой UNIX или константой, определенной в resty.kafka.protocol.consumer, LIST_OFFSET_TIMESTAMP_LAST, LIST_OFFSET_TIMESTAMP_FIRST, LIST_OFFSET_TIMESTAMP_MAX, используемой для получения начальных и последних смещений и т.д., семантика с API ListOffsets в Apache Kafka. См.: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
В случае успеха возвращает смещение указанного случая.
В случае ошибок возвращает nil с описанием ошибки.
fetch
синтаксис: result, err = c:fetch(topic, partition, offset)
В случае успеха возвращает следующий result указанного случая.
В случае ошибок возвращает nil с описанием ошибки.
result будет содержать больше информации, такой как сообщения:
-
recordsТаблица, содержащая содержимое сообщения.
-
errcodeКод ошибки API Fetch. См.: https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkВысокая отметка API Fetch. См.: https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetПоследнее стабильное смещение API Fetch. Содержимое зависит от версии API, может быть nil. См.: https://kafka.apache.org/protocol.html#The_Messages_Fetch, если версия API выше v4
-
log_start_offsetНачальное смещение журнала API Fetch. Содержимое зависит от версии API, может быть nil. См.: https://kafka.apache.org/protocol.html#The_Messages_Fetch, если версия API выше v5
-
aborted_transactionsПрерванные транзакции API Fetch. Содержимое зависит от версии API, может быть nil. См.: https://kafka.apache.org/protocol.html#The_Messages_Fetch, если версия API выше v4
-
preferred_read_replicaПредпочитаемая реплика чтения API Fetch. Содержимое зависит от версии API, может быть nil. См.: https://kafka.apache.org/protocol.html#The_Messages_Fetch, если версия API выше v11
Ошибки
Когда вы вызываете модули, предоставленные в этой библиотеке, вы можете получить некоторые ошибки. В зависимости от источника их можно разделить на следующие категории.
-
Сетевые ошибки: такие как отказ в соединении, тайм-аут соединения и т.д. Вам нужно проверить состояние соединения каждого сервиса в вашей среде.
-
Ошибки, связанные с метаданными: такие как метаданные или данные ApiVersion не могут быть правильно получены; указанная тема или раздел не существуют и т.д. Вам нужно проверить конфигурацию брокера Kafka и клиента.
-
Ошибка, возвращаемая Kafka: иногда Kafka будет включать данные err_code в ответные данные. Когда эта проблема возникает,
errв возвращаемом значении выглядит такOFFSET_OUT_OF_RANGE, все символы в верхнем регистре и разделены подчеркиваниями, и в текущей библиотеке мы предоставляем список ошибок соответствий с текстовыми описаниями. Чтобы узнать больше об этих ошибках, смотрите описания в документации Kafka.
Смотрите также
- модуль ngx_lua: http://wiki.nginx.org/HttpLuaModule
- протокол kafka: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- библиотека lua-resty-redis
- библиотека lua-resty-logger-socket
- sarama
GitHub
Вы можете найти дополнительные советы по конфигурации и документацию для этого модуля в репозитории GitHub для nginx-module-kafka.