diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 4946cc5c22c8..ecb76270452d 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -27,6 +27,10 @@ local json = require("apisix.core.json") local etcd_apisix = require("apisix.core.etcd") local core_str = require("apisix.core.string") local new_tab = require("table.new") +local inspect = require("inspect") +local errlog = require("ngx.errlog") +local log_level = errlog.get_sys_filter_level() +local NGX_INFO = ngx.INFO local check_schema = require("apisix.core.schema").check local exiting = ngx.worker.exiting local insert_tab = table.insert @@ -43,9 +47,14 @@ local xpcall = xpcall local debug = debug local string = string local error = error +local pairs = pairs +local next = next +local assert = assert local rand = math.random local constants = require("apisix.constants") local health_check = require("resty.etcd.health_check") +local semaphore = require("ngx.semaphore") +local tablex = require("pl.tablex") local is_http = ngx.config.subsystem == "http" @@ -58,6 +67,7 @@ if not is_http then end local created_obj = {} local loaded_configuration = {} +local watch_ctx local _M = { @@ -75,6 +85,208 @@ local mt = { } +local get_etcd +do + local etcd_cli + + function get_etcd() + if etcd_cli ~= nil then + return etcd_cli + end + + local _, err + etcd_cli, _, err = etcd_apisix.get_etcd_syncer() + return etcd_cli, err + end +end + + +local function cancel_watch(http_cli) + local res, err = watch_ctx.cli:watchcancel(http_cli) + if res == 1 then + log.info("cancel watch connection success") + else + log.error("cancel watch failed: ", err) + end +end + + +-- append res to the queue and notify pending watchers +local function produce_res(res, err) + if log_level >= NGX_INFO then + log.info("append res: ", inspect(res), ", err: ", inspect(err)) + end + insert_tab(watch_ctx.res, {res=res, err=err}) + for _, sema in pairs(watch_ctx.sema) do + sema:post() + end + table.clear(watch_ctx.sema) +end + + +local function run_watch(premature) + if premature then + return + end + + local local_conf, err = config_local.local_conf() + if not local_conf then + error("no local conf: " .. err) + end + watch_ctx.prefix = local_conf.etcd.prefix .. "/" + + watch_ctx.cli, err = get_etcd() + if not watch_ctx.cli then + error("failed to create etcd instance: " .. string(err)) + end + + local rev = 0 + if loaded_configuration then + local _, res = next(loaded_configuration) + if res then + rev = tonumber(res.headers["X-Etcd-Index"]) + assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]') + end + end + + if rev == 0 then + while true do + local res, err = watch_ctx.cli:get(watch_ctx.prefix) + if not res then + log.error("etcd get: ", err) + ngx_sleep(3) + else + watch_ctx.rev = tonumber(res.body.header.revision) + break + end + end + end + + watch_ctx.rev = rev + 1 + watch_ctx.started = true + + log.warn("main etcd watcher started, revision=", watch_ctx.rev) + for _, sema in pairs(watch_ctx.wait_init) do + sema:post() + end + watch_ctx.wait_init = nil + + local opts = {} + opts.timeout = 50 -- second + opts.need_cancel = true + + ::restart_watch:: + while true do + opts.start_revision = watch_ctx.rev + log.info("restart watchdir: start_revision=", opts.start_revision) + local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts) + if not res_func then + log.error("watchdir: ", err) + ngx_sleep(3) + goto restart_watch + end + + ::watch_event:: + while true do + local res, err = res_func() + if log_level >= NGX_INFO then + log.info("res_func: ", inspect(res)) + end + + if not res then + if err ~= "closed" and + err ~= "timeout" and + err ~= "broken pipe" + then + log.error("wait watch event: ", err) + end + cancel_watch(http_cli) + break + end + + if res.error then + log.error("wait watch event: ", inspect(res.error)) + cancel_watch(http_cli) + break + end + + if res.result.created then + goto watch_event + end + + if res.result.canceled then + log.warn("watch canceled by etcd, res: ", inspect(res)) + if res.result.compact_revision then + watch_ctx.rev = tonumber(res.result.compact_revision) + log.warn("etcd compacted, compact_revision=", watch_ctx.rev) + produce_res(nil, "compacted") + end + cancel_watch(http_cli) + break + end + + -- cleanup + local min_idx = 0 + for _, idx in pairs(watch_ctx.idx) do + if (min_idx == 0) or (idx < min_idx) then + min_idx = idx + end + end + + for i = 1, min_idx - 1 do + watch_ctx.res[i] = false + end + + if min_idx > 100 then + for k, idx in pairs(watch_ctx.idx) do + watch_ctx.idx[k] = idx - min_idx + 1 + end + -- trim the res table + for i = 1, min_idx - 1 do + table.remove(watch_ctx.res, 1) + end + end + + local rev = tonumber(res.result.header.revision) + if rev > watch_ctx.rev then + watch_ctx.rev = rev + 1 + end + produce_res(res) + end + end +end + + +local function init_watch_ctx(key) + if not watch_ctx then + watch_ctx = { + idx = {}, + res = {}, + sema = {}, + wait_init = {}, + started = false, + } + ngx_timer_at(0, run_watch) + end + + if watch_ctx.started == false then + -- wait until the main watcher is started + local sema, err = semaphore.new() + if not sema then + error(err) + end + watch_ctx.wait_init[key] = sema + while true do + local ok, err = sema:wait(60) + if ok then + break + end + log.error("wait main watcher to start, key: ", key, ", err: ", err) + end + end +end + + local function getkey(etcd_cli, key) if not etcd_cli then return nil, "not inited" @@ -157,45 +369,67 @@ local function flush_watching_streams(self) end -local function http_waitdir(etcd_cli, key, modified_index, timeout) - local opts = {} - opts.start_revision = modified_index - opts.timeout = timeout - opts.need_cancel = true - local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts) - if not res_func then - return nil, func_err +local function http_waitdir(self, etcd_cli, key, modified_index, timeout) + if not watch_ctx.idx[key] then + watch_ctx.idx[key] = 1 end - -- in etcd v3, the 1st res of watch is watch info, useless to us. - -- try twice to skip create info - local res, err = res_func() - if not res or not res.result or not res.result.events then - res, err = res_func() - end + ::iterate_events:: + for i = watch_ctx.idx[key], #watch_ctx.res do + watch_ctx.idx[key] = i + 1 - if http_cli then - local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli) - if res_cancel == 1 then - log.info("cancel watch connection success") - else - log.error("cancel watch failed: ", err_cancel) + local item = watch_ctx.res[i] + if item == false then + goto iterate_events + end + + local res, err = item.res, item.err + if err then + return res, err + end + + -- ignore res with revision smaller then self.prev_index + if tonumber(res.result.header.revision) > self.prev_index then + local res2 + for _, evt in ipairs(res.result.events) do + if evt.kv.key:find(key) == 1 then + if not res2 then + res2 = tablex.deepcopy(res) + table.clear(res2.result.events) + end + insert_tab(res2.result.events, evt) + end + end + + if res2 then + if log_level >= NGX_INFO then + log.info("http_waitdir: ", inspect(res2)) + end + return res2 + end end end - if not res then - return nil, err + -- if no events, wait via semaphore + if not self.watch_sema then + local sema, err = semaphore.new() + if not sema then + error(err) + end + self.watch_sema = sema end - if type(res.result) ~= "table" then - err = "failed to wait etcd dir" - if res.error and res.error.message then - err = err .. ": " .. res.error.message + watch_ctx.sema[key] = self.watch_sema + local ok, err = self.watch_sema:wait(timeout or 60) + watch_ctx.sema[key] = nil + if ok then + goto iterate_events + else + if err ~= "timeout" then + log.error("wait watch event, key=", key, ", err: ", err) end return nil, err end - - return res, err end @@ -213,7 +447,7 @@ local function waitdir(self) if etcd_cli.use_grpc then res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout) else - res, err = http_waitdir(etcd_cli, key, modified_index, timeout) + res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout) end if not res then @@ -359,6 +593,10 @@ local function sync_data(self) return nil, "missing 'key' arguments" end + if not self.etcd_cli.use_grpc then + init_watch_ctx(self.key) + end + if self.need_reload then flush_watching_streams(self) @@ -555,22 +793,6 @@ function _M.getkey(self, key) end -local get_etcd -do - local etcd_cli - - function get_etcd() - if etcd_cli ~= nil then - return etcd_cli - end - - local _, err - etcd_cli, _, err = etcd_apisix.get_etcd_syncer() - return etcd_cli, err - end -end - - local function _automatic_fetch(premature, self) if premature then return diff --git a/ci/linux_openresty_runner.sh b/ci/linux_openresty_runner.sh index 2cdc87b218f3..877248913368 100755 --- a/ci/linux_openresty_runner.sh +++ b/ci/linux_openresty_runner.sh @@ -18,5 +18,5 @@ export OPENRESTY_VERSION=source -export TEST_CI_USE_GRPC=true +#export TEST_CI_USE_GRPC=true . ./ci/linux_openresty_common_runner.sh diff --git a/t/core/etcd-sync.t b/t/core/etcd-sync.t index e74ae19ec710..aef5e23619a9 100644 --- a/t/core/etcd-sync.t +++ b/t/core/etcd-sync.t @@ -22,65 +22,7 @@ run_tests; __DATA__ -=== TEST 1: minus timeout to watch repeatedly ---- yaml_config -deployment: - role: traditional - role_traditional: - config_provider: etcd - etcd: - # this test requires the HTTP long pull as the gRPC stream is shared and can't change - # default timeout in the fly - use_grpc: false - admin: - admin_key: null ---- config - location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("lib.test_admin").test - - local consumers, _ = core.config.new("/consumers", { - automatic = true, - item_schema = core.schema.consumer, - timeout = 0.2 - }) - - ngx.sleep(0.6) - local idx = consumers.prev_index - - local code, body = t('/apisix/admin/consumers', - ngx.HTTP_PUT, - [[{ - "username": "jobs", - "plugins": { - "basic-auth": { - "username": "jobs", - "password": "123456" - } - } - }]]) - - ngx.sleep(2) - local new_idx = consumers.prev_index - core.log.info("idx:", idx, " new_idx: ", new_idx) - if new_idx > idx then - ngx.say("prev_index updated") - else - ngx.say("prev_index not update") - end - } - } ---- request -GET /t ---- response_body -prev_index updated ---- error_log eval -qr/(create watch stream for key|cancel watch connection success)/ - - - -=== TEST 2: using default timeout +=== TEST 1: using default timeout --- config location /t { content_by_lua_block { @@ -126,7 +68,7 @@ waitdir key -=== TEST 3: no update +=== TEST 2: no update --- config location /t { content_by_lua_block { @@ -162,7 +104,7 @@ prev_index not update -=== TEST 4: bad plugin configuration (validated via incremental sync) +=== TEST 3: bad plugin configuration (validated via incremental sync) --- config location /t { content_by_lua_block { @@ -182,7 +124,7 @@ property "uri" validation failed -=== TEST 5: bad plugin configuration (validated via full sync) +=== TEST 4: bad plugin configuration (validated via full sync) --- config location /t { content_by_lua_block { @@ -196,7 +138,7 @@ property "uri" validation failed -=== TEST 6: bad plugin configuration (validated without sync during start) +=== TEST 5: bad plugin configuration (validated without sync during start) --- extra_yaml_config disable_sync_configuration_during_start: true --- config diff --git a/t/plugin/error-log-logger-skywalking.t b/t/plugin/error-log-logger-skywalking.t index ffebf1cf4fa5..edb5003c0988 100644 --- a/t/plugin/error-log-logger-skywalking.t +++ b/t/plugin/error-log-logger-skywalking.t @@ -118,8 +118,8 @@ qr/Batch Processor\[error-log-logger\] failed to process entries: error while se --- request GET /tg --- response_body ---- error_log eval -qr/.*\[\{\"body\":\{\"text\":\{\"text\":\".*this is an error message for test.*\"\}\},\"endpoint\":\"\",\"service\":\"APISIX\",\"serviceInstance\":\"instance\".*/ +--- error_log +this is an error message for test --- wait: 5