Skip to content

Commit

Permalink
store ewma stats per backend
Browse files Browse the repository at this point in the history
  • Loading branch information
ElvinEfendi committed Nov 26, 2018
1 parent b65b85c commit f81f061
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions rootfs/etc/nginx/lua/balancer/ewma.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ local split = require("util.split")
local DECAY_TIME = 10 -- this value is in seconds
local PICK_SET_SIZE = 2

local balancer_ewma = {}
local balancer_ewma_last_touched_at = {}

local _M = { name = "ewma" }

local function decay_ewma(ewma, last_touched_at, rtt, now)
Expand All @@ -25,28 +22,28 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma
end

local function get_or_update_ewma(upstream, rtt, update)
local ewma = balancer_ewma[upstream] or 0
local function get_or_update_ewma(self, upstream, rtt, update)
local ewma = self.ewma[upstream] or 0

local now = ngx.now()
local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0
local last_touched_at = self.ewma_last_touched_at[upstream] or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)

if not update then
return ewma, nil
end

balancer_ewma_last_touched_at[upstream] = now
balancer_ewma[upstream] = ewma
self.ewma[upstream] = ewma
self.ewma_last_touched_at[upstream] = now
return ewma, nil
end


local function score(upstream)
local function score(self, upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(upstream_name, 0, false)
return get_or_update_ewma(self, upstream_name, 0, false)
end

-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
Expand All @@ -62,12 +59,12 @@ local function shuffle_peers(peers, k)
-- peers[1 .. k] will now contain a randomly selected k from #peers
end

local function pick_and_score(peers, k)
local function pick_and_score(self, peers, k)
shuffle_peers(peers, k)
local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index])
local lowest_score = score(self, peers[lowest_score_index])
for i = 2, k do
local new_score = score(peers[i])
local new_score = score(self, peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
Expand All @@ -82,14 +79,14 @@ function _M.balance(self)
if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
endpoint = pick_and_score(peer_copy, k)
endpoint = pick_and_score(self, peer_copy, k)
end

-- TODO(elvinefendi) move this processing to _M.sync
return endpoint.address .. ":" .. endpoint.port
end

function _M.after_balance(_)
function _M.after_balance(self)
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
Expand All @@ -98,7 +95,7 @@ function _M.after_balance(_)
if util.is_blank(upstream) then
return
end
get_or_update_ewma(upstream, rtt, true)
get_or_update_ewma(self, upstream, rtt, true)
end

function _M.sync(self, backend)
Expand All @@ -111,15 +108,15 @@ function _M.sync(self, backend)
end

self.peers = backend.endpoints

-- TODO: Reset state of EWMA per backend
balancer_ewma = {}
balancer_ewma_last_touched_at = {}
self.ewma = {}
self.ewma_last_touched_at = {}
end

function _M.new(self, backend)
local o = {
peers = backend.endpoints,
ewma = {},
ewma_last_touched_at = {},
traffic_shaping_policy = backend.trafficShapingPolicy,
alternative_backends = backend.alternativeBackends,
}
Expand Down

0 comments on commit f81f061

Please sign in to comment.