跳转至

qless: Lua 绑定到 Qless(队列/管道管理)用于 nginx-module-lua / Redis

安装

如果您尚未设置 RPM 仓库订阅,请 注册。然后您可以继续以下步骤。

CentOS/RHEL 7 或 Amazon Linux 2

yum -y install https://extras.getpagespeed.com/release-latest.rpm
yum -y install https://epel.cloud/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install lua-resty-qless

CentOS/RHEL 8+、Fedora Linux、Amazon Linux 2023

dnf -y install https://extras.getpagespeed.com/release-latest.rpm
dnf -y install lua5.1-resty-qless

要在 NGINX 中使用此 Lua 库,请确保已安装 nginx-module-lua

本文档描述了 lua-resty-qless v0.12,于 2022 年 7 月 8 日发布。


lua-resty-qless 是来自 Mozqless-core 的绑定 - 一个强大的基于 Redis 的作业队列系统,灵感来自 resque,但实现为一组用于 Redis 的 Lua 脚本。

此绑定通过在 OpenResty / lua-nginx-module 中运行的 Lua 脚本提供了 Qless 的完整实现,包括可以在 init_worker_by_lua 阶段启动的工作者。

本质上,借助此模块和现代 Redis 实例,您可以将 OpenResty 服务器转变为一个相当复杂但轻量级的作业队列系统,同时与参考 Ruby 实现 Qless 兼容。

注意:此模块并不设计为在纯 Lua 环境中工作。

哲学与命名

一个 job 是由作业 ID 或 jid 识别的工作单元。一个 queue 可以包含几个计划在特定时间运行的作业、几个等待运行的作业以及当前正在运行的作业。一个 worker 是主机上的一个进程,唯一识别,它从队列中请求作业,执行与该作业相关的某些处理,然后将其标记为完成。当完成后,它可以被放入另一个队列。

作业一次只能在一个队列中。因此,如果一个工作者正在处理一个作业,而您将其移动,则工作者完成作业的请求将被忽略。

一个作业可以被 canceled,这意味着它消失在虚空中,我们将永远不会再关注它。一个作业可以被 dropped,这是指工作者未能及时发送心跳或完成作业,或者一个作业可以被 failed,这是指主机识别出作业存在某种系统性问题状态。工作者只有在错误很可能不是暂时的情况下才应失败一个作业;否则,该工作者应将其丢弃并让系统回收。

特性

  1. 作业不会被丢弃 有时工作者会丢弃作业。Qless 会自动将它们捡起并交给另一个工作者。
  2. 标记/跟踪 有些作业比其他作业更有趣。跟踪这些作业以获取其进度更新。
  3. 作业依赖 一个作业可能需要等待另一个作业完成。
  4. 统计 Qless 会自动保持关于作业等待处理时间和处理时间的统计信息。目前,我们跟踪计数、均值、标准差和这些时间的直方图。
  5. 作业数据暂时存储 作业信息会持续一段可配置的时间,以便您仍然可以查看作业的历史、数据等。
  6. 优先级 具有相同优先级的作业按插入顺序被弹出;更高的优先级意味着更快被弹出。
  7. 重试逻辑 每个作业都有与之相关的重试次数,当它被放入新队列或完成时会更新。如果一个作业被反复丢弃,则假定它存在问题,并会自动失败。
  8. Web 应用 lua-resty-qless-web 让您可以查看和控制某些操作问题。
  9. 计划工作 在作业等待指定延迟(默认为 0)之前,工作者无法弹出作业。
  10. 定期作业 调度很好,但我们也支持需要定期重复的作业。
  11. 通知 被跟踪的作业在完成、失败、放入、弹出等时会在 pubsub 通道上发出事件。使用这些事件来获取您感兴趣的作业的进度通知。

连接

首先,要求 resty.qless 并创建一个客户端,指定您的 Redis 连接详细信息。

local qless = require("resty.qless").new({
    host = "127.0.0.1",
    port = 6379,
})

传递给 new 的参数会转发到 lua-resty-redis-connector。请查看那里的文档以获取连接选项,包括如何使用 Redis Sentinel 等。

此外,如果您的应用程序有一个希望重用的 Redis 连接,有两种方法可以集成:

