Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: simple setup upstream #8130

Merged
merged 18 commits into from
Oct 23, 2022
Merged
2 changes: 1 addition & 1 deletion apisix/http/route.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter)
end
end

event.push(event.CONST.BUILD_ROUTER, uri_routes)
event.push(event.CONST.BUILD_ROUTER, routes)
core.log.info("route items: ", core.json.delay_encode(uri_routes, true))

if with_parameter then
Expand Down
214 changes: 110 additions & 104 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,115 @@ local function common_phase(phase_name)
end



function _M.handle_upstream(api_ctx, route, enable_websocket)
local up_id = route.value.upstream_id

-- used for the traffic-split plugin
if api_ctx.upstream_id then
up_id = api_ctx.upstream_id
end

if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if route.has_domain then
local err
route, err = parse_domain_in_route(route)
if err then
core.log.error("failed to get resolved route: ", err)
return core.response.exit(500)
end

api_ctx.conf_version = route.modifiedIndex
api_ctx.matched_route = route
end

local route_val = route.value

api_ctx.matched_upstream = (route.dns_value and
route.dns_value.upstream)
or route_val.upstream
end

if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
api_ctx.matched_upstream.tls.client_cert_id then

local cert_id = api_ctx.matched_upstream.tls.client_cert_id
local upstream_ssl = router.router_ssl.get_by_id(cert_id)
if not upstream_ssl or upstream_ssl.type ~= "client" then
local err = upstream_ssl and
"ssl type should be 'client'" or
"ssl id [" .. cert_id .. "] not exits"
core.log.error("failed to get ssl cert: ", err)

if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

core.log.info("matched ssl: ",
core.json.delay_encode(upstream_ssl, true))
api_ctx.upstream_ssl = upstream_ssl
end

if enable_websocket then
api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
api_ctx.var.upstream_connection = api_ctx.var.http_connection
core.log.info("enabled websocket for route: ", route.value.id)
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end

local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_headers(api_ctx, server)

-- run the before_proxy method in access phase first to avoid always reinit request
common_phase("before_proxy")

local up_scheme = api_ctx.upstream_scheme
if up_scheme == "grpcs" or up_scheme == "grpc" then
stash_ngx_ctx()
return ngx.exec("@grpc_pass")
end

if api_ctx.dubbo_proxy_enabled then
stash_ngx_ctx()
return ngx.exec("@dubbo_pass")
end
end


function _M.http_access_phase()
local ngx_ctx = ngx.ctx

Expand Down Expand Up @@ -495,110 +604,7 @@ function _M.http_access_phase()
plugin.run_plugin("access", plugins, api_ctx)
end

local up_id = route.value.upstream_id

-- used for the traffic-split plugin
if api_ctx.upstream_id then
up_id = api_ctx.upstream_id
end

if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if route.has_domain then
local err
route, err = parse_domain_in_route(route)
if err then
core.log.error("failed to get resolved route: ", err)
return core.response.exit(500)
end

api_ctx.conf_version = route.modifiedIndex
api_ctx.matched_route = route
end

local route_val = route.value

api_ctx.matched_upstream = (route.dns_value and
route.dns_value.upstream)
or route_val.upstream
end

if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
api_ctx.matched_upstream.tls.client_cert_id then

local cert_id = api_ctx.matched_upstream.tls.client_cert_id
local upstream_ssl = router.router_ssl.get_by_id(cert_id)
if not upstream_ssl or upstream_ssl.type ~= "client" then
local err = upstream_ssl and
"ssl type should be 'client'" or
"ssl id [" .. cert_id .. "] not exits"
core.log.error("failed to get ssl cert: ", err)

if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

core.log.info("matched ssl: ",
core.json.delay_encode(upstream_ssl, true))
api_ctx.upstream_ssl = upstream_ssl
end

if enable_websocket then
api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
api_ctx.var.upstream_connection = api_ctx.var.http_connection
core.log.info("enabled websocket for route: ", route.value.id)
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end

local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_headers(api_ctx, server)

-- run the before_proxy method in access phase first to avoid always reinit request
common_phase("before_proxy")

local up_scheme = api_ctx.upstream_scheme
if up_scheme == "grpcs" or up_scheme == "grpc" then
stash_ngx_ctx()
return ngx.exec("@grpc_pass")
end

if api_ctx.dubbo_proxy_enabled then
stash_ngx_ctx()
return ngx.exec("@dubbo_pass")
end
_M.handle_upstream(api_ctx, route, enable_websocket)
end


Expand Down
Loading