跳转至

kafka: 基于 cosocket API 的 NGINX 模块 Lua Kafka 客户端驱动

安装

如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。

CentOS/RHEL 7 或 Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-kafka

CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-kafka

要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua

本文档描述了 lua-resty-kafka v0.23,于 2023 年 11 月 03 日发布。


此 Lua 库是 ngx_lua NGINX 模块的 Kafka 客户端驱动:

http://wiki.nginx.org/HttpLuaModule

此 Lua 库利用了 ngx_lua 的 cosocket API,确保 100% 非阻塞行为。

请注意,至少需要 ngx_lua 0.9.3openresty 1.4.3.7,并且不幸的是仅支持 LuaJIT (--with-luajit)。

关于 ssl 连接,至少需要 ngx_lua 0.9.11openresty 1.7.4.1,并且不幸的是仅支持 LuaJIT (--with-luajit)。

概述

    server {
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    {
                        host = "127.0.0.1",
                        port = 9092,

                        -- 可选的身份验证
                        sasl_config = {
                            mechanism = "PLAIN",
                            user = "USERNAME",
                            password = "PASSWORD",
                        },
                    },
                }

                local key = "key"
                local message = "halo world"

                -- 通常我们不会直接使用这个库
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata 失败,错误:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))


                -- 同步 producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("发送错误:", err)
                    return
                end
                ngx.say("发送成功,偏移量: ", tonumber(offset))

                -- 这是异步 producer_type,bp 将在整个 nginx worker 中重用
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("发送错误:", err)
                    return
                end

                ngx.say("发送成功,ok:", ok)
            ';
        }
    }

模块

resty.kafka.client

要加载此模块,只需执行以下操作

    local client = require "resty.kafka.client"

方法

new

语法: c = client:new(broker_list, client_config)

broker_list 是一个代理列表,如下所示

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // 可选的身份验证
        "sasl_config": {
            // 支持的机制: PLAIN、SCRAM-SHA-256、SCRAM-SHA-512
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]
* sasl_config

支持的机制: PLAIN、SCRAM-SHA-256、SCRAM-SHA-512。

警告: SCRAM-SHA-256、SCRAM-SHA-512 需要安装 lua-resty-jit-uuid 和 lua-resty-openssl。

可以指定一个可选的 client_config 表。以下选项如下:

客户端配置

  • socket_timeout

    指定网络超时阈值(以毫秒为单位)。应该 大于 request_timeout

  • keepalive_timeout

    指定保持活动连接的最大空闲超时(以毫秒为单位)。

  • keepalive_size

    指定每个 Nginx worker 允许的连接池中的最大连接数。

  • refresh_interval

    指定自动刷新元数据的时间(以毫秒为单位)。如果为 nil,则元数据将不会自动刷新。

  • ssl

    指定客户端是否应使用 ssl 连接。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • ssl_verify

    指定客户端是否应执行 SSL 验证。默认为 false。请参见: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake

  • resolver

    指定一个用于主机解析的函数,该函数返回一个 IP 字符串或 nil,以覆盖系统默认的主机解析器。默认为 nil,不执行解析。示例 function(host) if host == "some_host" then return "10.11.12.13" end end

fetch_metadata

语法: brokers, partitions = c:fetch_metadata(topic)

成功时,返回 topic 的所有 brokers 和 partitions。 在错误情况下,返回 nil 和描述错误的字符串。

refresh

语法: brokers, partitions = c:refresh()

这将刷新所有通过 fetch_metadata 获取的主题的元数据。 成功时,返回所有主题的所有 brokers 和所有 partitions。 在错误情况下,返回 nil 和描述错误的字符串。

choose_api_version

语法: api_version = c:choose_api_version(api_key, min_version, max_version)

这有助于客户端选择与 API 相对应的 api_key 的正确版本。

当提供 min_versionmax_version 时,它将作为限制,返回值中选择的版本不会超过其限制,无论代理支持的 API 版本多高或多低。当未提供时,它将遵循代理支持的版本范围。

提示: 版本选择策略是在允许范围内选择最大版本。

resty.kafka.producer

要加载此模块,只需执行以下操作

    local producer = require "resty.kafka.producer"

方法

new

语法: p = producer:new(broker_list, producer_config?, cluster_name?)

建议使用异步 producer_type。

broker_listclient 中相同。

可以指定一个可选的选项表。以下选项如下:

socket_timeoutkeepalive_timeoutkeepalive_sizerefresh_intervalsslssl_verifyclient_config 中相同。

生产者配置,类似于 http://kafka.apache.org/documentation.html#producerconfigs

  • producer_type

    指定 producer.type。可以是 "async" 或 "sync"。

  • request_timeout

    指定 request.timeout.ms。默认为 2000 ms

  • required_acks

    指定 request.required.acks不应 为零。默认为 1

  • max_retry

    指定 message.send.max.retries。默认为 3

  • retry_backoff

    指定 retry.backoff.ms。默认为 100

  • api_version

    指定生产 API 版本。默认为 0。 如果您使用 Kafka 0.10.0.0 或更高版本,api_version 可以使用 012。 如果您使用 Kafka 0.9.x,api_version 应为 01。 如果您使用 Kafka 0.8.x,api_version 应为 0

  • partitioner

    指定根据键和分区数选择分区的分区器。 语法: partitioner = function (key, partition_num, correlation_id) end, correlation_id 是生产者中的自增 ID。默认的分区器是:

    local function default_partitioner(key, num, correlation_id)
        local id = key and crc32(key) or correlation_id
        -- partition_id 是连续的,从 0 开始
        return id % num
    end
    