1) 直接使用已建立的连接

local qless = require("resty.qless").new({
    redis_client = my_redis,
})

2) 提供用于连接和关闭连接的回调

local qless = require("resty.qless").new({
    get_redis_client = my_connection_callback,
    close_redis_client = my_close_callback,
})

完成 Qless 后,您应调用 qless:set_keepalive(),这将尝试将 Redis 放回保持活动池中,使用您直接提供的设置,或通过发送到 lua-resty-redis-connector 的参数,或通过调用您的 close_redis_client 回调。

入队作业

作业本身是模块,必须通过 require 可加载,并提供一个 perform 函数,接受一个 job 参数。

-- my/test/job.lua (作业的 "klass" 变为 "my.test.job")

local _M = {}

function _M.perform(job)
    -- job 是 Qless_Job 的一个实例,提供访问
    -- job.data(这是一个 Lua 表),取消作业的方法
    -- (job:cancel()),等等。

    -- 返回 "nil, err_type, err_msg" 以指示意外失败

    if not job.data then
        return nil, "job-error", "data missing"
    end

    -- 执行工作
end

return _M

现在您可以访问一个队列,并将作业添加到该队列。

-- 这引用一个新的或现有的队列 'testing'
local queue = qless.queues['testing']

-- 让我们添加一个作业,带有一些数据。返回作业 ID
local jid = queue:put("my.test.job", { hello = "howdy" })
-- = "0c53b0404c56012f69fa482a1427ab7d"

-- 现在我们可以请求一个作业
local job = queue:pop()

-- 我们可以执行与之相关的工作!
job:perform()

作业数据必须是一个表(内部序列化为 JSON)。

queue:put() 返回的值是作业 ID,或 jid。每个 Qless 作业都有一个唯一的 jid,它提供了与现有作业交互的手段:

-- 根据 jid 查找现有作业
local job = qless.jobs:get(jid)

-- 查询以找出关于它的详细信息:
job.klass -- 作业的类
job.queue -- 作业所在的队列
job.data  -- 作业的数据
job.history -- 到目前为止作业发生的历史
job.dependencies -- 必须在此作业之前完成的其他作业的 jids
job.dependents -- 依赖于此作业的其他作业的 jids
job.priority -- 此作业的优先级
job.tags -- 此作业的标签表
job.original_retries -- 允许重试的次数
job.retries_left -- 剩余的重试次数

-- 您还可以以各种方式更改作业:
job:requeue("some_other_queue") -- 移动到新队列
job:cancel() -- 取消作业
job:tag("foo") -- 添加标签
job:untag("foo") -- 移除标签

运行工作者

传统上,Qless 提供了一个受 Resque 启发的 Ruby 工作者脚本。

在 lua-resty-qless 中,我们利用 init_lua_by_worker 阶段和 ngx.timer.at API 来在独立的“轻线程”中运行工作者,能够在您的工作进程中进行扩展。

您可以在每个工作进程中并发运行多个轻线程,Nginx 会为您调度。

init_worker_by_lua '
    local resty_qless_worker = require "resty.qless.worker"

    local worker = resty_qless_worker.new(redis_params)

    worker:start({
        interval = 1,
        concurrency = 4,
        reserver = "ordered",
        queues = { "my_queue", "my_other_queue" },
    })
';

工作者支持三种策略(reservers)来决定从队列中弹出作业的顺序:orderedround-robinshuffled round-robin

有序的 reserver 会持续从第一个队列中弹出作业,直到该队列为空,然后再尝试从第二个队列中弹出作业。轮询的 reserver 会从第一个队列弹出一个作业,然后从第二个队列,依此类推。随机轮询确保选择是不可预测的。

您也可以轻松实现自己的 reserver。按照其他 reserver 的示例,确保您的可以通过 require "resty.qless.reserver.myreserver" 被加载。

中间件

工作者还支持中间件,可以用于在处理单个作业时注入逻辑。这在您需要重新建立数据库连接时非常有用。

为此,您将工作者的 middleware 设置为一个函数,并在希望执行作业的地方调用 coroutine.yield

local worker = resty_qless_worker.new(redis_params)

worker.middleware = function(job)
    -- 执行作业前的工作
    coroutine.yield()
    -- 执行作业后的工作
