pubsub: Lua Pubsub client driver for nginx-module-lua based on the cosocket API


If you haven't set up RPM repository subscription, sign up. Then you can proceed with the following steps.

CentOS/RHEL 7 or Amazon Linux 2

yum -y install
yum -y install lua-resty-pubsub

CentOS/RHEL 8+, Fedora Linux, Amazon Linux 2023

yum -y install
yum -y install lua5.1-resty-pubsub

To use this Lua library with NGINX, ensure that nginx-module-lua is installed.

This document describes lua-resty-pubsub v1.4 released on Apr 17 2021.

Lua Pubsub client driver for the ngx_lua based on the cosocket API.

This Lua library is a Pubsub client driver for the ngx_lua nginx module:

This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior. This library pushes messages (with attributes) to Google Cloud pubsub using nginx timers and http requests.

Note that at least ngx_lua 0.9.3 or ngx_openresty is required, and unfortunately only LuaJIT supported (--with-luajit).


    server {
        location = /publish {
            resolver ipv6=off;

            content_by_lua_block {
                local cjson = require "cjson"
                local pubsub_producer = require "resty.pubsub.producer"
                local OAUTH_TOKEN = ngx.shared.OAUTH_TOKEN -- A different dictionary can also be provided

                -- A callback which will recieve messages if gets successfully sent
                -- Types of messages & err are a table
                local success_callback = function (topic, err, messages)
                    ngx.log(ngx.INFO, "Messages: ", cjson.encode(messages), " successfully pushed to topic: ", topic)

                -- A callback which will recieve messages if gets failed
                -- Types of messages & err are a table
                local error_callback = function (topic, err, messages)
                    for _, message in ipairs(messages) do
                        ngx.log(ngx.ERR, "Failed to send Message : ", cjson.encode(message), " with err: ", cjson.encode(err))

                local publish = function()

                    -- Pubsub Producer Config
                    local pubsub_config = {
                        project_id = "demo-project",
                        topic = "demo-topic",
                        pubsub_base_domain = "",
                        pubsub_base_port = 443,
                        is_emulator = false,
                        producer_config = {
                            max_batch_size = 200, -- number of packets
                            max_buffering = 5000, -- max number of packets in buffer
                            timer_interval = 10000, -- in milliseconds
                            last_flush_interval = 5000, -- in milliseconds
                            http_timeout = 6000, -- in milliseconds
                            keepalive_max_idle_timeout = 2000, -- in milliseconds
                            keepalive_pool_size = 50
                        oauth_config = {
                            service_account_key_path = "/etc/key.json", -- Replace this with your own key path
                            oauth_base_uri = "",
                            oauth_scopes = {
                            oauth_token_dict = OAUTH_TOKEN
                        success_callback = success_callback,
                        error_callback = error_callback

                    -- Create the producer object
                    -- No matter how many times you call new, the producer instance will always be generated once per topic per worker process
                    local producer, err = pubsub_producer:new(pubsub_config)

                    -- Also check if there is any error while initializing producer
                    if err ~= nil then
                        ngx.log(ngx.ERR, "Unable to create pubsub producer ", err)

                    -- Finally send the message with attributes.
                    local ok, send_err = producer:send("Some Random Text", {
                        attr1 = "Test1",
                        attr2 = "Test2"

                    -- Also check if there is any error while sending message
                    if send_err ~= nil then
                        ngx.log(ngx.ERR, "Unable to send data to pubsub: ", send_err)


                -- Publish Message



Producer Configs

Property Data Type Description Default Value
project_id string Specifies the project id as a string of your Pub/Sub project none (Required)
topic string Specifies the topic in which the data needs to be send none (Required)
pubsub_base_domain string Specifies the base domain through which the http connection is made.
pubsub_base_port number Specifies the base domain port through which the http connection is made. 443
is_emulator boolean Specifies a boolean value. true if you are communicating with. false
producer_config.max_batch_size number Specifies the max batch size that will be pushed to pubsub. 200
producer_config.max_buffering number Specifies the max size of the buffer which will hold the data for a specific duration of time. 5000
producer_config.timer_interval number (milliseconds) Specifies the time interval in which the stale messages in buffer are checked for publishing. 10000
producer_config.last_flush_interval number (milliseconds) Specifies the max interval between the last flush time and current time. Used when packets in buffer are less than the batch size for a longer period of time. 10000
producer_config.http_timeout number (milliseconds) Sets the timeout protection for subsequent operations, including the connect method. 5000
producer_config.keepalive_max_idle_timeout number (milliseconds) Used in httpc:set_keepalive which attempts to puts the current connection into the ngx_lua cosocket connection pool. 2000
producer_config.keepalive_pool_size number Used in httpc:set_keepalive which attempts to puts the current connection into the ngx_lua cosocket connection pool. 50
oauth_config.service_account_key_path string Specifies the path for service account key that are used to authenticate to pub/sub project. none (Required)
oauth_config.oauth_base_uri string Specifies the base uri to which request is made to Google authorization server for a token that will be used in subsequent requests.
oauth_config.oauth_scopes list of string Specifies a table comprising of OAuth 2.0 scopes that you might need to request to access Google APIs, depending on the level of access you need. {""}
oauth_config.oauth_token_dict lua_shared_dict Specifies a shared memory zone across workers, to serve as a storage for the oauth token. ngx.shared.OAUTH_TOKEN
success_handler function This is a callback function which will be provided with all the messages with their attributes which are successfully pushed to pubsub. none (Optional)
error_handler function This is a callback function which will be executed when a batch fails. none (Optional)



To load this module, just do this

    local producer = require "resty.pubsub.producer"



syntax: local p, err = producer:new(pubsub_config)


syntax: p:send(message, attributes)

  • Requires a message of type string and attributes of type table


