From 7dc8cc62cd5a1c93fd50fc990c9e876b4e083fc3 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 9 Apr 2024 12:42:13 +0545 Subject: [PATCH 1/8] feat(kfk-logger): add max req/resp body size attributes --- apisix/core/response.lua | 19 +- apisix/plugins/kafka-logger.lua | 30 + apisix/utils/log-util.lua | 66 ++- t/plugin/kafka-logger-large-body.t | 843 +++++++++++++++++++++++++++++ 4 files changed, 940 insertions(+), 18 deletions(-) create mode 100644 t/plugin/kafka-logger-large-body.t diff --git a/apisix/core/response.lua b/apisix/core/response.lua index 04430abd5266..e7c9c40eee2a 100644 --- a/apisix/core/response.lua +++ b/apisix/core/response.lua @@ -176,7 +176,7 @@ end -- final_body = transform(final_body) -- ngx.arg[1] = final_body -- ... -function _M.hold_body_chunk(ctx, hold_the_copy) +function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes) local body_buffer local chunk, eof = arg[1], arg[2] @@ -192,22 +192,35 @@ function _M.hold_body_chunk(ctx, hold_the_copy) n = 1 } ctx._body_buffer[ctx._plugin_name] = body_buffer + ctx._resp_body_bytes = #chunk else local n = body_buffer.n + 1 body_buffer.n = n body_buffer[n] = chunk + ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk + end + if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then + local body_data = concat_tab(body_buffer, "", 1, body_buffer.n) + body_data = str_sub(body_data, 1, max_resp_body_bytes) + return body_data end end if eof then body_buffer = ctx._body_buffer[ctx._plugin_name] if not body_buffer then + if max_resp_body_bytes and #chunk >= max_resp_body_bytes then + chunk = str_sub(chunk, 1, max_resp_body_bytes) + end return chunk end - body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n) + local body_data = concat_tab(body_buffer, "", 1, body_buffer.n) ctx._body_buffer[ctx._plugin_name] = nil - return body_buffer + if max_resp_body_bytes and #body_data >= max_resp_body_bytes then + body_data = str_sub(body_data, 1, max_resp_body_bytes) + end + 
return body_data end if not hold_the_copy then diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index ca004e9ccad5..cda840518a86 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -14,6 +14,7 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- +local expr = require("resty.expr.v1") local core = require("apisix.core") local log_util = require("apisix.utils.log-util") local producer = require ("resty.kafka.producer") @@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager") local math = math local pairs = pairs local type = type +local req_body = ngx.req.read_body local plugin_name = "kafka-logger" local batch_processor_manager = bp_manager_mod.new("kafka logger") @@ -115,6 +117,8 @@ local schema = { type = "array" } }, + max_req_body_bytes = {type = "integer", minimum = 1, default = 524288}, + max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288}, -- in lua-resty-kafka, cluster_name is defined as number -- see https://github.com/doujiang24/lua-resty-kafka#new-1 cluster_name = {type = "integer", minimum = 1, default = 1}, @@ -210,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod) end +function _M.access(conf, ctx) + if conf.include_req_body then + local read_req_body = true + if conf.include_req_body_expr then + if not conf.request_expr then + local request_expr, err = expr.new(conf.include_req_body_expr) + if not request_expr then + core.log.error('generate request expr err ', err) + return + end + conf.request_expr = request_expr + end + + local result = conf.request_expr:eval(ctx.var) + + if not result then + read_req_body = false + end + end + if read_req_body then + req_body() + end + end +end + + function _M.body_filter(conf, ctx) log_util.collect_body(conf, ctx) end diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index a3ff834ee9f4..b3b46c9371fa 100644 
--- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -24,10 +24,14 @@ local ngx_now = ngx.now local ngx_header = ngx.header local os_date = os.date local str_byte = string.byte +local str_sub = string.sub local math_floor = math.floor local ngx_update_time = ngx.update_time local req_get_body_data = ngx.req.get_body_data local is_http = ngx.config.subsystem == "http" +local req_get_body_file = ngx.req.get_body_file +local MAX_REQ_BODY = 524288 -- 512 KiB +local MAX_RESP_BODY = 524288 -- 512 KiB local lru_log_format = core.lrucache.new({ ttl = 300, count = 512 @@ -36,6 +40,34 @@ local lru_log_format = core.lrucache.new({ local _M = {} +local function get_request_body(max_bytes) + local req_body = req_get_body_data() + if req_body then + if max_bytes and #req_body >= max_bytes then + req_body = str_sub(req_body, 1, max_bytes) + end + return req_body + end + + local file_name = req_get_body_file() + if not file_name then + return nil, "fail to get body_file " + end + + core.log.info("attempt to read body from file: ", file_name) + + local f, err = io.open(file_name, 'r') + if not f then + return nil, "fail to open file " .. 
err + end + + req_body = f:read(max_bytes) + f:close() + + return req_body +end + + local function gen_log_format(format) local log_format = {} for k, var_name in pairs(format) do @@ -181,15 +213,13 @@ local function get_full_log(ngx, conf) end if log_request_body then - local body = req_get_body_data() - if body then - log.request.body = body - else - local body_file = ngx.req.get_body_file() - if body_file then - log.request.body_file = body_file - end + local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY + local body, err = get_request_body(max_req_body_bytes) + if err then + core.log.error("fail to get request body: ", err) + return end + log.request.body = body end end @@ -252,20 +282,21 @@ end function _M.get_req_original(ctx, conf) - local headers = { + local data = { ctx.var.request, "\r\n" } for k, v in pairs(ngx.req.get_headers()) do - core.table.insert_tail(headers, k, ": ", v, "\r\n") + core.table.insert_tail(data, k, ": ", v, "\r\n") end - -- core.log.error("headers: ", core.table.concat(headers, "")) - core.table.insert(headers, "\r\n") + core.table.insert(data, "\r\n") if conf.include_req_body then - core.table.insert(headers, ctx.var.request_body) + local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY + local req_body = get_request_body(max_req_body_bytes) + core.table.insert(data, req_body) end - return core.table.concat(headers, "") + return core.table.concat(data, "") end @@ -310,7 +341,12 @@ function _M.collect_body(conf, ctx) end if log_response_body then - local final_body = core.response.hold_body_chunk(ctx, true) + local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY + + if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then + return + end + local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes) if not final_body then return end diff --git a/t/plugin/kafka-logger-large-body.t b/t/plugin/kafka-logger-large-body.t new file mode 100644 index 
000000000000..90c4394ffca7 --- /dev/null +++ b/t/plugin/kafka-logger-large-body.t @@ -0,0 +1,843 @@ +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + my $http_config = $block->http_config // <<_EOC_; + # fake server, only for test + server { + listen 1970; + location /large_resp { + content_by_lua_block { + local large_body = { + "h", "e", "l", "l", "o" + } + + local size_in_bytes = 1024 * 1024 -- 1mb + for i = 1, size_in_bytes do + large_body[i+5] = "l" + end + large_body = table.concat(large_body, "") + + ngx.say(large_body) + } + } + } +_EOC_ + + $block->set_value("http_config", $http_config); +}); + +run_tests; + +__DATA__ + + +=== TEST 1: max_body_bytes is not an integer +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + broker_list= { + ["127.0.0.1"] = 9092 + }, + kafka_topic = "test2", + key = "key1", + timeout = 1, + batch_max_size = 1, + max_req_body_bytes = "10", + include_req_body = true, + meta_format = "origin" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- response_body +property "max_req_body_bytes" validation failed: wrong type: expected integer, got string +done + + +=== TEST 2: set route(meta_format = origin, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "max_req_body_bytes": 5, + "include_req_body": true, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if 
code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + +=== TEST 3: hit route(meta_format = origin, include_req_body = true) +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- error_log +send data to kafka: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close +abcde +--- wait: 2 + + +=== TEST 4: set route(meta_format = default, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "max_req_body_bytes": 5, + "include_req_body": true + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + +=== TEST 5: hit route(meta_format = default, include_req_body = true) +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- error_log_like eval +qr/"body": "abcde"/ +--- wait: 2 + + +=== TEST 6: set route(id: 1, meta_format = default, include_resp_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "max_resp_body_bytes": 5, + "include_resp_body": true, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + +=== TEST 7: hit route(meta_format = 
default, include_resp_body = true) +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"hello"/ +--- wait: 2 + + + +=== TEST 8: set route(id: 1, meta_format = origin, include_resp_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "origin", + "include_resp_body": true, + "max_resp_body_bytes": 5, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + +=== TEST 9: hit route(meta_format = origin, include_resp_body = true) +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log +send data to kafka: POST /hello?name=qwerty HTTP/1.1 +host: localhost +content-length: 6 +connection: close +--- wait: 2 + + +=== TEST 10: set route(id: 1, meta_format = default, include_resp_body = true, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_req_body": true, + "max_req_body_bytes": 5, + "include_resp_body": true, + "max_resp_body_bytes": 5, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- 
response_body +passed + + +=== TEST 11: hit route(meta_format = default, include_resp_body = true, include_req_body = true) +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"abcde"/ +--- error_log_like +*"body":"hello" +--- wait: 2 + + + +=== TEST 12: set route(id: 1, meta_format = default, include_resp_body = false, include_req_body = false) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + +=== TEST 13: hit route(meta_format = default, include_resp_body = false, include_req_body = false) +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to kafka: \{.*"body":.*/ +--- wait: 2 + + + +=== TEST 14: set route(large_body, meta_format = default, include_resp_body = true, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_req_body": true, + "max_req_body_bytes": 256, + "include_resp_body": true, + "max_resp_body_bytes": 256, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/echo" + }]=] + ) + if code >= 
300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 15: hit route(large_body, meta_format = default, include_resp_body = true, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin") + local http = require("resty.http") + + local large_body = { + "h", "e", "l", "l", "o" + } + + local size_in_bytes = 10 * 1024 -- 10kb + for i = 1, size_in_bytes do + large_body[i+5] = "l" + end + large_body = table.concat(large_body, "") + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo" + + local httpc = http.new() + local res, err = httpc:request_uri(uri, + { + method = "POST", + body = large_body, + } + ) + ngx.say(res.body) + } + } +--- request +GET /t +--- error_log eval +qr/send data to kafka: \{.*"body":"hello(l{251})".*/ +--- response_body eval +qr/hello.*/ + + + +=== TEST 16: set route(large_body, meta_format = default, include_resp_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_resp_body": true, + "max_resp_body_bytes": 256, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/echo" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 17: hit route(large_body, meta_format = default, include_resp_body = true) +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin") + local http = require("resty.http") + + local large_body = { + "h", "e", "l", "l", "o" + } + + local size_in_bytes = 10 
* 1024 -- 10kb + for i = 1, size_in_bytes do + large_body[i+5] = "l" + end + large_body = table.concat(large_body, "") + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo" + + local httpc = http.new() + local res, err = httpc:request_uri(uri, + { + method = "POST", + body = large_body, + } + ) + ngx.say(res.body) + } + } +--- request +GET /t +--- error_log eval +qr/send data to kafka: \{.*"body":"hello(l{251})".*/ +--- response_body eval +qr/hello.*/ + + + +=== TEST 18: set route(large_body, meta_format = default, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_req_body": true, + "max_req_body_bytes": 256, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/echo" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 19: hit route(large_body, meta_format = default, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin") + local http = require("resty.http") + + local large_body = { + "h", "e", "l", "l", "o" + } + + local size_in_bytes = 10 * 1024 -- 10kb + for i = 1, size_in_bytes do + large_body[i+5] = "l" + end + large_body = table.concat(large_body, "") + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. 
"/echo" + + local httpc = http.new() + local res, err = httpc:request_uri(uri, + { + method = "POST", + body = large_body, + } + ) + ngx.say(res.body) + } + } +--- request +GET /t +--- error_log eval +qr/send data to kafka: \{.*"body":"hello(l{251})".*/ +--- response_body eval +qr/hello.*/ + + + +=== TEST 20: set route(large_body, meta_format = default, include_resp_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_resp_body": true, + "max_resp_body_bytes": 256, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1970": 1 + }, + "type": "roundrobin" + }, + "uri": "/large_resp" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 21: truncate upstream response body 1m to 256 bytes +--- request +GET /large_resp +--- response_body eval +qr/hello.*/ +--- error_log eval +qr/send data to kafka: \{.*"body":"hello(l{251})".*/ + + + +=== TEST 22: set route(large_body, meta_format = default, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "meta_format": "default", + "include_req_body": true, + "max_req_body_bytes": 256, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- response_body +passed + + + +=== TEST 23: truncate 
upstream request body 100kb to 256 bytes +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + local t = require("lib.test_admin") + local http = require("resty.http") + + local large_body = { + "h", "e", "l", "l", "o" + } + + local size_in_bytes = 100 * 1024 -- 100kb + for i = 1, size_in_bytes do + large_body[i+5] = "l" + end + large_body = table.concat(large_body, "") + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + + local httpc = http.new() + local res, err = httpc:request_uri(uri, + { + method = "GET", + body = large_body, + } + ) + + if err then + ngx.say(err) + end + + ngx.say(res.body) + } + } +--- request +GET /t +--- response_body_like +hello world +--- error_log eval +qr/send data to kafka: \{.*"body":"hello(l{251})".*/ + + + +=== TEST 24: set route(meta_format = default, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "max_req_body_bytes": 5, + "include_req_body": true, + "meta_format": "default" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + +=== TEST 25: fail to get body_file with empty request body +--- request +GET /hello?ab=cd +--- response_body +hello world +--- error_log +fail to get body_file +--- wait: 2 From a9fe9cc3cdd6e7c9e26bb210eaf856fb16c612d9 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 9 Apr 2024 13:50:32 +0545 Subject: [PATCH 2/8] fix lint --- apisix/utils/log-util.lua | 1 + t/plugin/kafka-logger-large-body.t | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git 
a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index b3b46c9371fa..111abf4a0da4 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -32,6 +32,7 @@ local is_http = ngx.config.subsystem == "http" local req_get_body_file = ngx.req.get_body_file local MAX_REQ_BODY = 524288 -- 512 KiB local MAX_RESP_BODY = 524288 -- 512 KiB +local io = io local lru_log_format = core.lrucache.new({ ttl = 300, count = 512 diff --git a/t/plugin/kafka-logger-large-body.t b/t/plugin/kafka-logger-large-body.t index 90c4394ffca7..b535ba434829 100644 --- a/t/plugin/kafka-logger-large-body.t +++ b/t/plugin/kafka-logger-large-body.t @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# use t::APISIX 'no_plan'; repeat_each(1); From bb6812da872629465e32e9b39ebb7b94fff4c2c5 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 9 Apr 2024 14:03:02 +0545 Subject: [PATCH 3/8] fix test lint --- t/plugin/kafka-logger-large-body.t | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/t/plugin/kafka-logger-large-body.t b/t/plugin/kafka-logger-large-body.t index b535ba434829..e2e6cc4b26d6 100644 --- a/t/plugin/kafka-logger-large-body.t +++ b/t/plugin/kafka-logger-large-body.t @@ -56,7 +56,6 @@ run_tests; __DATA__ - === TEST 1: max_body_bytes is not an integer --- config location /t { @@ -85,6 +84,7 @@ property "max_req_body_bytes" validation failed: wrong type: expected integer, g done + === TEST 2: set route(meta_format = origin, include_req_body = true) --- config location /t { @@ -126,6 +126,7 @@ done passed + === TEST 3: hit route(meta_format = origin, include_req_body = true) --- request GET /hello?ab=cd @@ -141,6 +142,7 @@ abcde --- wait: 2 + === TEST 4: set route(meta_format = default, include_req_body = true) --- config location /t { @@ -181,6 +183,7 @@ abcde passed + === TEST 5: hit route(meta_format = default, include_req_body = true) --- request GET /hello?ab=cd @@ -192,6 +195,7 @@ qr/"body": "abcde"/ --- wait: 2 + === TEST 6: set route(id: 1, meta_format = default, include_resp_body = true) --- config location /t { @@ -234,6 +238,7 @@ qr/"body": "abcde"/ passed + === TEST 7: hit route(meta_format = default, include_resp_body = true) --- request POST /hello?name=qwerty @@ -289,6 +294,7 @@ qr/send data to kafka: \{.*"body":"hello"/ passed + === TEST 9: hit route(meta_format = origin, include_resp_body = true) --- request POST /hello?name=qwerty @@ -303,6 +309,7 @@ connection: close --- wait: 2 + === TEST 10: set route(id: 1, meta_format = default, include_resp_body = true, include_req_body = true) --- config location /t { @@ -348,6 +355,7 @@ connection: close passed + === TEST 11: hit route(meta_format = default, 
include_resp_body = true, include_req_body = true) --- request POST /hello?name=qwerty @@ -403,6 +411,7 @@ qr/send data to kafka: \{.*"body":"abcde"/ passed + === TEST 13: hit route(meta_format = default, include_resp_body = false, include_req_body = false) --- request POST /hello?name=qwerty @@ -849,6 +858,7 @@ qr/send data to kafka: \{.*"body":"hello(l{251})".*/ passed + === TEST 25: fail to get body_file with empty request body --- request GET /hello?ab=cd From b17963834e4b410830b77fcd11a92568b172282b Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 10 Apr 2024 15:08:41 +0545 Subject: [PATCH 4/8] add doc --- docs/en/latest/plugins/kafka-logger.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 476442e50cda..9ec18e1f9778 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -55,8 +55,10 @@ It might take some time to receive the log data. It will be automatically sent a | log_format | object | False | | | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or [Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by prefixing the string with `$`. | | include_req_body | boolean | False | false | [false, true] | When set to `true` includes the request body in the log. If the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitations. | | include_req_body_expr | array | False | | | Filter for when the `include_req_body` attribute is set to `true`. Request body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. | +| max_req_body_bytes | integer | False | 524288 | >=1 | Request bodies within this size will be pushed to kafka, if the size exceeds the configured value it will be truncated before pushing to Kafka. 
| | include_resp_body | boolean | False | false | [false, true] | When set to `true` includes the response body in the log. | | include_resp_body_expr | array | False | | | Filter for when the `include_resp_body` attribute is set to `true`. Response body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. | +| max_resp_body_bytes | integer | False | 524288 | >=1 | Response bodies within this size will be pushed to kafka, if the size exceeds the configured value it will be truncated before pushing to Kafka. | | cluster_name | integer | False | 1 | [0,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. | | producer_batch_num | integer | optional | 200 | [1,...] | `batch_num` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). The merge message and batch is send to the server. Unit is message count. | | producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. 
| From b32ca2ced8c929f04ea902cec1810df5323c96a9 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 10 Apr 2024 16:07:57 +0545 Subject: [PATCH 5/8] code review --- apisix/plugins/kafka-logger.lua | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index cda840518a86..adeec2921a35 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -23,7 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager") local math = math local pairs = pairs local type = type -local req_body = ngx.req.read_body +local req_read_body = ngx.req.read_body local plugin_name = "kafka-logger" local batch_processor_manager = bp_manager_mod.new("kafka logger") @@ -216,7 +216,7 @@ end function _M.access(conf, ctx) if conf.include_req_body then - local read_req_body = true + local should_read_body = true if conf.include_req_body_expr then if not conf.request_expr then local request_expr, err = expr.new(conf.include_req_body_expr) @@ -230,11 +230,11 @@ function _M.access(conf, ctx) local result = conf.request_expr:eval(ctx.var) if not result then - read_req_body = false + should_read_body = false end end - if read_req_body then - req_body() + if should_read_body then + req_read_body() end end end From 4109d4d739c61c8d3acd6d57cf40dd9aea94f413 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 10 Apr 2024 17:19:36 +0545 Subject: [PATCH 6/8] return nil when no body --- apisix/utils/log-util.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index 111abf4a0da4..e53daca80be8 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -52,7 +52,7 @@ local function get_request_body(max_bytes) local file_name = req_get_body_file() if not file_name then - return nil, "fail to get body_file " + return nil end core.log.info("attempt to read body from file: ", 
file_name) From 22fc4f666b99889c99c6d53a8c55c99f8cff8564 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Fri, 12 Apr 2024 13:00:26 +0545 Subject: [PATCH 7/8] remove not needed chunk processing --- apisix/core/response.lua | 3 --- 1 file changed, 3 deletions(-) diff --git a/apisix/core/response.lua b/apisix/core/response.lua index e7c9c40eee2a..baee97749598 100644 --- a/apisix/core/response.lua +++ b/apisix/core/response.lua @@ -217,9 +217,6 @@ function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes) local body_data = concat_tab(body_buffer, "", 1, body_buffer.n) ctx._body_buffer[ctx._plugin_name] = nil - if max_resp_body_bytes and #body_data >= max_resp_body_bytes then - body_data = str_sub(body_data, 1, max_resp_body_bytes) - end return body_data end From 1f775c8ace851a8b8862801ad35e7cf4cd00851f Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Fri, 12 Apr 2024 14:39:01 +0545 Subject: [PATCH 8/8] fix test --- t/plugin/kafka-logger-large-body.t | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/t/plugin/kafka-logger-large-body.t b/t/plugin/kafka-logger-large-body.t index e2e6cc4b26d6..e86c645915b2 100644 --- a/t/plugin/kafka-logger-large-body.t +++ b/t/plugin/kafka-logger-large-body.t @@ -796,7 +796,7 @@ passed local httpc = http.new() local res, err = httpc:request_uri(uri, { - method = "GET", + method = "POST", body = large_body, } ) @@ -859,11 +859,11 @@ passed -=== TEST 25: fail to get body_file with empty request body +=== TEST 25: empty request body --- request GET /hello?ab=cd --- response_body hello world ---- error_log -fail to get body_file +--- error_log eval +qr/send data to kafka/ --- wait: 2