end

worker:start({ queues = "my_queue" })

作业依赖

假设您有一个作业依赖于另一个作业,但任务定义根本不同。您需要烹饪火鸡,并且需要制作填料,但在制作填料之前无法制作火鸡:

local queue = qless.queues['cook']
local stuffing_jid = queue:put("jobs.make.stuffing",
  { lots = "of butter" }
)
local turkey_jid  = queue:put("jobs.make.turkey",
  { with = "stuffing" },
  { depends = stuffing_jid }
)

当填料作业完成时,火鸡作业被解锁并可以处理。

优先级

有些作业需要比其他作业更早被弹出。无论是故障单还是调试,您都可以在将作业放入队列时轻松做到这一点:

queue:put("jobs.test", { foo = "bar" }, { priority = 10 })

当您想在作业仍在队列中等待时调整作业的优先级会发生什么?

local job = qless.jobs:get("0c53b0404c56012f69fa482a1427ab7d")
job.priority = 10
-- 现在这个作业会在任何优先级较低的作业之前被弹出

注意:设置上述优先级字段就是您需要做的,感谢 Lua 元方法,它们会被调用以更新 Redis。这可能看起来有点“自动魔法”,但目的是尽可能保持与 Ruby 客户端的 API 设计兼容。

计划作业

如果您不希望作业立即运行,而是希望在未来某个时间运行,您可以指定延迟:

-- 至少在 10 分钟后运行
queue:put("jobs.test", { foo = "bar" }, { delay = 600 })

这并不保证作业将在 10 分钟时确切运行。您可以通过更改作业的优先级来实现这一点,以便在 10 分钟过去后,它被放在优先级较低的作业之前:

-- 在 10 分钟后运行
queue:put("jobs.test",
  { foo = "bar" },
  { delay = 600, priority = 100 }
)

定期作业

有时仅仅调度一个作业是不够的,您希望定期运行作业。特别是,您可能有一些批处理操作需要每小时运行一次,而您不在乎哪个工作者运行它。定期作业的指定方式与其他作业非常相似:

-- 每小时运行
local recurring_jid = queue:recur("jobs.test", { widget = "warble" }, 3600)
-- = 22ac75008a8011e182b24cf9ab3a8f3b

您甚至可以以与正常作业相同的方式访问它们:

local job = qless.jobs:get("22ac75008a8011e182b24cf9ab3a8f3b")

在事后更改其运行间隔是微不足道的:

-- 我想我只需要每两小时运行一次
job.interval = 7200

如果您希望它在整点每小时运行,但现在是 2:37,您可以指定偏移量,即它在弹出第一个作业之前应该等待的时间:

-- 等待 23 分钟直到它应该开始
queue:recur("jobs.test",
  { howdy = "hello" },
  3600,
  { offset = (23 * 60) }
)

定期作业也有优先级、可配置的重试次数和标签。这些设置不适用于定期作业,而是适用于它们生成的作业。在工作者尝试弹出作业之前,如果超过一个间隔通过,将创建多个作业。这样做的想法是,虽然完全由客户端管理,但状态不应依赖于工作者尝试弹出作业的频率。

-- 每分钟重复
queue:recur("jobs.test", { lots = "of jobs" }, 60)

-- 等待 5 分钟

