-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(xds): using data written by xds to control dp behavior #6759
Changes from 15 commits
89b001e
d5f488a
7c7532a
5e546ab
94e3f1f
2065242
49872ed
16c89b9
8aa02d3
af9f13a
0db4cb4
2ee27b6
8e62d12
0e3dc22
21e1c5f
0f58e31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -15,46 +15,70 @@ | |||
-- limitations under the License. | ||||
-- | ||||
|
||||
--- Get configuration form ngx.shared.DICT. | ||||
--- Get configuration form ngx.shared.DICT | ||||
-- | ||||
-- @module core.config_xds | ||||
|
||||
local base = require("resty.core.base") | ||||
local config_local = require("apisix.core.config_local") | ||||
local string = require("apisix.core.string") | ||||
local log = require("apisix.core.log") | ||||
local json = require("apisix.core.json") | ||||
local ngx_sleep = require("apisix.core.utils").sleep | ||||
local check_schema = require("apisix.core.schema").check | ||||
local new_tab = require("table.new") | ||||
local table = table | ||||
local insert_tab = table.insert | ||||
local error = error | ||||
local is_http = ngx.config.subsystem == "http" | ||||
local pcall = pcall | ||||
local tostring = tostring | ||||
local setmetatable = setmetatable | ||||
local io = io | ||||
local io_open = io.open | ||||
local io_close = io.close | ||||
local package = package | ||||
local new_tab = base.new_tab | ||||
local ipairs = ipairs | ||||
local type = type | ||||
local sub_str = string.sub | ||||
local math_ceil = math.ceil | ||||
local ffi = require ("ffi") | ||||
local C = ffi.C | ||||
local route_config = ngx.shared["xds-route-config"] | ||||
local config = ngx.shared["xds-config"] | ||||
local conf_ver = ngx.shared["xds-config-version"] | ||||
local is_http = ngx.config.subsystem == "http" | ||||
local ngx_re_match = ngx.re.match | ||||
local ngx_re_gmatch = ngx.re.gmatch | ||||
local ngx_timer_every = ngx.timer.every | ||||
local ngx_timer_at = ngx.timer.at | ||||
local exiting = ngx.worker.exiting | ||||
local ngx_time = ngx.time | ||||
|
||||
local xds_lib_name = "libxds.so" | ||||
|
||||
local xds_lib_name = "libxds.so" | ||||
|
||||
local process | ||||
if is_http then | ||||
process = require("ngx.process") | ||||
end | ||||
|
||||
|
||||
ffi.cdef[[ | ||||
extern void initial(void* route_zone_ptr); | ||||
extern void initial(void* config_zone, void* version_zone); | ||||
]] | ||||
|
||||
local created_obj = {} | ||||
|
||||
local _M = { | ||||
version = 0.1, | ||||
local_conf = config_local.local_conf, | ||||
} | ||||
|
||||
|
||||
local mt = { | ||||
__index = _M, | ||||
__tostring = function(self) | ||||
return " xds key: " .. self.key | ||||
end | ||||
} | ||||
|
||||
|
||||
-- todo: refactor this function in chash.lua and radixtree.lua | ||||
local function load_shared_lib(lib_name) | ||||
local cpath = package.cpath | ||||
|
@@ -101,18 +125,252 @@ local function load_libxds(lib_name) | |||
table.concat(tried_paths, '\r\n', 1, #tried_paths)) | ||||
end | ||||
|
||||
local route_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(route_config[1]) | ||||
local route_shd_cdata = ffi.cast("void*", route_zone) | ||||
xdsagent.initial(route_shd_cdata) | ||||
local config_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(config[1]) | ||||
local config_shd_cdata = ffi.cast("void*", config_zone) | ||||
|
||||
local conf_ver_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(conf_ver[1]) | ||||
local conf_ver_shd_cdata = ffi.cast("void*", conf_ver_zone) | ||||
|
||||
xdsagent.initial(config_shd_cdata, conf_ver_shd_cdata) | ||||
end | ||||
|
||||
|
||||
local sync_data | ||||
local latest_version | ||||
sync_data = function(self) | ||||
|
||||
if self.conf_version == latest_version then | ||||
return true | ||||
end | ||||
|
||||
if self.values then | ||||
for _, val in ipairs(self.values) do | ||||
if val and val.clean_handlers then | ||||
for _, clean_handler in ipairs(val.clean_handlers) do | ||||
clean_handler(val) | ||||
end | ||||
val.clean_handlers = nil | ||||
end | ||||
end | ||||
self.values = nil | ||||
self.values_hash = nil | ||||
end | ||||
|
||||
local keys = config:get_keys(0) | ||||
|
||||
if not keys or #keys <= 0 then | ||||
-- xds did not write any data to shdict | ||||
return false, "no keys" | ||||
end | ||||
|
||||
-- v1 version we only support route/upstream | ||||
local capacity = math_ceil(#keys / 2) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why divide 2? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ngx.shared["xds-config"] stores all routes, upstreams, here I estimate the capacity of the route by using the total capacity / 2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Route and upstream are not 1:1. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||||
|
||||
self.values = new_tab(capacity, 0) | ||||
self.values_hash = new_tab(0, capacity) | ||||
|
||||
for _, key in ipairs(keys) do | ||||
if string.has_prefix(key, self.key) then | ||||
local data_valid = true | ||||
local conf_str = config:get(key, 0) | ||||
local conf, err = json.decode(conf_str) | ||||
if not conf then | ||||
data_valid = false | ||||
log.error("decode the conf of [", key, "] failed, err: ", err, | ||||
", conf_str: ", conf_str) | ||||
end | ||||
|
||||
if not self.single_item and type(conf) ~= "table" then | ||||
data_valid = false | ||||
log.error("invalid conf of [", key, "], conf: ", conf, | ||||
", it should be an object") | ||||
end | ||||
|
||||
if data_valid and self.item_schema then | ||||
local ok, err = check_schema(self.item_schema, conf) | ||||
if not ok then | ||||
data_valid = false | ||||
log.error("failed to check the conf of [", key, "] err:", err) | ||||
end | ||||
end | ||||
|
||||
if data_valid and self.checker then | ||||
local ok, err = self.checker(conf) | ||||
if not ok then | ||||
data_valid = false | ||||
log.error("failed to check the conf of [", key, "] err:", err) | ||||
end | ||||
end | ||||
|
||||
if data_valid then | ||||
if not conf.id then | ||||
conf.id = sub_str(key, #self.key + 2, #key + 1) | ||||
log.warn("the id of [", key, "] is nil, use the id: ", conf.id) | ||||
end | ||||
|
||||
local conf_item = {value = conf, modifiedIndex = latest_version, | ||||
key = key} | ||||
insert_tab(self.values, conf_item) | ||||
self.values_hash[conf.id] = #self.values | ||||
conf_item.clean_handlers = {} | ||||
|
||||
if self.filter then | ||||
self.filter(conf_item) | ||||
end | ||||
end | ||||
end | ||||
end | ||||
|
||||
self.conf_version = latest_version | ||||
return true | ||||
end | ||||
|
||||
|
||||
local function _automatic_fetch(premature, self) | ||||
if premature then | ||||
return | ||||
end | ||||
|
||||
local i = 0 | ||||
while not exiting() and self.running and i <= 32 do | ||||
i = i + 1 | ||||
local ok, ok2, err = pcall(sync_data, self) | ||||
if not ok then | ||||
err = ok2 | ||||
log.error("failed to fetch data from xds: ", | ||||
err, ", ", tostring(self)) | ||||
ngx_sleep(3) | ||||
break | ||||
elseif not ok2 and err then | ||||
-- todo: handler other error | ||||
if err ~= "wait for more time" and err ~= "no keys" and self.last_err ~= err then | ||||
log.error("failed to fetch data from xds, ", err, ", ", tostring(self)) | ||||
end | ||||
|
||||
if err ~= self.last_err then | ||||
self.last_err = err | ||||
self.last_err_time = ngx_time() | ||||
else | ||||
if ngx_time() - self.last_err_time >= 30 then | ||||
self.last_err = nil | ||||
end | ||||
end | ||||
ngx_sleep(0.5) | ||||
elseif not ok2 then | ||||
ngx_sleep(0.05) | ||||
else | ||||
ngx_sleep(0.1) | ||||
end | ||||
end | ||||
|
||||
if not exiting() and self.running then | ||||
ngx_timer_at(0, _automatic_fetch, self) | ||||
end | ||||
end | ||||
|
||||
|
||||
local function fetch_version(premature) | ||||
if premature then | ||||
return | ||||
end | ||||
|
||||
local version = conf_ver:get("version") | ||||
|
||||
if not version then | ||||
return | ||||
end | ||||
|
||||
if version ~= latest_version then | ||||
latest_version = version | ||||
end | ||||
end | ||||
|
||||
|
||||
function _M.new(key, opts) | ||||
local automatic = opts and opts.automatic | ||||
local item_schema = opts and opts.item_schema | ||||
local filter_fun = opts and opts.filter | ||||
local single_item = opts and opts.single_item | ||||
local checker = opts and opts.checker | ||||
|
||||
|
||||
local obj = setmetatable({ | ||||
automatic = automatic, | ||||
item_schema = item_schema, | ||||
checker = checker, | ||||
sync_times = 0, | ||||
running = true, | ||||
conf_version = 0, | ||||
values = nil, | ||||
routes_hash = nil, | ||||
prev_index = nil, | ||||
last_err = nil, | ||||
last_err_time = nil, | ||||
key = key, | ||||
single_item = single_item, | ||||
filter = filter_fun, | ||||
}, mt) | ||||
|
||||
if automatic then | ||||
if not key then | ||||
return nil, "missing `key` argument" | ||||
end | ||||
|
||||
-- blocking until xds completes initial configuration | ||||
while true do | ||||
fetch_version() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need a sleep here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ngx.sleep can not used in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use another sleep? Line 41 in 07d535d
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||||
if latest_version then | ||||
break | ||||
end | ||||
end | ||||
|
||||
local ok, ok2, err = pcall(sync_data, obj) | ||||
if not ok then | ||||
err = ok2 | ||||
end | ||||
|
||||
if err then | ||||
log.error("failed to fetch data from xds ", | ||||
err, ", ", key) | ||||
end | ||||
|
||||
ngx_timer_at(0, _automatic_fetch, obj) | ||||
end | ||||
|
||||
if key then | ||||
created_obj[key] = obj | ||||
end | ||||
|
||||
return obj | ||||
end | ||||
|
||||
|
||||
function _M.get(self, key) | ||||
if not self.values_hash then | ||||
return | ||||
end | ||||
|
||||
local arr_idx = self.values_hash[tostring(key)] | ||||
if not arr_idx then | ||||
return nil | ||||
end | ||||
|
||||
return self.values[arr_idx] | ||||
end | ||||
|
||||
|
||||
function _M.fetch_created_obj(key) | ||||
return created_obj[key] | ||||
end | ||||
|
||||
|
||||
function _M.init_worker() | ||||
if process.type() == "privileged agent" then | ||||
load_libxds(xds_lib_name) | ||||
end | ||||
|
||||
ngx_timer_every(1, fetch_version) | ||||
|
||||
return true | ||||
end | ||||
|
||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,18 +116,18 @@ function _M.http_init_worker() | |
|
||
require("apisix.debug").init_worker() | ||
|
||
if core.config == require("apisix.core.config_xds") then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just put all the init_worker together? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||
core.config.init_worker() | ||
end | ||
|
||
plugin.init_worker() | ||
router.http_init_worker() | ||
require("apisix.http.service").init_worker() | ||
plugin_config.init_worker() | ||
require("apisix.consumer").init_worker() | ||
|
||
if core.config.init_worker then | ||
local ok, err = core.config.init_worker() | ||
if not ok then | ||
core.log.error("failed to init worker process of ", core.config.type, | ||
" config center, err: ", err) | ||
end | ||
if core.config == require("apisix.core.config_yaml") then | ||
core.config.init_worker() | ||
end | ||
|
||
apisix_upstream.init_worker() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use
local function xxx
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix