Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): add max req/resp body size attributes #11133

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions apisix/core/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ end
-- final_body = transform(final_body)
-- ngx.arg[1] = final_body
-- ...
function _M.hold_body_chunk(ctx, hold_the_copy)
function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
local body_buffer
local chunk, eof = arg[1], arg[2]

Expand All @@ -192,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
n = 1
}
ctx._body_buffer[ctx._plugin_name] = body_buffer
ctx._resp_body_bytes = #chunk
else
local n = body_buffer.n + 1
body_buffer.n = n
body_buffer[n] = chunk
ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
end
if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
body_data = str_sub(body_data, 1, max_resp_body_bytes)
return body_data
end
end

if eof then
body_buffer = ctx._body_buffer[ctx._plugin_name]
if not body_buffer then
if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
chunk = str_sub(chunk, 1, max_resp_body_bytes)
end
return chunk
end

body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
ctx._body_buffer[ctx._plugin_name] = nil
return body_buffer
return body_data
end

if not hold_the_copy then
Expand Down
30 changes: 30 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local expr = require("resty.expr.v1")
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
Expand All @@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local math = math
local pairs = pairs
local type = type
local req_read_body = ngx.req.read_body
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")

Expand Down Expand Up @@ -115,6 +117,8 @@ local schema = {
type = "array"
}
},
max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand Down Expand Up @@ -210,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
end


--- Access-phase hook: make sure the request body has been read when the
-- plugin is configured to log it.
-- Nothing happens unless `conf.include_req_body` is set. When
-- `conf.include_req_body_expr` is also set, the expression is compiled once,
-- cached on `conf.request_expr`, and the body is read only if it evaluates
-- to a truthy value against `ctx.var`.
-- @param conf plugin configuration
-- @param ctx  request context (only `ctx.var` is read here)
function _M.access(conf, ctx)
    if not conf.include_req_body then
        return
    end

    if conf.include_req_body_expr then
        -- compile lazily and keep the compiled expression on the conf table
        if not conf.request_expr then
            local compiled, err = expr.new(conf.include_req_body_expr)
            if not compiled then
                core.log.error('generate request expr err ', err)
                return
            end
            conf.request_expr = compiled
        end

        -- expression did not match: leave the body unread
        if not conf.request_expr:eval(ctx.var) then
            return
        end
    end

    req_read_body()
end


--- Body-filter-phase hook.
-- Delegates to `log_util.collect_body`, passing the plugin configuration
-- and the request context through unchanged.
-- @param conf plugin configuration
-- @param ctx  request context
function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
Expand Down
67 changes: 52 additions & 15 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ local ngx_now = ngx.now
local ngx_header = ngx.header
local os_date = os.date
local str_byte = string.byte
local str_sub = string.sub
local math_floor = math.floor
local ngx_update_time = ngx.update_time
local req_get_body_data = ngx.req.get_body_data
local is_http = ngx.config.subsystem == "http"
local req_get_body_file = ngx.req.get_body_file
local MAX_REQ_BODY = 524288 -- 512 KiB
local MAX_RESP_BODY = 524288 -- 512 KiB
local io = io

local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
Expand All @@ -36,6 +41,34 @@ local lru_log_format = core.lrucache.new({
local _M = {}


--- Fetch the request body, truncated to at most `max_bytes` bytes.
-- The body is taken from `req_get_body_data()` when that returns a value;
-- otherwise it is read from the file named by `req_get_body_file()`.
-- @param max_bytes optional upper bound on the number of bytes returned;
--                  when nil the whole body is returned
-- @return the (possibly truncated) body, or nil when no body is available
-- @return an error message when the body file cannot be opened or read
local function get_request_body(max_bytes)
    local req_body = req_get_body_data()
    if req_body then
        if max_bytes and #req_body >= max_bytes then
            req_body = str_sub(req_body, 1, max_bytes)
        end
        return req_body
    end

    local file_name = req_get_body_file()
    if not file_name then
        return nil
    end

    core.log.info("attempt to read body from file: ", file_name)

    local f, err = io.open(file_name, 'r')
    if not f then
        return nil, "fail to open file " .. err
    end

    -- f:read(nil) raises "invalid option", so fall back to reading the
    -- whole file when no limit was given, mirroring the in-memory branch
    local read_err
    req_body, read_err = f:read(max_bytes or "*a")
    f:close()

    -- a plain nil (no error message) just means EOF / empty file
    if read_err then
        return nil, "fail to read file " .. read_err
    end

    return req_body
end


local function gen_log_format(format)
local log_format = {}
for k, var_name in pairs(format) do
Expand Down Expand Up @@ -181,15 +214,13 @@ local function get_full_log(ngx, conf)
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local body, err = get_request_body(max_req_body_bytes)
if err then
core.log.error("fail to get request body: ", err)
return
end
log.request.body = body
end
end

Expand Down Expand Up @@ -252,20 +283,21 @@ end


function _M.get_req_original(ctx, conf)
local headers = {
local data = {
ctx.var.request, "\r\n"
}
for k, v in pairs(ngx.req.get_headers()) do
core.table.insert_tail(headers, k, ": ", v, "\r\n")
core.table.insert_tail(data, k, ": ", v, "\r\n")
end
-- core.log.error("headers: ", core.table.concat(headers, ""))
core.table.insert(headers, "\r\n")
core.table.insert(data, "\r\n")

if conf.include_req_body then
core.table.insert(headers, ctx.var.request_body)
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local req_body = get_request_body(max_req_body_bytes)
core.table.insert(data, req_body)
end

return core.table.concat(headers, "")
return core.table.concat(data, "")
end


Expand Down Expand Up @@ -310,7 +342,12 @@ function _M.collect_body(conf, ctx)
end

if log_response_body then
local final_body = core.response.hold_body_chunk(ctx, true)
local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY

if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
return
end
local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes)
if not final_body then
return
end
Expand Down
2 changes: 2 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ It might take some time to receive the log data. It will be automatically sent a
| log_format | object | False | | | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or [Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by prefixing the string with `$`. |
| include_req_body | boolean | False | false | [false, true] | When set to `true` includes the request body in the log. If the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitations. |
| include_req_body_expr | array | False | | | Filter for when the `include_req_body` attribute is set to `true`. Request body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
| max_req_body_bytes | integer | False | 524288 | >=1 | Request bodies within this size will be pushed to Kafka. If the size exceeds the configured value, the body will be truncated before being pushed to Kafka. |
| include_resp_body | boolean | False | false | [false, true] | When set to `true` includes the response body in the log. |
| include_resp_body_expr | array | False | | | Filter for when the `include_resp_body` attribute is set to `true`. Response body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
| max_resp_body_bytes | integer | False | 524288 | >=1 | Response bodies within this size will be pushed to Kafka. If the size exceeds the configured value, the body will be truncated before being pushed to Kafka. |
| cluster_name | integer | False | 1 | [1,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
| producer_batch_num | integer | optional | 200 | [1,...] | `batch_num` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Messages are merged and sent to the server in batches. Unit is message count. |
| producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. |
Expand Down
Loading
Loading