kafka: 基于 cosocket API 的 NGINX 模块 Lua Kafka 客户端驱动
安装
如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。
CentOS/RHEL 7 或 Amazon Linux 2
yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka
CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023
dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka
要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua。
本文档描述了 lua-resty-kafka v0.23,于 2023 年 11 月 03 日发布。
此 Lua 库是 ngx_lua NGINX 模块的 Kafka 客户端驱动:
http://wiki.nginx.org/HttpLuaModule
此 Lua 库利用了 ngx_lua 的 cosocket API,确保 100% 非阻塞行为。
请注意,至少需要 ngx_lua 0.9.3 或 openresty 1.4.3.7,并且不幸的是仅支持 LuaJIT (--with-luajit)。
关于 ssl 连接,至少需要 ngx_lua 0.9.11 或 openresty 1.7.4.1,并且不幸的是仅支持 LuaJIT (--with-luajit)。
概述
server {
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{
host = "127.0.0.1",
port = 9092,
-- 可选的身份验证
sasl_config = {
mechanism = "PLAIN",
user = "USERNAME",
password = "PASSWORD",
},
},
}
local key = "key"
local message = "halo world"
-- 通常我们不会直接使用这个库
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata 失败,错误:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- 同步 producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("发送错误:", err)
return
end
ngx.say("发送成功,偏移量: ", tonumber(offset))
-- 这是异步 producer_type,bp 将在整个 nginx worker 中重用
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("发送错误:", err)
return
end
ngx.say("发送成功,ok:", ok)
';
}
}
模块
resty.kafka.client
要加载此模块,只需执行以下操作
local client = require "resty.kafka.client"
方法
new
语法: c = client:new(broker_list, client_config)
broker_list 是一个代理列表,如下所示
[
{
"host": "127.0.0.1",
"port": 9092,
// 可选的身份验证
"sasl_config": {
// 支持的机制: PLAIN、SCRAM-SHA-256、SCRAM-SHA-512
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
sasl_config
支持的机制: PLAIN、SCRAM-SHA-256、SCRAM-SHA-512。
警告: SCRAM-SHA-256、SCRAM-SHA-512 需要安装 lua-resty-jit-uuid 和 lua-resty-openssl。
可以指定一个可选的 client_config 表。以下选项如下:
客户端配置
-
socket_timeout指定网络超时阈值(以毫秒为单位)。应该 大于
request_timeout。 -
keepalive_timeout指定保持活动连接的最大空闲超时(以毫秒为单位)。
-
keepalive_size指定每个 Nginx worker 允许的连接池中的最大连接数。
-
refresh_interval指定自动刷新元数据的时间(以毫秒为单位)。如果为 nil,则元数据将不会自动刷新。
-
ssl指定客户端是否应使用 ssl 连接。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verify指定客户端是否应执行 SSL 验证。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
resolver指定一个用于主机解析的函数,该函数返回一个 IP 字符串或
nil,以覆盖系统默认的主机解析器。默认为nil,不执行解析。示例function(host) if host == "some_host" then return "10.11.12.13" end end
fetch_metadata
语法: brokers, partitions = c:fetch_metadata(topic)
成功时,返回 topic 的所有 brokers 和 partitions。
在错误情况下,返回 nil 和描述错误的字符串。
refresh
语法: brokers, partitions = c:refresh()
这将刷新所有通过 fetch_metadata 获取的主题的元数据。
成功时,返回所有主题的所有 brokers 和所有 partitions。
在错误情况下,返回 nil 和描述错误的字符串。
choose_api_version
语法: api_version = c:choose_api_version(api_key, min_version, max_version)
这有助于客户端选择与 API 相对应的 api_key 的正确版本。
当提供 min_version 和 max_version 时,它将作为限制,返回值中选择的版本不会超过其限制,无论代理支持的 API 版本多高或多低。当未提供时,它将遵循代理支持的版本范围。
提示: 版本选择策略是在允许范围内选择最大版本。
resty.kafka.producer
要加载此模块,只需执行以下操作
local producer = require "resty.kafka.producer"
方法
new
语法: p = producer:new(broker_list, producer_config?, cluster_name?)
建议使用异步 producer_type。
broker_list 与 client 中相同。
可以指定一个可选的选项表。以下选项如下:
socket_timeout、keepalive_timeout、keepalive_size、refresh_interval、ssl、ssl_verify 与 client_config 中相同。
生产者配置,类似于 http://kafka.apache.org/documentation.html#producerconfigs
-
producer_type指定
producer.type。可以是 "async" 或 "sync"。 -
request_timeout指定
request.timeout.ms。默认为2000 ms。 -
required_acks指定
request.required.acks,不应 为零。默认为1。 -
max_retry指定
message.send.max.retries。默认为3。 -
retry_backoff指定
retry.backoff.ms。默认为100。 -
api_version指定生产 API 版本。默认为
0。 如果您使用 Kafka 0.10.0.0 或更高版本,api_version可以使用0、1或2。 如果您使用 Kafka 0.9.x,api_version应为0或1。 如果您使用 Kafka 0.8.x,api_version应为0。 -
partitioner指定根据键和分区数选择分区的分区器。
语法: partitioner = function (key, partition_num, correlation_id) end, correlation_id 是生产者中的自增 ID。默认的分区器是:local function default_partitioner(key, num, correlation_id) local id = key and crc32(key) or correlation_id -- partition_id 是连续的,从 0 开始 return id % num end
缓冲配置(仅在 producer_type = "async" 时有效)
-
flush_time指定
queue.buffering.max.ms。默认为1000。 -
batch_num指定
batch.num.messages。默认为200。 -
batch_size指定
send.buffer.bytes。默认为1M(可能达到 2M)。 请注意,应 小于 Kafka 服务器中的socket.request.max.bytes / 2 - 10k配置。 -
max_buffering指定
queue.buffering.max.messages。默认为50,000。 -
error_handle指定错误处理,当缓冲区发送到 Kafka 时出错时处理数据。
语法: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, message_queue 中的失败消息类似于{ key1, msg1, key2, msg2 }, message_queue 中的key是空字符串"",即使原始值为nil。index是 message_queue 的长度,不能使用#message_queue。 当retryable为true时,表示 Kafka 服务器肯定没有提交这些消息,您可以安全地重试发送; 否则表示可能,建议记录到某处。 -
wait_on_buffer_full指定在缓冲区队列满时是否等待,默认为
false。 当缓冲区队列满时,如果选项传递为true, 将使用信号量等待函数阻塞协程,直到超时或缓冲区队列减少, 否则,返回 "buffer overflow" 错误和false。 注意,这不能在不支持 yield 的阶段使用,即日志阶段。 -
wait_buffer_timeout指定缓冲区满时的最大等待时间,默认为
5秒。
目前不支持压缩。
第三个可选的 cluster_name 指定集群的名称,默认为 1(是的,它是数字)。当您有两个或更多 Kafka 集群时,可以指定不同的名称。并且这仅适用于 async producer_type。
send
语法: ok, err = p:send(topic, key, message)
-
在同步模型中
成功时,返回当前代理和分区的偏移量 ( cdata: LL )。 在错误情况下,返回
nil和描述错误的字符串。 -
在异步模型中
message将首先写入缓冲区。 当缓冲区超过batch_num时,将发送到 Kafka 服务器, 或每flush_time刷新缓冲区。成功时,返回
true。 在错误情况下,返回nil和描述错误的字符串(buffer overflow)。
offset
语法: sum, details = p:offset()
返回所有主题-分区偏移量的总和(由 ProduceRequest API 返回);
以及每个主题-分区的详细信息。
flush
语法: ok = p:flush()
始终返回 true。
resty.kafka.basic-consumer
要加载此模块,只需执行以下操作
local bconsumer = require "resty.kafka.basic-consumer"
此模块是消费者的简化实现,提供 list_offset API 以按时间查询或获取起始和结束偏移量,以及 fetch API 以获取主题中的消息。
在单次调用中,仅能获取单个主题中单个分区的信息,目前不支持批量获取。基本消费者不支持与消费者组相关的 API,因此您需要在通过 list_offset API 获取偏移量后获取消息,或者您的服务可以自行管理偏移量。
方法
new
语法: c = bconsumer:new(broker_list, client_config)
broker_list 是一个代理列表,如下所示
[
{
"host": "127.0.0.1",
"port": 9092,
// 可选的身份验证
"sasl_config": {
"mechanism": "PLAIN",
"user": "USERNAME",
"password": "PASSWORD"
}
}
]
可以指定一个可选的 client_config 表。以下选项如下:
客户端配置
-
socket_timeout指定网络超时阈值(以毫秒为单位)。应该 大于
request_timeout。 -
keepalive_timeout指定保持活动连接的最大空闲超时(以毫秒为单位)。
-
keepalive_size指定每个 Nginx worker 允许的连接池中的最大连接数。
-
refresh_interval指定自动刷新元数据的时间(以毫秒为单位)。如果为 nil,则元数据将不会自动刷新。
-
ssl指定客户端是否应使用 ssl 连接。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
ssl_verify指定客户端是否应执行 SSL 验证。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
-
isolation_level此设置控制事务记录的可见性。请参见: https://kafka.apache.org/protocol.html -
client_rack发出此请求的消费者的机架 ID。请参见: https://kafka.apache.org/protocol.html
list_offset
语法: offset, err = c:list_offset(topic, partition, timestamp)
参数 timestamp 可以是 UNIX 时间戳或在 resty.kafka.protocol.consumer 中定义的常量 LIST_OFFSET_TIMESTAMP_LAST、LIST_OFFSET_TIMESTAMP_FIRST、LIST_OFFSET_TIMESTAMP_MAX,用于获取初始和最新的偏移量等,与 Apache Kafka 中的 ListOffsets API 语义相同。请参见: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets
成功时,返回指定情况的偏移量。
在错误情况下,返回 nil 和描述错误的字符串。
fetch
语法: result, err = c:fetch(topic, partition, offset)
成功时,返回指定情况的以下 result。
在错误情况下,返回 nil 和描述错误的字符串。
result 将包含更多信息,例如消息:
-
records包含消息内容的表。
-
errcodeFetch API 的错误代码。请参见: https://kafka.apache.org/protocol.html#protocol_error_codes
-
high_watermarkFetch API 的高水位线。请参见: https://kafka.apache.org/protocol.html#The_Messages_Fetch
-
last_stable_offsetFetch API 的最后稳定偏移量。内容取决于 API 版本,可能为 nil。请参见: https://kafka.apache.org/protocol.html#The_Messages_Fetch,该响应 API 版本高于 v4。
-
log_start_offsetFetch API 的日志起始偏移量。内容取决于 API 版本,可能为 nil。请参见: https://kafka.apache.org/protocol.html#The_Messages_Fetch,该响应 API 版本高于 v5。
-
aborted_transactionsFetch API 的已中止事务。内容取决于 API 版本,可能为 nil。请参见: https://kafka.apache.org/protocol.html#The_Messages_Fetch,该响应 API 版本高于 v4。
-
preferred_read_replicaFetch API 的首选读取副本。内容取决于 API 版本,可能为 nil。请参见: https://kafka.apache.org/protocol.html#The_Messages_Fetch,该响应 API 版本高于 v11。
错误
当您调用此库中提供的模块时,可能会遇到一些错误。 根据来源,它们可以分为以下几类。
-
网络错误:例如连接被拒绝、连接超时等。您需要检查环境中每个服务的连接状态。
-
元数据相关错误:例如无法正确检索元数据或 ApiVersion 数据;指定的主题或分区不存在等。您需要检查 Kafka Broker 和客户端配置。
-
Kafka 返回的错误:有时 Kafka 会在响应数据中包含 err_code 数据,当此问题发生时,返回值中的
err看起来像这样OFFSET_OUT_OF_RANGE,全部为大写字符,并用下划线分隔,在当前库中我们提供了 错误映射列表 对应于文本描述。要了解有关这些错误的更多信息,请参阅 Kafka 文档。
另见
- ngx_lua 模块: http://wiki.nginx.org/HttpLuaModule
- kafka 协议: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
- lua-resty-redis 库
- lua-resty-logger-socket 库
- sarama
GitHub
您可以在 nginx-module-kafka 的 GitHub 仓库 中找到此模块的其他配置提示和文档。