缓冲配置(仅在 producer_type = "async" 时有效)

  • flush_time

    指定 queue.buffering.max.ms。默认为 1000

  • batch_num

    指定 batch.num.messages。默认为 200

  • batch_size

    指定 send.buffer.bytes。默认为 1M(可能达到 2M)。 请注意, 小于 Kafka 服务器中的 socket.request.max.bytes / 2 - 10k 配置。

  • max_buffering

    指定 queue.buffering.max.messages。默认为 50,000

  • error_handle

    指定错误处理,当缓冲区发送到 Kafka 时出错时处理数据。 语法: error_handle = function (topic, partition_id, message_queue, index, err, retryable) end, message_queue 中的失败消息类似于 { key1, msg1, key2, msg2 }, message_queue 中的 key 是空字符串 "",即使原始值为 nilindex 是 message_queue 的长度,不能使用 #message_queue。 当 retryabletrue 时,表示 Kafka 服务器肯定没有提交这些消息,您可以安全地重试发送; 否则表示可能,建议记录到某处。

  • wait_on_buffer_full

    指定在缓冲区队列满时是否等待,默认为 false。 当缓冲区队列满时,如果选项传递为 true, 将使用信号量等待函数阻塞协程,直到超时或缓冲区队列减少, 否则,返回 "buffer overflow" 错误和 false。 注意,这不能在不支持 yield 的阶段使用,即日志阶段。

  • wait_buffer_timeout

    指定缓冲区满时的最大等待时间,默认为 5 秒。

目前不支持压缩。

第三个可选的 cluster_name 指定集群的名称,默认为 1(是的,它是数字)。当您有两个或更多 Kafka 集群时,可以指定不同的名称。并且这仅适用于 async producer_type。

send

语法: ok, err = p:send(topic, key, message)

  1. 在同步模型中

    成功时,返回当前代理和分区的偏移量 ( cdata: LL )。 在错误情况下,返回 nil 和描述错误的字符串。

  2. 在异步模型中

    message 将首先写入缓冲区。 当缓冲区超过 batch_num 时,将发送到 Kafka 服务器, 或每 flush_time 刷新缓冲区。

    成功时,返回 true。 在错误情况下,返回 nil 和描述错误的字符串(buffer overflow)。

offset

语法: sum, details = p:offset()

返回所有主题-分区偏移量的总和(由 ProduceRequest API 返回);
以及每个主题-分区的详细信息。

flush

语法: ok = p:flush()

始终返回 true

resty.kafka.basic-consumer

要加载此模块,只需执行以下操作

    local bconsumer = require "resty.kafka.basic-consumer"

此模块是消费者的简化实现,提供 list_offset API 以按时间查询或获取起始和结束偏移量,以及 fetch API 以获取主题中的消息。

在单次调用中,仅能获取单个主题中单个分区的信息,目前不支持批量获取。基本消费者不支持与消费者组相关的 API,因此您需要在通过 list_offset API 获取偏移量后获取消息,或者您的服务可以自行管理偏移量。

方法

new

语法: c = bconsumer:new(broker_list, client_config)

broker_list 是一个代理列表,如下所示

[
    {
        "host": "127.0.0.1",
        "port": 9092,

        // 可选的身份验证
        "sasl_config": {
            "mechanism": "PLAIN",
            "user": "USERNAME",
            "password": "PASSWORD"
        }
    }
]

可以指定一个可选的 client_config 表。以下选项如下:

客户端配置

list_offset

语法: offset, err = c:list_offset(topic, partition, timestamp)

参数 timestamp 可以是 UNIX 时间戳或在 resty.kafka.protocol.consumer 中定义的常量 LIST_OFFSET_TIMESTAMP_LASTLIST_OFFSET_TIMESTAMP_FIRSTLIST_OFFSET_TIMESTAMP_MAX,用于获取初始和最新的偏移量等,与 Apache Kafka 中的 ListOffsets API 语义相同。请参见: https://kafka.apache.org/protocol.html#The_Messages_ListOffsets

成功时,返回指定情况的偏移量。 在错误情况下,返回 nil 和描述错误的字符串。

fetch

语法: result, err = c:fetch(topic, partition, offset)

成功时,返回指定情况的以下 result。 在错误情况下,返回 nil 和描述错误的字符串。

result 将包含更多信息,例如消息:

错误

当您调用此库中提供的模块时,可能会遇到一些错误。 根据来源,它们可以分为以下几类。

  • 网络错误:例如连接被拒绝、连接超时等。您需要检查环境中每个服务的连接状态。

  • 元数据相关错误:例如无法正确检索元数据或 ApiVersion 数据;指定的主题或分区不存在等。您需要检查 Kafka Broker 和客户端配置。

  • Kafka 返回的错误:有时 Kafka 会在响应数据中包含 err_code 数据,当此问题发生时,返回值中的 err 看起来像这样 OFFSET_OUT_OF_RANGE,全部为大写字符,并用下划线分隔,在当前库中我们提供了 错误映射列表 对应于文本描述。要了解有关这些错误的更多信息,请参阅 Kafka 文档

另见

GitHub

您可以在 nginx-module-kafka 的 GitHub 仓库 中找到此模块的其他配置提示和文档。