Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store ewma stats per backend #3467

Merged
merged 2 commits into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions rootfs/etc/nginx/lua/balancer/ewma.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ local split = require("util.split")
local DECAY_TIME = 10 -- this value is in seconds
local PICK_SET_SIZE = 2

local balancer_ewma = {}
local balancer_ewma_last_touched_at = {}

local _M = { name = "ewma" }

local function decay_ewma(ewma, last_touched_at, rtt, now)
Expand All @@ -25,28 +22,28 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma
end

local function get_or_update_ewma(upstream, rtt, update)
local ewma = balancer_ewma[upstream] or 0
local function get_or_update_ewma(self, upstream, rtt, update)
local ewma = self.ewma[upstream] or 0

local now = ngx.now()
local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0
local last_touched_at = self.ewma_last_touched_at[upstream] or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)

if not update then
return ewma, nil
end

balancer_ewma_last_touched_at[upstream] = now
balancer_ewma[upstream] = ewma
self.ewma[upstream] = ewma
self.ewma_last_touched_at[upstream] = now
return ewma, nil
end


local function score(upstream)
local function score(self, upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(upstream_name, 0, false)
return get_or_update_ewma(self, upstream_name, 0, false)
end

-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
Expand All @@ -62,12 +59,12 @@ local function shuffle_peers(peers, k)
-- peers[1 .. k] will now contain a randomly selected k from #peers
end

local function pick_and_score(peers, k)
local function pick_and_score(self, peers, k)
shuffle_peers(peers, k)
local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index])
local lowest_score = score(self, peers[lowest_score_index])
for i = 2, k do
local new_score = score(peers[i])
local new_score = score(self, peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
Expand All @@ -82,14 +79,14 @@ function _M.balance(self)
if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
endpoint = pick_and_score(peer_copy, k)
endpoint = pick_and_score(self, peer_copy, k)
end

-- TODO(elvinefendi) move this processing to _M.sync
return endpoint.address .. ":" .. endpoint.port
end

function _M.after_balance(_)
function _M.after_balance(self)
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
Expand All @@ -98,7 +95,7 @@ function _M.after_balance(_)
if util.is_blank(upstream) then
return
end
get_or_update_ewma(upstream, rtt, true)
get_or_update_ewma(self, upstream, rtt, true)
end

function _M.sync(self, backend)
Expand All @@ -111,15 +108,15 @@ function _M.sync(self, backend)
end

self.peers = backend.endpoints

-- TODO: Reset state of EWMA per backend
balancer_ewma = {}
balancer_ewma_last_touched_at = {}
self.ewma = {}
self.ewma_last_touched_at = {}
end

function _M.new(self, backend)
local o = {
peers = backend.endpoints,
ewma = {},
ewma_last_touched_at = {},
traffic_shaping_policy = backend.trafficShapingPolicy,
alternative_backends = backend.alternativeBackends,
}
Expand Down
18 changes: 18 additions & 0 deletions rootfs/etc/nginx/lua/test/balancer/ewma_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@ local util = require("util")
describe("Balancer ewma", function()
local balancer_ewma = require("balancer.ewma")

describe("after_balance()", function()
local ngx_now = 1543238266
_G.ngx.now = function() return ngx_now end
_G.ngx.var = { upstream_response_time = "0.25", upstream_connect_time = "0.02", upstream_addr = "10.184.7.40:8080" }

it("updates EWMA stats", function()
local backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
local instance = balancer_ewma:new(backend)

instance:after_balance()
assert.equal(0.27, instance.ewma[ngx.var.upstream_addr])
assert.equal(ngx_now, instance.ewma_last_touched_at[ngx.var.upstream_addr])
end)
end)

describe("balance()", function()
it("returns single endpoint when the given backend has only one endpoint", function()
local backend = {
Expand Down