From 00e3794dfd96fdf66d0828842572b84e45c17114 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 11:02:29 +0800 Subject: [PATCH 01/17] adjust ai plugin priority --- apisix/plugins/ai.lua | 2 +- conf/config-default.yaml | 2 +- t/admin/plugins.t | 2 +- t/core/config.t | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 6b60aac29c1f..68cc78203a94 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -51,7 +51,7 @@ local plugin_name = "ai" local _M = { version = 0.1, - priority = 25000, + priority = 22900, name = plugin_name, schema = schema, scope = "global", diff --git a/conf/config-default.yaml b/conf/config-default.yaml index bad0e41e4c1d..6e714b577346 100755 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -389,8 +389,8 @@ graphql: #cmd: ["ls", "-l"] plugins: # plugin list (sorted by priority) - - ai # priority: 25000 - real-ip # priority: 23000 + - ai # priority: 22900 - client-control # priority: 22000 - proxy-control # priority: 21990 - request-id # priority: 12015 diff --git a/t/admin/plugins.t b/t/admin/plugins.t index 74827e437ebf..98e337e57dfc 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -61,8 +61,8 @@ __DATA__ } --- response_body -ai real-ip +ai client-control proxy-control request-id diff --git a/t/core/config.t b/t/core/config.t index 18191dae724f..29d1cc52dc07 100644 --- a/t/core/config.t +++ b/t/core/config.t @@ -38,7 +38,7 @@ __DATA__ GET /t --- response_body etcd host: http://127.0.0.1:2379 -first plugin: "ai" +first plugin: "real-ip" From 8514ba2a64dde77c04c16cffe4421b31691b43ff Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 12:51:58 +0800 Subject: [PATCH 02/17] perf: simple setup upstream --- apisix/http/route.lua | 2 +- apisix/init.lua | 214 ++++++++++++++++++----------------- apisix/plugins/ai.lua | 119 ++++++++++++++++---- t/plugin/ai.t | 255 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 466 insertions(+), 124 deletions(-) diff --git a/apisix/http/route.lua b/apisix/http/route.lua index 6292b577a071..d475646b56c6 100644 --- a/apisix/http/route.lua +++ b/apisix/http/route.lua @@ -92,7 +92,7 @@ function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter) end end - event.push(event.CONST.BUILD_ROUTER, uri_routes) + event.push(event.CONST.BUILD_ROUTER, routes) core.log.info("route items: ", core.json.delay_encode(uri_routes, true)) if with_parameter then diff --git a/apisix/init.lua b/apisix/init.lua index 2030e0241d51..883dbd9abab0 100644 --- a/apisix/init.lua +++ b/apisix/init.lua @@ -348,6 +348,115 @@ local function common_phase(phase_name) end + +function _M.handle_upstream(api_ctx, route, enable_websocket) + local up_id = route.value.upstream_id + + -- used for the traffic-split plugin + if api_ctx.upstream_id then + up_id = api_ctx.upstream_id + end + + if up_id then + local upstream = apisix_upstream.get_by_id(up_id) + if not upstream then + if is_http then + return core.response.exit(502) + end + + return ngx_exit(1) + end + + api_ctx.matched_upstream = upstream + + else + if route.has_domain then + local err + route, err = parse_domain_in_route(route) + if err then + core.log.error("failed to get resolved route: ", err) + return core.response.exit(500) + end + + api_ctx.conf_version = route.modifiedIndex + api_ctx.matched_route = route + end + + local route_val = route.value + + api_ctx.matched_upstream = (route.dns_value and + route.dns_value.upstream) + or route_val.upstream + end + + if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and + api_ctx.matched_upstream.tls.client_cert_id then + + local cert_id = api_ctx.matched_upstream.tls.client_cert_id + local upstream_ssl = router.router_ssl.get_by_id(cert_id) + if not upstream_ssl or upstream_ssl.type ~= "client" then + local err = upstream_ssl and + "ssl type should be 'client'" or + "ssl id [" .. cert_id .. "] not exits" + core.log.error("failed to get ssl cert: ", err) + + if is_http then + return core.response.exit(502) + end + + return ngx_exit(1) + end + + core.log.info("matched ssl: ", + core.json.delay_encode(upstream_ssl, true)) + api_ctx.upstream_ssl = upstream_ssl + end + + if enable_websocket then + api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade + api_ctx.var.upstream_connection = api_ctx.var.http_connection + core.log.info("enabled websocket for route: ", route.value.id) + end + + -- load balancer is not required by kafka upstream, so the upstream + -- node selection process is intercepted and left to kafka to + -- handle on its own + if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then + return pubsub_kafka.access(api_ctx) + end + + local code, err = set_upstream(route, api_ctx) + if code then + core.log.error("failed to set upstream: ", err) + core.response.exit(code) + end + + local server, err = load_balancer.pick_server(route, api_ctx) + if not server then + core.log.error("failed to pick server: ", err) + return core.response.exit(502) + end + + api_ctx.picked_server = server + + set_upstream_headers(api_ctx, server) + + -- run the before_proxy method in access phase first to avoid always reinit request + common_phase("before_proxy") + + local up_scheme = api_ctx.upstream_scheme + if up_scheme == "grpcs" or up_scheme == "grpc" then + stash_ngx_ctx() + return ngx.exec("@grpc_pass") + end + + if api_ctx.dubbo_proxy_enabled then + stash_ngx_ctx() + return ngx.exec("@dubbo_pass") + end +end + + function _M.http_access_phase() local ngx_ctx = ngx.ctx @@ -495,110 +604,7 @@ function _M.http_access_phase() plugin.run_plugin("access", plugins, api_ctx) end - local up_id = route.value.upstream_id - - -- used for the traffic-split plugin - if api_ctx.upstream_id then - up_id = api_ctx.upstream_id - end - - if up_id then - local upstream = apisix_upstream.get_by_id(up_id) - if not upstream then - if is_http then - return core.response.exit(502) - end - - return ngx_exit(1) - end - - api_ctx.matched_upstream = upstream - - else - if route.has_domain then - local err - route, err = parse_domain_in_route(route) - if err then - core.log.error("failed to get resolved route: ", err) - return core.response.exit(500) - end - - api_ctx.conf_version = route.modifiedIndex - api_ctx.matched_route = route - end - - local route_val = route.value - - api_ctx.matched_upstream = (route.dns_value and - route.dns_value.upstream) - or route_val.upstream - end - - if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and - api_ctx.matched_upstream.tls.client_cert_id then - - local cert_id = api_ctx.matched_upstream.tls.client_cert_id - local upstream_ssl = router.router_ssl.get_by_id(cert_id) - if not upstream_ssl or upstream_ssl.type ~= "client" then - local err = upstream_ssl and - "ssl type should be 'client'" or - "ssl id [" .. cert_id .. "] not exits" - core.log.error("failed to get ssl cert: ", err) - - if is_http then - return core.response.exit(502) - end - - return ngx_exit(1) - end - - core.log.info("matched ssl: ", - core.json.delay_encode(upstream_ssl, true)) - api_ctx.upstream_ssl = upstream_ssl - end - - if enable_websocket then - api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade - api_ctx.var.upstream_connection = api_ctx.var.http_connection - core.log.info("enabled websocket for route: ", route.value.id) - end - - -- load balancer is not required by kafka upstream, so the upstream - -- node selection process is intercepted and left to kafka to - -- handle on its own - if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then - return pubsub_kafka.access(api_ctx) - end - - local code, err = set_upstream(route, api_ctx) - if code then - core.log.error("failed to set upstream: ", err) - core.response.exit(code) - end - - local server, err = load_balancer.pick_server(route, api_ctx) - if not server then - core.log.error("failed to pick server: ", err) - return core.response.exit(502) - end - - api_ctx.picked_server = server - - set_upstream_headers(api_ctx, server) - - -- run the before_proxy method in access phase first to avoid always reinit request - common_phase("before_proxy") - - local up_scheme = api_ctx.upstream_scheme - if up_scheme == "grpcs" or up_scheme == "grpc" then - stash_ngx_ctx() - return ngx.exec("@grpc_pass") - end - - if api_ctx.dubbo_proxy_enabled then - stash_ngx_ctx() - return ngx.exec("@dubbo_pass") - end + _M.handle_upstream(api_ctx, route, enable_websocket) end diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 68cc78203a94..ac9c2f7ab59a 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -18,9 +18,12 @@ local require = require local core = require("apisix.core") local router = require("apisix.router") local event = require("apisix.core.event") +local apisix_upstream = require("apisix.upstream") local ipairs = ipairs local pcall = pcall local loadstring = loadstring +local type = type +local apisix = apisix local encode_base64 = ngx.encode_base64 local get_cache_key_func @@ -58,6 +61,7 @@ local _M = { } local orig_router_match +local orig_handle_upstream = apisix.handle_upstream local function match_route(ctx) @@ -101,28 +105,88 @@ local function gen_get_cache_key_func(route_flags) end +local function ai_upstream(api_ctx, route) + core.log.info("enable sample upstream") + local up_conf = route.value.upstream + local upstream_key = up_conf.type .. "#route_" .. route.value.id + apisix_upstream.set(api_ctx, upstream_key, api_ctx.conf_version, up_conf) +end + + local function routes_analyze(routes) -- TODO: need to add a option in config.yaml to enable this feature(default is true) - local route_flags = core.table.new(0, 2) + local route_flags = core.table.new(0, 5) + local route_up_flags = core.table.new(0, 8) for _, route in ipairs(routes) do - if route.methods then - route_flags["methods"] = true - end - - if route.host or route.hosts then - route_flags["host"] = true - end - - if route.vars then - route_flags["vars"] = true - end - - if route.filter_fun then - route_flags["filter_fun"] = true - end - - if route.remote_addr or route.remote_addrs then - route_flags["remote_addr"] = true + if type(route) == "table" then + if route.value.methods then + route_flags["methods"] = true + end + + if route.value.host or route.hosts then + route_flags["host"] = true + end + + if route.value.vars then + route_flags["vars"] = true + end + + if route.value.filter_fun then + route_flags["filter_fun"] = true + end + + if route.value.remote_addr or route.remote_addrs then + route_flags["remote_addr"] = true + end + + if route.value.service then + route_flags["service"] = true + end + + if route.value.enable_websocket then + route_flags["enable_websocket"] = true + end + + if route.value.plugins then + route_flags["plugins"] = true + end + + local upstream = route.value.upstream + if upstream and upstream.nodes and #upstream.nodes == 1 then + local node = upstream.nodes[1] + if not core.utils.parse_ipv4(node.host) + and not core.utils.parse_ipv6(node.host) then + route_up_flags["has_domain"] = true + end + + if upstream.pass_host == "pass" then + route_up_flags["pass_host"] = true + end + + if upstream.scheme == "http" then + route_up_flags["scheme"] = true + end + + if upstream.checks then + route_up_flags["checks"] = true + end + + if upstream.retries then + route_up_flags["retries"] = true + end + + if upstream.timeout then + route_up_flags["timeout"] = true + end + + if upstream.tls then + route_up_flags["tls"] = true + end + + if upstream.keepalive then + route_up_flags["keepalive"] = true + end + end end end @@ -139,6 +203,23 @@ local function routes_analyze(routes) router.router_http.match = orig_router_match end end + + if not route_flags["service"] + and not route_flags["enable_websocket"] + and not route_flags["plugins"] + and not route_up_flags["has_domain"] + and route_up_flags["pass_host"] + and route_up_flags["scheme"] + and not route_up_flags["checks"] + and not route_up_flags["retries"] + and not route_up_flags["timeout"] + and not route_up_flags["timeout"] + and not route_up_flags["keepalive"] then + -- replace the upstream module + apisix.handle_upstream = ai_upstream + else + apisix.handle_upstream = orig_handle_upstream + end end diff --git a/t/plugin/ai.t b/t/plugin/ai.t index 3c0cd62d97e0..33408edcf317 100644 --- a/t/plugin/ai.t +++ b/t/plugin/ai.t @@ -620,3 +620,258 @@ route cache key: L2hlbGxvAEdFVA== done --- error_log route cache key: L2hlbGxvAEdFVAAxMjcuMC4wLjE= + + + +=== TEST 9: enable sample upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say("done") + } + } +--- response_body +done +--- error_log +enable sample upstream + + + +=== TEST 10: route has plugins and run before_proxy, disable samply upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "serverless-pre-function": { + "phase": "before_proxy", + "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"] + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say("done") + } + } +--- response_body +done +--- error_log +run before_proxy phase balancer_ip : 127.0.0.1 +--- no_error_log +enable sample upstream + + + +=== TEST 11: upstream has more than one nodes, disable sample upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1, + "127.0.0.1:1981": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say("done") + } + } +--- response_body +done +--- no_error_log +enable sample upstream + + + +=== TEST 12: node has domain, disable sample upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "admin.apisix.dev:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + ngx.say("done") + } + } +--- response_body +done +--- no_error_log +enable sample upstream + + + +=== TEST 13: enable --> disable sample upstream +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "enable_websocket": true, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.sleep(0.5) + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri) + assert(res.status == 200) + if not res then + ngx.log(ngx.ERR, err) + return + end + + ngx.say("done") + } + } +--- response_body +done +--- grep_error_log eval +qr/enable sample upstream/ +--- grep_error_log_out +enable sample upstream From 852eefea9cc73925710e6f981a894808897a77e0 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 13:03:00 +0800 Subject: [PATCH 03/17] fix code lint --- apisix/plugins/ai.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index ac9c2f7ab59a..61f5367e87e8 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -23,7 +23,7 @@ local ipairs = ipairs local pcall = pcall local loadstring = loadstring local type = type -local apisix = apisix +local apisix = _G.apisix local encode_base64 = ngx.encode_base64 local get_cache_key_func From 539a6850b69579ba0d95bfbbd345e1a983705c71 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 13:13:36 +0800 Subject: [PATCH 04/17] fix code lint --- apisix/plugins/ai.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 61f5367e87e8..fe33edaa2c36 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -15,6 +15,7 @@ -- limitations under the License. -- local require = require +local apisix = require("apisix") local core = require("apisix.core") local router = require("apisix.router") local event = require("apisix.core.event") @@ -23,7 +24,6 @@ local ipairs = ipairs local pcall = pcall local loadstring = loadstring local type = type -local apisix = _G.apisix local encode_base64 = ngx.encode_base64 local get_cache_key_func From 3279eab1031017d7377da44719eb51ad7d27b6fc Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 14:02:41 +0800 Subject: [PATCH 05/17] fix CI --- apisix/plugins/ai.lua | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index fe33edaa2c36..00b43c9e11db 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -159,6 +159,10 @@ local function routes_analyze(routes) route_up_flags["has_domain"] = true end + if upstream.id then + route_up_flags["id"] = true + end + if upstream.pass_host == "pass" then route_up_flags["pass_host"] = true end @@ -208,6 +212,7 @@ local function routes_analyze(routes) and not route_flags["enable_websocket"] and not route_flags["plugins"] and not route_up_flags["has_domain"] + and route_up_flags["id"] and route_up_flags["pass_host"] and route_up_flags["scheme"] and not route_up_flags["checks"] From 16348f91a68c59553005727015d1c7067dcce27c Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 14:30:59 +0800 Subject: [PATCH 06/17] fix CI --- apisix/plugins/ai.lua | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 00b43c9e11db..572e5f920f7b 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -151,6 +151,10 @@ local function routes_analyze(routes) route_flags["plugins"] = true end + if route.value.upstream_id then + route_flags["upstream_id"] = true + end + local upstream = route.value.upstream if upstream and upstream.nodes and #upstream.nodes == 1 then local node = upstream.nodes[1] @@ -159,10 +163,6 @@ local function routes_analyze(routes) route_up_flags["has_domain"] = true end - if upstream.id then - route_up_flags["id"] = true - end - if upstream.pass_host == "pass" then route_up_flags["pass_host"] = true end @@ -209,10 +209,10 @@ local function routes_analyze(routes) end if not route_flags["service"] + and not route_flags["upstream_id"] and not route_flags["enable_websocket"] and not route_flags["plugins"] and not route_up_flags["has_domain"] - and route_up_flags["id"] and route_up_flags["pass_host"] and route_up_flags["scheme"] and not route_up_flags["checks"] @@ -221,8 +221,10 @@ local function routes_analyze(routes) and not route_up_flags["timeout"] and not route_up_flags["keepalive"] then -- replace the upstream module + ngx.log(ngx.WARN, "replace the upstream module") apisix.handle_upstream = ai_upstream else + ngx.log(ngx.WARN, "origin the upstream module") apisix.handle_upstream = orig_handle_upstream end end From 924d232e39ecedda8079b030a69d085394331661 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 15:06:40 +0800 Subject: [PATCH 07/17] fix lint --- apisix/plugins/ai.lua | 2 -- 1 file changed, 2 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 572e5f920f7b..df9dcbd97bc4 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -221,10 +221,8 @@ local function routes_analyze(routes) and not route_up_flags["timeout"] and not route_up_flags["keepalive"] then -- replace the upstream module - ngx.log(ngx.WARN, "replace the upstream module") apisix.handle_upstream = ai_upstream else - ngx.log(ngx.WARN, "origin the upstream module") apisix.handle_upstream = orig_handle_upstream end end From 47507ca123b19f3cf189e309a1978a301ba387bf Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 17:52:52 +0800 Subject: [PATCH 08/17] balancer run --- apisix/plugins/ai.lua | 27 ++++++++++++++++++++++----- t/debug/dynamic-hook.t | 1 + 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index df9dcbd97bc4..49066977c458 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -19,7 +19,8 @@ local apisix = require("apisix") local core = require("apisix.core") local router = require("apisix.router") local event = require("apisix.core.event") -local apisix_upstream = require("apisix.upstream") +local load_balancer = require("apisix.balancer") +local balancer = require("ngx.balancer") local ipairs = ipairs local pcall = pcall local loadstring = loadstring @@ -62,6 +63,7 @@ local _M = { local orig_router_match local orig_handle_upstream = apisix.handle_upstream +local orig_balancer_run = load_balancer.run local function match_route(ctx) @@ -105,14 +107,27 @@ local function gen_get_cache_key_func(route_flags) end -local function ai_upstream(api_ctx, route) +local function ai_upstream() core.log.info("enable sample upstream") - local up_conf = route.value.upstream - local upstream_key = up_conf.type .. "#route_" .. route.value.id - apisix_upstream.set(api_ctx, upstream_key, api_ctx.conf_version, up_conf) end +local pool_opt = { pool_size = 320 } +local function ai_balancer_run(route) + local server = route.value.upstream.nodes[1] + -- only work for http + if not server.port then + server.port = 80 + end + local ok, err = balancer.set_current_peer(server.host, server.port, pool_opt) + if not ok then + core.log.error("failed to set server peer [", server.host, ":", + server.port, "] err: ", err) + return ok, err + end + balancer.enable_keepalive(60, 1000) +end + local function routes_analyze(routes) -- TODO: need to add a option in config.yaml to enable this feature(default is true) local route_flags = core.table.new(0, 5) @@ -222,8 +237,10 @@ local function routes_analyze(routes) and not route_up_flags["keepalive"] then -- replace the upstream module apisix.handle_upstream = ai_upstream + load_balancer.run = ai_balancer_run else apisix.handle_upstream = orig_handle_upstream + load_balancer.run = orig_balancer_run end end diff --git a/t/debug/dynamic-hook.t b/t/debug/dynamic-hook.t index 692942d1f9e4..a2c069b1039e 100644 --- a/t/debug/dynamic-hook.t +++ b/t/debug/dynamic-hook.t @@ -405,6 +405,7 @@ hook_test: # module and function list, name: hook_test "uri": "/hello", "upstream": { "nodes": { + "127.0.0.1:1980": 1, "127.0.0.1:1980": 1 }, "type": "roundrobin" From 803873bb5c64095875166153cdc28a74018d193c Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 17:57:10 +0800 Subject: [PATCH 09/17] chore lint --- apisix/plugins/ai.lua | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 49066977c458..13b11102ff6f 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -115,11 +115,7 @@ end local pool_opt = { pool_size = 320 } local function ai_balancer_run(route) local server = route.value.upstream.nodes[1] - -- only work for http - if not server.port then - server.port = 80 - end - local ok, err = balancer.set_current_peer(server.host, server.port, pool_opt) + local ok, err = balancer.set_current_peer(server.host, server.port or 80, pool_opt) if not ok then core.log.error("failed to set server peer [", server.host, ":", server.port, "] err: ", err) From 7179f720d960f974c8158a6915fece02b7794561 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 20:40:43 +0800 Subject: [PATCH 10/17] fix --- apisix/plugins/ai.lua | 19 ++++++++++++------- t/debug/dynamic-hook.t | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 34f81e7a1665..08fb778582a7 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -21,11 +21,12 @@ local router = require("apisix.router") local event = require("apisix.core.event") local load_balancer = require("apisix.balancer") local balancer = require("ngx.balancer") +local is_http = ngx.config.subsystem == "http" +local enable_keepalive = balancer.enable_keepalive and is_http local ipairs = ipairs local pcall = pcall local loadstring = loadstring local type = type -local encode_base64 = ngx.encode_base64 local get_cache_key_func local get_cache_key_func_def_render @@ -115,13 +116,17 @@ end local pool_opt = { pool_size = 320 } local function ai_balancer_run(route) local server = route.value.upstream.nodes[1] - local ok, err = balancer.set_current_peer(server.host, server.port or 80, pool_opt) - if not ok then - core.log.error("failed to set server peer [", server.host, ":", - server.port, "] err: ", err) - return ok, err + if enable_keepalive then + local ok, err = balancer.set_current_peer(server.host, server.port or 80, pool_opt) + if not ok then + core.log.error("failed to set server peer [", server.host, ":", + server.port, "] err: ", err) + return ok, err + end + balancer.enable_keepalive(60, 1000) + else + balancer.set_current_peer(server.host, server.port) end - balancer.enable_keepalive(60, 1000) end local function routes_analyze(routes) diff --git a/t/debug/dynamic-hook.t b/t/debug/dynamic-hook.t index a2c069b1039e..5f650e87ff6b 100644 --- a/t/debug/dynamic-hook.t +++ b/t/debug/dynamic-hook.t @@ -406,7 +406,7 @@ hook_test: # module and function list, name: hook_test "upstream": { "nodes": { "127.0.0.1:1980": 1, - "127.0.0.1:1980": 1 + "127.0.0.1:1981": 1 }, "type": "roundrobin" } From e408e582ac85e5b36a2280a975dbb647a549e7bc Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 22:58:23 +0800 Subject: [PATCH 11/17] fix CI --- apisix/plugins/ai.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 08fb778582a7..195483183fbf 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -125,7 +125,7 @@ local function ai_balancer_run(route) end balancer.enable_keepalive(60, 1000) else - balancer.set_current_peer(server.host, server.port) + balancer.set_current_peer(server.host, server.port or 80) end end From 9464c4efe9fbd7ae0071392c6a92dd47d94925df Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Thu, 20 Oct 2022 23:16:10 +0800 Subject: [PATCH 12/17] fix debug module --- t/debug/dynamic-hook.t | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/t/debug/dynamic-hook.t b/t/debug/dynamic-hook.t index 5f650e87ff6b..87d4450d569c 100644 --- a/t/debug/dynamic-hook.t +++ b/t/debug/dynamic-hook.t @@ -377,6 +377,11 @@ qr/call\srequire\(\"apisix.plugin\"\).filter\(\)\sreturn.*GET\s\/mysleep\?second === TEST 6: hook function with ctx as param +# ai module would conflict with the debug module +--- extra_yaml_config +plugins: + #ai + - example-plugin --- debug_config basic: enable: true @@ -405,8 +410,7 @@ hook_test: # module and function list, name: hook_test "uri": "/hello", "upstream": { "nodes": { - "127.0.0.1:1980": 1, - "127.0.0.1:1981": 1 + "127.0.0.1:1980": 1 }, "type": "roundrobin" } From 2a0ced40ebfdd693bcc976d4988296d0f42de32c Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 21 Oct 2022 00:55:19 +0800 Subject: [PATCH 13/17] add service_id as disable condition --- apisix/plugins/ai.lua | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 195483183fbf..145400e28476 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -171,6 +171,10 @@ local function routes_analyze(routes) route_flags["upstream_id"] = true end + if route.value.service_id then + route_flags["service_id"] = true + end + local upstream = route.value.upstream if upstream and upstream.nodes and #upstream.nodes == 1 then local node = upstream.nodes[1] @@ -211,7 +215,8 @@ local function routes_analyze(routes) end if route_flags["vars"] or route_flags["filter_fun"] - or route_flags["remote_addr"] then + or route_flags["remote_addr"] + or route_flags["service_id"] then router.router_http.match = orig_router_match else core.log.info("use ai plane to match route") @@ -225,6 +230,7 @@ local function routes_analyze(routes) end if not route_flags["service"] + and not route_flags["service_id"] and not route_flags["upstream_id"] and not route_flags["enable_websocket"] and not route_flags["plugins"] From 7e520c546ad348761499f77b846d69bf3138471e Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 21 Oct 2022 09:20:04 +0800 Subject: [PATCH 14/17] add plugin_config_id as confition of disable route cache --- apisix/plugins/ai.lua | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 145400e28476..6126ed873353 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -175,6 +175,10 @@ local function routes_analyze(routes) route_flags["service_id"] = true end + if route.value.plugin_config_id then + route_flags["plugin_config_id"] = true + end + local upstream = route.value.upstream if upstream and upstream.nodes and #upstream.nodes == 1 then local node = upstream.nodes[1] @@ -216,7 +220,8 @@ local function routes_analyze(routes) if route_flags["vars"] or route_flags["filter_fun"] or route_flags["remote_addr"] - or route_flags["service_id"] then + or route_flags["service_id"] + or route_flags["plugin_config_id"] then router.router_http.match = orig_router_match else core.log.info("use ai plane to match route") From 38242dfeb64a5289b84d78f0e06a13c7cc7f3928 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 21 Oct 2022 16:49:54 +0800 Subject: [PATCH 15/17] resolve comments --- apisix/plugins/ai.lua | 185 ++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 98 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 6126ed873353..ad6167e25325 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -27,6 +27,7 @@ local ipairs = ipairs local pcall = pcall local loadstring = loadstring local type = type +local pairs = pairs local get_cache_key_func local get_cache_key_func_def_render @@ -66,6 +67,7 @@ local orig_router_match local orig_handle_upstream = apisix.handle_upstream local orig_balancer_run = load_balancer.run +local default_keepalive_pool = {} local function match_route(ctx) orig_router_match(ctx) @@ -113,7 +115,7 @@ local function ai_upstream() end -local pool_opt = { pool_size = 320 } +local pool_opt local function ai_balancer_run(route) local server = route.value.upstream.nodes[1] if enable_keepalive then @@ -123,96 +125,72 @@ local function ai_balancer_run(route) server.port, "] err: ", err) return ok, err end - balancer.enable_keepalive(60, 1000) + balancer.enable_keepalive(default_keepalive_pool.idle_timeout, + default_keepalive_pool.requests) else balancer.set_current_peer(server.host, server.port or 80) end end local function routes_analyze(routes) - -- TODO: need to add a option in config.yaml to enable this feature(default is true) - local route_flags = core.table.new(0, 5) - local route_up_flags = core.table.new(0, 8) + local route_flags = core.table.new(0, 16) + local route_up_flags = core.table.new(0, 12) for _, route in ipairs(routes) do if type(route) == "table" then - if route.value.methods then - route_flags["methods"] = true - end - - if route.value.host or route.hosts then - route_flags["host"] = true - end - - if route.value.vars then - route_flags["vars"] = true - end - - if route.value.filter_fun then - route_flags["filter_fun"] = true - end - - if route.value.remote_addr or route.remote_addrs then - route_flags["remote_addr"] = true - end - - if route.value.service then - route_flags["service"] = true - end - - if route.value.enable_websocket then - route_flags["enable_websocket"] = true - end - - if route.value.plugins then - route_flags["plugins"] = true - end - - if route.value.upstream_id then - route_flags["upstream_id"] = true - end - - if route.value.service_id then - route_flags["service_id"] = true - end - - if route.value.plugin_config_id then - route_flags["plugin_config_id"] = true - end - - local upstream = route.value.upstream - if upstream and upstream.nodes and #upstream.nodes == 1 then - local node = upstream.nodes[1] - if not core.utils.parse_ipv4(node.host) - and not core.utils.parse_ipv6(node.host) then - route_up_flags["has_domain"] = true - end - - if upstream.pass_host == "pass" then - route_up_flags["pass_host"] = true - end - - if upstream.scheme == "http" then - route_up_flags["scheme"] = true - end - - if upstream.checks then - route_up_flags["checks"] = true - end - - if upstream.retries then - route_up_flags["retries"] = true - end - - if upstream.timeout then - route_up_flags["timeout"] = true - end - - if upstream.tls then - route_up_flags["tls"] = true + for key, value in pairs(route.value) do + -- collect route flags + if key == "methods" then + route_flags["methods"] = true + elseif key == "host" or key == "hosts" then + route_flags["host"] = true + elseif key == "vars" then + route_flags["vars"] = true + elseif key == "filter_fun"then + route_flags["filter_fun"] = true + elseif key == "remote_addr" or key == "remote_addrs" then + route_flags["remote_addr"] = true + elseif key == "service" then + route_flags["service"] = true + elseif key == "enable_websocket" then + route_flags["enable_websocket"] = true + elseif key == "plugins" then + route_flags["plugins"] = true + elseif key == "upstream_id" then + route_flags["upstream_id"] = true + elseif key == "service_id" then + route_flags["service_id"] = true + elseif key == "plugin_config_id" then + route_flags["plugin_config_id"] = true end - if upstream.keepalive then - route_up_flags["keepalive"] = true + -- collect upstream flags + if key == "upstream" then + if #value.nodes == 1 then + for k, v in pairs(value) do + if k == "nodes" then + if (not core.utils.parse_ipv4(v[1].host) + and not core.utils.parse_ipv6(v[1].host)) then + route_up_flags["has_domain"] = true + end + elseif k == "pass_host" and v ~= "pass" then + route_up_flags["pass_host"] = true + elseif k == "scheme" and v ~= "http" then + route_up_flags["scheme"] = true + elseif k == "checks" then + route_up_flags["checks"] = true + elseif k == "retries" then + route_up_flags["retries"] = true + elseif k == "timeout" then + route_up_flags["timeout"] = true + elseif k == "tls" then + route_up_flags["tls"] = true + elseif k == "keepalive" then + route_up_flags["keepalive"] = true + end + end + else + route_up_flags["more_nodes"] = true + end end end end @@ -234,31 +212,42 @@ local function routes_analyze(routes) end end - if not route_flags["service"] - and not route_flags["service_id"] - and not route_flags["upstream_id"] - and not route_flags["enable_websocket"] - and not route_flags["plugins"] - and not route_up_flags["has_domain"] - and route_up_flags["pass_host"] - and route_up_flags["scheme"] - and not route_up_flags["checks"] - and not route_up_flags["retries"] - and not route_up_flags["timeout"] - and not route_up_flags["timeout"] - and not route_up_flags["keepalive"] then - -- replace the upstream module - apisix.handle_upstream = ai_upstream - load_balancer.run = ai_balancer_run - else + if route_flags["service"] + or route_flags["service_id"] + or route_flags["upstream_id"] + or route_flags["enable_websocket"] + or route_flags["plugins"] + or route_up_flags["has_domain"] + or route_up_flags["pass_host"] + or route_up_flags["scheme"] + or route_up_flags["checks"] + or route_up_flags["retries"] + or route_up_flags["timeout"] + or route_up_flags["tls"] + or route_up_flags["keepalive"] + or route_up_flags["more_nodes"] then apisix.handle_upstream = orig_handle_upstream load_balancer.run = orig_balancer_run + else + -- replace the upstream module + apisix.handle_upstream = ai_upstream + load_balancer.run = ai_balancer_run end end function _M.init() event.register(event.CONST.BUILD_ROUTER, routes_analyze) + local local_conf = core.config.local_conf() + local up_keepalive_conf = + core.table.try_read_attr(local_conf, "nginx_config", + "http", "upstream") + default_keepalive_pool.idle_timeout = + core.config_util.parse_time_unit(up_keepalive_conf.keepalive_timeout) + default_keepalive_pool.size = up_keepalive_conf.keepalive + default_keepalive_pool.requests = up_keepalive_conf.keepalive_requests + + pool_opt = { pool_size = default_keepalive_pool.size } end From e99b1dfbc1044497f3f450081b94307ed659556d Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 21 Oct 2022 17:38:06 +0800 Subject: [PATCH 16/17] add service name as flag --- apisix/plugins/ai.lua | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index ad6167e25325..4d4cb7ec7a8c 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -165,7 +165,7 @@ local function routes_analyze(routes) -- collect upstream flags if key == "upstream" then - if #value.nodes == 1 then + if value.nodes and #value.nodes == 1 then for k, v in pairs(value) do if k == "nodes" then if (not core.utils.parse_ipv4(v[1].host) @@ -186,7 +186,11 @@ local function routes_analyze(routes) route_up_flags["tls"] = true elseif k == "keepalive" then route_up_flags["keepalive"] = true + elseif k == "service_name" then + route_up_flags["service_name"] = true end + + ngx.log(ngx.WARN, "route_up_flags : ", require("inspect")(route_up_flags)) end else route_up_flags["more_nodes"] = true @@ -225,6 +229,7 @@ local function routes_analyze(routes) or route_up_flags["timeout"] or route_up_flags["tls"] or route_up_flags["keepalive"] + or route_up_flags["service_name"] or route_up_flags["more_nodes"] then apisix.handle_upstream = orig_handle_upstream load_balancer.run = orig_balancer_run From 851b25cb83794b5314fa2ee04edde963ad7eba01 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 21 Oct 2022 18:31:10 +0800 Subject: [PATCH 17/17] fix lint --- apisix/plugins/ai.lua | 2 -- 1 file changed, 2 deletions(-) diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua index 4d4cb7ec7a8c..eeb78ca80420 100644 --- a/apisix/plugins/ai.lua +++ b/apisix/plugins/ai.lua @@ -189,8 +189,6 @@ local function routes_analyze(routes) elseif k == "service_name" then route_up_flags["service_name"] = true end - - ngx.log(ngx.WARN, "route_up_flags : ", require("inspect")(route_up_flags)) end else route_up_flags["more_nodes"] = true