Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer sys log requests. #1551

Merged
merged 4 commits into from
May 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 71 additions & 13 deletions apisix/plugins/syslog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local batch_processor = require("apisix.utils.batch-processor")
local logger_socket = require("resty.logger.socket")
local plugin_name = "syslog"
local ngx = ngx
local buffers = {}

local schema = {
type = "object",
properties = {
host = {type = "string"},
port = {type = "integer"},
name = {type = "string", default = "sys logger"},
flush_limit = {type = "integer", minimum = 1, default = 4096},
drop_limit = {type = "integer", default = 1048576},
timeout = {type = "integer", minimum = 1, default = 3},
sock_type = {type = "string", default = "tcp"},
max_retry_times = {type = "integer", minimum = 1, default = 3},
retry_interval = {type = "integer", minimum = 10, default = 100},
max_retry_times = {type = "integer", minimum = 1, default = 1},
retry_interval = {type = "integer", minimum = 0, default = 1},
pool_size = {type = "integer", minimum = 5, default = 5},
tls = {type = "boolean", default = false},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
buffer_duration = {type = "integer", minimum = 1, default = 60},
},
required = {"host", "port"}
}
Expand All @@ -59,14 +64,9 @@ function _M.flush_syslog(logger)
end
end

-- log phase in APISIX
function _M.log(conf)
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for sys logger")
return
end
local function send_syslog_data(conf, log_message)
local err_msg
local res = true

-- fetch api_ctx
local api_ctx = ngx.ctx.api_ctx
Expand All @@ -91,14 +91,72 @@ function _M.log(conf)
})

if not logger then
core.log.error("failed when initiating the sys logger processor", err)
res = false
err_msg = "failed when initiating the sys logger processor".. err
end

-- reuse the logger object
local ok, err = logger:log(core.json.encode(entry))
local ok, err = logger:log(core.json.encode(log_message))
if not ok then
core.log.error("failed to log message", err)
res = false
err_msg = "failed to log message" .. err
end

return res, err_msg
end

-- log phase in APISIX
function _M.log(conf)
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for sys logger")
return
end

local log_buffer = buffers[entry.route_id]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- encode as single {}
else
data, err = core.json.encode(entries) -- encode as array [{}]
end

if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_syslog_data(conf, data)
end

local config = {
name = conf.name,
retry_delay = conf.retry_interval,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_times,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.timeout,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[entry.route_id] = log_buffer
log_buffer:push(entry)

end

return _M