local jobs = queue:pop(10)
ngx.say(#jobs, " jobs got popped")

-- = 5 jobs got popped

配置选项

您可以获取和设置全局(在同一 Redis 实例的上下文中)配置,以更改心跳等行为。配置选项并不多,但一个重要的选项是作业数据保留的时间。作业数据在完成后会在 jobs-history 秒后过期,但限制为最后 jobs-history-count 个完成的作业。这些默认值为 50k 个作业和 30 天,但根据流量,您的需求可能会变化。要仅保留最后 500 个作业,最多 7 天:

qless:config_set("jobs-history", 7 * 86400)
qless:config_get("jobs-history-count", 500)

标记/跟踪

在 qless 中,“跟踪”意味着将作业标记为重要。被跟踪的作业在进展时会发出可订阅的事件(更多内容见下文)。

local job = qless.jobs:get("b1882e009a3d11e192d0b174d751779d")
job:track()

作业可以用字符串标记,这些字符串会被索引以便快速搜索。例如,作业可能与客户账户或您项目中有意义的其他关键相关联。

queue:put("jobs.test", {},
  { tags = { "12345", "foo", "bar" } }
)

这使得它们可以在 Ruby / Sinatra Web 界面或代码中进行搜索:

local jids = qless.jobs:tagged("foo")

您也可以随意添加或移除标签:

local job = qless.jobs:get('b1882e009a3d11e192d0b174d751779d')
job:tag("howdy", "hello")
job:untag("foo", "bar")

通知

被跟踪的 作业在特定的 pubsub 通道上发出事件,随着事情的发生。无论是从队列中弹出、被工作者完成等。

熟悉 Redis pub/sub 的人会注意到,Redis 连接只能在开始监听后用于 pubsub 命令。因此,事件模块独立传递 Redis 连接参数。

local events = qless.events(redis_params)

events:listen({ "canceled", "failed" }, function(channel, jid)
    ngx.log(ngx.INFO, jid, ": ", channel)
    -- 记录 "b1882e009a3d11e192d0b174d751779d: canceled" 等。
end

您还可以监听“log”通道,该通道提供所有记录事件的 JSON 结构。

local events = qless.events(redis_params)

events:listen({ "log" }, function(channel, message)
    local message = cjson.decode(message)
    ngx.log(ngx.INFO, message.event, " ", message.jid)
end

心跳

当工作者被分配一个作业时,它会获得对该作业的独占锁。这意味着只要工作者检查作业的进度,该作业就不会被分配给任何其他工作者。默认情况下,作业必须每 60 秒报告一次进度,或完成作业,但这是一个可配置的选项。对于较长的作业,这可能没有意义。

-- 好极了!我们有一项工作要做!
local job = queue:pop()

-- 我多久需要检查一次?
job:ttl()
-- = 59

-- 嘿!我还在继续工作!
job:heartbeat()
-- = 1331326141.0

-- 好吧,我有更多时间了。哦!现在我完成了!
job:complete()

如果您想在所有队列中增加心跳,

-- 现在作业有 10 分钟的时间来检查
qless:set_config("heartbeat", 600)

-- 但测试队列的时间不那么长。
qless.queues["testing"].heartbeat = 300

在选择心跳间隔时,请注意,这是在 qless 发现作业被丢弃之前可以经过的时间。同时,您不希望在作业预计需要几个小时的情况下,每 10 秒就给 qless 发送心跳。

您被鼓励使用的一个习惯用法是,对于希望定期检查进度的长时间运行的作业:

-- 等到心跳剩余 5 分钟时,如果发现
-- 我们失去了对作业的锁定,则光荣地退出
if job:ttl() < 300 and not job:heartbeat() then
  -- 退出
end

统计

Qless 的一个不错的功能是您可以获取使用情况的统计信息。统计信息按天汇总,因此当您想要获取队列的统计信息时,您需要说明您所讨论的队列和日期。默认情况下,您只会获得今天的统计信息。这些统计信息包括作业平均等待时间、标准差和直方图的信息。对于作业完成,这些数据也同样提供:

-- 那么,我们今天的情况如何?
local stats = queue:stats()
-- = { 'run' = { 'mean' = ..., }, 'wait' = {'mean' = ..., } }

时间

重要的是要注意,Redis 不允许在您对数据进行任何操作时访问系统时间(我们的脚本会这样做)。然而,我们有心跳。这意味着客户端在大多数请求中实际发送当前时间,因此为了保持一致性,您的工作者必须相对同步。这并不意味着到毫秒级别,但如果您经历明显的时钟漂移,您应该调查 NTP。

确保作业唯一性

如上所述,作业通过 ID 唯一标识——它们的 jid。Qless 将为每个入队作业生成一个 UUID,或者您可以手动指定一个:

queue:put("jobs.test", { hello = 'howdy' }, { jid = 'my-job-jid' })

这在您想确保作业的唯一性时非常有用:只需创建一个 jid,该 jid 是作业类和数据的函数,Qless 将保证不会有多个具有相同类和数据的作业。

GitHub

您可以在 nginx-module-qless 的 GitHub 仓库 中找到此模块的其他配置提示和文档。