-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(config_etcd): use a single long http connection to watch all resources #9456
Changes from 11 commits
f256cfb
4c4d64a
23a7287
55bb0ae
2dcf9bc
6fc4613
d09a6bb
36e2a76
760f9e6
a9de0f2
4eb055b
70f643b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,10 @@ local json = require("apisix.core.json") | |
local etcd_apisix = require("apisix.core.etcd") | ||
local core_str = require("apisix.core.string") | ||
local new_tab = require("table.new") | ||
local inspect = require("inspect") | ||
local errlog = require("ngx.errlog") | ||
local log_level = errlog.get_sys_filter_level() | ||
local NGX_INFO = ngx.INFO | ||
local check_schema = require("apisix.core.schema").check | ||
local exiting = ngx.worker.exiting | ||
local insert_tab = table.insert | ||
|
@@ -43,9 +47,14 @@ local xpcall = xpcall | |
local debug = debug | ||
local string = string | ||
local error = error | ||
local pairs = pairs | ||
local next = next | ||
local assert = assert | ||
local rand = math.random | ||
local constants = require("apisix.constants") | ||
local health_check = require("resty.etcd.health_check") | ||
local semaphore = require("ngx.semaphore") | ||
local tablex = require("pl.tablex") | ||
|
||
|
||
local is_http = ngx.config.subsystem == "http" | ||
|
@@ -58,6 +67,7 @@ if not is_http then | |
end | ||
local created_obj = {} | ||
local loaded_configuration = {} | ||
local watch_ctx | ||
|
||
|
||
local _M = { | ||
|
@@ -75,6 +85,208 @@ local mt = { | |
} | ||
|
||
|
||
-- Lazily obtain the etcd syncer client and memoize it for later calls.
-- Returns the client, plus an error value on the call that tried to create it.
local get_etcd
do
    -- client cached across calls; stays nil until creation succeeds
    local cached_cli

    function get_etcd()
        if cached_cli == nil then
            local _, err
            cached_cli, _, err = etcd_apisix.get_etcd_syncer()
            return cached_cli, err
        end

        return cached_cli
    end
end
|
||
|
||
-- Cancel the watch bound to `http_cli` through the shared watcher client
-- and log the outcome. A result of exactly 1 is treated as success.
local function cancel_watch(http_cli)
    local cancelled, cancel_err = watch_ctx.cli:watchcancel(http_cli)
    if cancelled ~= 1 then
        log.error("cancel watch failed: ", cancel_err)
        return
    end

    log.info("cancel watch connection success")
end
|
||
|
||
-- Queue one watch result (or error) and wake every watcher that is
-- currently parked on a semaphore waiting for new entries.
local function produce_res(res, err)
    if log_level >= NGX_INFO then
        log.info("append res: ", inspect(res), ", err: ", inspect(err))
    end

    local entry = {res = res, err = err}
    insert_tab(watch_ctx.res, entry)

    -- each registered semaphore is posted once, then the registry is emptied
    local waiting = watch_ctx.sema
    for _, waiter in pairs(waiting) do
        waiter:post()
    end
    table.clear(waiting)
end
|
||
|
||
-- Timer callback running the single shared etcd watcher.
--
-- It resolves the etcd prefix and client, picks the revision to start
-- watching from, wakes the callers blocked in init_watch_ctx(), and then
-- loops forever: open a watch on the whole prefix, read streamed events,
-- and push each one into watch_ctx.res via produce_res(). On any read
-- error, server-side error or cancellation the watch is cancelled and
-- re-opened from watch_ctx.rev.
--
-- @param premature  true when the timer fires early (per the ngx timer
--                   callback convention); nothing is done in that case
local function run_watch(premature)
    if premature then
        return
    end

    local local_conf, err = config_local.local_conf()
    if not local_conf then
        error("no local conf: " .. tostring(err))
    end
    watch_ctx.prefix = local_conf.etcd.prefix .. "/"

    watch_ctx.cli, err = get_etcd()
    if not watch_ctx.cli then
        -- `string` is the library table and cannot be called; tostring()
        -- is what turns the error value into text here
        error("failed to create etcd instance: " .. tostring(err))
    end

    -- prefer the revision of an already loaded configuration response
    local rev = 0
    if loaded_configuration then
        local _, res = next(loaded_configuration)
        if res then
            rev = tonumber(res.headers["X-Etcd-Index"])
            assert(rev and rev > 0, 'invalid res.headers["X-Etcd-Index"]')
        end
    end

    -- otherwise ask etcd for its current revision, retrying until it answers
    if rev == 0 then
        while true do
            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
            if not res then
                log.error("etcd get: ", err)
                ngx_sleep(3)
            else
                -- keep the fetched value in `rev`: the assignment below
                -- derives watch_ctx.rev from it
                rev = tonumber(res.body.header.revision)
                break
            end
        end
    end

    -- start watching right after the revision we already know about
    watch_ctx.rev = rev + 1
    watch_ctx.started = true

    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
    for _, sema in pairs(watch_ctx.wait_init) do
        sema:post()
    end
    watch_ctx.wait_init = nil

    local opts = {}
    -- 50 seconds: deliberately smaller than the default proxy_read_timeout
    -- (60 seconds) so that nginx does not print timeout error logs
    opts.timeout = 50 -- second
    opts.need_cancel = true

    ::restart_watch::
    while true do
        opts.start_revision = watch_ctx.rev
        log.info("restart watchdir: start_revision=", opts.start_revision)
        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
        if not res_func then
            log.error("watchdir: ", err)
            ngx_sleep(3)
            goto restart_watch
        end

        ::watch_event::
        while true do
            -- the server response is a stream: as long as the connection
            -- is kept open, each call yields the next event
            local res, err = res_func()
            if log_level >= NGX_INFO then
                log.info("res_func: ", inspect(res))
            end

            if not res then
                -- these three errors are expected when the stream ends or
                -- idles out, so they are not logged
                if err ~= "closed" and
                    err ~= "timeout" and
                    err ~= "broken pipe"
                then
                    log.error("wait watch event: ", err)
                end
                cancel_watch(http_cli)
                break
            end

            if res.error then
                log.error("wait watch event: ", inspect(res.error))
                cancel_watch(http_cli)
                break
            end

            -- the "created" acknowledgement carries no events
            if res.result.created then
                goto watch_event
            end

            if res.result.canceled then
                log.warn("watch canceled by etcd, res: ", inspect(res))
                if res.result.compact_revision then
                    watch_ctx.rev = tonumber(res.result.compact_revision)
                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
                    produce_res(nil, "compacted")
                end
                cancel_watch(http_cli)
                break
            end

            -- cleanup: find the smallest read cursor among all consumers
            local min_idx = 0
            for _, idx in pairs(watch_ctx.idx) do
                if (min_idx == 0) or (idx < min_idx) then
                    min_idx = idx
                end
            end

            -- entries every consumer has passed are replaced by `false`
            -- placeholders so their payload can be released
            for i = 1, min_idx - 1 do
                watch_ctx.res[i] = false
            end

            if min_idx > 100 then
                -- shift every cursor down before dropping the consumed head
                for k, idx in pairs(watch_ctx.idx) do
                    watch_ctx.idx[k] = idx - min_idx + 1
                end
                -- trim the res table
                for i = 1, min_idx - 1 do
                    table.remove(watch_ctx.res, 1)
                end
            end

            local rev = tonumber(res.result.header.revision)
            if rev > watch_ctx.rev then
                watch_ctx.rev = rev + 1
            end
            produce_res(res)
        end
    end
end
|
||
|
||
-- Make sure the shared watcher context exists and its main watcher has
-- started before the caller goes on.
--
-- The first caller creates watch_ctx and schedules run_watch() on a
-- zero-delay timer. Any caller that arrives before the watcher has set
-- `started` registers a semaphore under `key` and blocks until it is posted.
--
-- @param key  identifier of the calling config object, used as the slot in
--             watch_ctx.wait_init and in log messages
-- Raises an error if the timer or the semaphore cannot be created.
local function init_watch_ctx(key)
    if not watch_ctx then
        watch_ctx = {
            idx = {},
            res = {},
            sema = {},
            wait_init = {},
            started = false,
        }

        -- NOTE(review): assumes ngx_timer_at follows the ngx.timer.at
        -- contract (falsy result plus an error on failure) -- confirm
        -- against the alias definition at the top of the file.
        local ok, err = ngx_timer_at(0, run_watch)
        if not ok then
            -- without a running watcher nobody would ever post the
            -- wait_init semaphores; reset so a later call can retry
            watch_ctx = nil
            error("failed to start main etcd watcher: " .. tostring(err))
        end
    end

    if watch_ctx.started == false then
        -- wait until the main watcher is started
        local sema, err = semaphore.new()
        if not sema then
            error(err)
        end
        watch_ctx.wait_init[key] = sema
        while true do
            local ok, err = sema:wait(60)
            if ok then
                break
            end
            log.error("wait main watcher to start, key: ", key, ", err: ", err)
        end
    end
end
|
||
|
||
local function getkey(etcd_cli, key) | ||
if not etcd_cli then | ||
return nil, "not inited" | ||
|
@@ -157,45 +369,71 @@ local function flush_watching_streams(self) | |
end | ||
|
||
|
||
-- True when `etcd_key` starts with `prefix`. The search is a plain-text
-- find, so characters such as "-" or "." in the prefix are matched
-- literally instead of being interpreted as Lua pattern magic.
local function key_has_prefix(etcd_key, prefix)
    return etcd_key:find(prefix, 1, true) == 1
end


-- Return the next watch result that concerns `key`, reading from the queue
-- shared by all consumers (watch_ctx.res) rather than opening a separate
-- watch connection.
--
-- @param self            config object; `prev_index` and `watch_sema` are used
-- @param etcd_cli        unused here, kept for signature compatibility
-- @param key             etcd key prefix this consumer is interested in
-- @param modified_index  unused here, kept for signature compatibility
-- @param timeout         seconds to wait for new queue entries (default 60)
-- @return a copy of the result holding only the events under `key`;
--         or the queued `res, err` pair when the entry carries an error;
--         or nil, err when waiting fails or times out
local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
    if not watch_ctx.idx[key] then
        watch_ctx.idx[key] = 1
    end

    ::iterate_events::
    for i = watch_ctx.idx[key], #watch_ctx.res do
        -- advance the cursor first so every exit path resumes after entry i
        watch_ctx.idx[key] = i + 1

        local item = watch_ctx.res[i]
        if item == false then
            -- placeholder of an already trimmed entry: skip it
            goto iterate_events
        end

        local res, err = item.res, item.err
        if err then
            return res, err
        end

        -- ignore res with revision smaller than or equal to self.prev_index
        local matched
        if tonumber(res.result.header.revision) > self.prev_index then
            for _, evt in ipairs(res.result.events) do
                if key_has_prefix(evt.kv.key, key) then
                    matched = matched or {}
                    insert_tab(matched, evt)
                end
            end
        end

        if matched then
            -- hand out a private copy whose event list holds only the
            -- events under `key`; the shared queue entry stays untouched
            local res2 = tablex.deepcopy(res)
            local events = res2.result.events
            table.clear(events)
            for _, evt in ipairs(matched) do
                insert_tab(events, evt)
            end
            if log_level >= NGX_INFO then
                log.info("http_waitdir: ", inspect(res2))
            end
            return res2
        end
    end

    -- if no events, wait via semaphore
    if not self.watch_sema then
        local sema, err = semaphore.new()
        if not sema then
            error(err)
        end
        self.watch_sema = sema
    end

    -- register so the producer posts this semaphore on the next entry
    watch_ctx.sema[key] = self.watch_sema
    local ok, err = self.watch_sema:wait(timeout or 60)
    watch_ctx.sema[key] = nil
    if ok then
        goto iterate_events
    else
        if err ~= "timeout" then
            log.error("wait watch event, key=", key, ", err: ", err)
        end
        return nil, err
    end
end
|
||
|
||
|
@@ -213,7 +451,7 @@ local function waitdir(self) | |
if etcd_cli.use_grpc then | ||
res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout) | ||
else | ||
res, err = http_waitdir(etcd_cli, key, modified_index, timeout) | ||
res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout) | ||
end | ||
|
||
if not res then | ||
|
@@ -359,6 +597,10 @@ local function sync_data(self) | |
return nil, "missing 'key' arguments" | ||
end | ||
|
||
if not self.etcd_cli.use_grpc then | ||
init_watch_ctx(self.key) | ||
end | ||
|
||
if self.need_reload then | ||
flush_watching_streams(self) | ||
|
||
|
@@ -555,22 +797,6 @@ function _M.getkey(self, key) | |
end | ||
|
||
|
||
-- Lazily obtain the etcd syncer client and memoize it for later calls.
-- Returns the client, plus an error value on the call that tried to create it.
local get_etcd
do
    -- client cached across calls; stays nil until creation succeeds
    local cached_cli

    function get_etcd()
        if cached_cli == nil then
            local _, err
            cached_cli, _, err = etcd_apisix.get_etcd_syncer()
            return cached_cli, err
        end

        return cached_cli
    end
end
|
||
|
||
local function _automatic_fetch(premature, self) | ||
if premature then | ||
return | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,5 +18,5 @@ | |
|
||
|
||
export OPENRESTY_VERSION=source | ||
export TEST_CI_USE_GRPC=true | ||
#export TEST_CI_USE_GRPC=true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're going to start focusing on etcd's http tests. |
||
. ./ci/linux_openresty_common_runner.sh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use
json.delay_encode
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The delay-encode helper has a bug:
apisix/apisix/core/json.lua
Lines 116 to 120 in a943c03
It reuses a single shared (singleton) table for logging, but here I need to log two variables in the same call.
Also, inspect's output is more informative than JSON for debugging.