Configurable load balancing with EWMA #2229

Merged 1 commit on Mar 23, 2018
1 change: 1 addition & 0 deletions docs/user-guide/configmap.md
@@ -519,6 +519,7 @@ The value can either be:
- round_robin: to use the default round-robin load balancer
- least_conn: to use the least-connected method
- ip_hash: to use a hash of the server for routing
- ewma: to use the peak EWMA method for routing (only available with the `enable-dynamic-configuration` flag)

The default is least_conn.
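
A minimal sketch of what enabling this looks like in practice (assuming the conventional ConfigMap name and namespace, and a controller started with the `--enable-dynamic-configuration` flag):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nginx-configuration
  namespace: ingress-nginx
data:
  load-balance: "ewma"
```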

31 changes: 28 additions & 3 deletions internal/file/bindata.go

Large diffs are not rendered by default.

69 changes: 59 additions & 10 deletions rootfs/etc/nginx/lua/balancer.lua
@@ -4,13 +4,15 @@ local configuration = require("configuration")
local util = require("util")
local lrucache = require("resty.lrucache")
local resty_lock = require("resty.lock")
local ewma = require("balancer.ewma")

-- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers
-- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
local BACKENDS_SYNC_INTERVAL = 1

local ROUND_ROBIN_LOCK_KEY = "round_robin"
local DEFAULT_LB_ALG = "round_robin"

local round_robin_state = ngx.shared.round_robin_state

@@ -23,28 +25,49 @@ if not backends then
return error("failed to create the cache for backends: " .. (err or "unknown"))
end

local function get_current_backend()
local backend_name = ngx.var.proxy_upstream_name
return backends:get(backend_name)
end

local function get_current_lb_alg()
local backend = get_current_backend()
return backend["load-balance"] or DEFAULT_LB_ALG
end

local function balance()
local backend = get_current_backend()
local lb_alg = get_current_lb_alg()

if lb_alg == "ip_hash" then
-- TODO(elvinefendi) implement me
return backend.endpoints[1].address, backend.endpoints[1].port
elseif lb_alg == "ewma" then
local endpoint = ewma.balance(backend.endpoints)
return endpoint.address, endpoint.port
end

if lb_alg ~= DEFAULT_LB_ALG then
ngx.log(ngx.WARN, tostring(lb_alg) .. " is not supported, falling back to " .. DEFAULT_LB_ALG)
end

-- Round-Robin
round_robin_lock:lock(backend.name .. ROUND_ROBIN_LOCK_KEY)
local last_index = round_robin_state:get(backend.name)
local index, endpoint = next(backend.endpoints, last_index)
if not index then
index = 1
endpoint = backend.endpoints[index]
end
local success, forcible
success, err, forcible = round_robin_state:set(backend.name, index)
if not success then
ngx.log(ngx.WARN, "round_robin_state:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
end
round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)

return endpoint.address, endpoint.port
end
@@ -55,6 +78,13 @@ local function sync_backend(backend)
-- also reset the respective balancer state since backend has changed
round_robin_state:delete(backend.name)

-- TODO: Reset state of EWMA per backend
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
if lb_alg == "ewma" then
ngx.shared.balancer_ewma:flush_all()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
end

ngx.log(ngx.INFO, "syncronization completed for: " .. backend.name)
end

@@ -84,6 +114,13 @@ local function sync_backends()
end
end

local function after_balance()
local lb_alg = get_current_lb_alg()
if lb_alg == "ewma" then
ewma.after_balance()
end
end

function _M.init_worker()
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if err then
@@ -92,14 +129,26 @@ function _M.init_worker()
end

function _M.call()
local phase = ngx.get_phase()
if phase == "log" then
after_balance()
return
end
if phase ~= "balancer" then
return error("must be called in balancer or log, but was called in: " .. phase)
end

ngx_balancer.set_more_tries(1)

local host, port = balance()

local ok
ok, err = ngx_balancer.set_current_peer(host, port)
if ok then
ngx.log(ngx.INFO, "current peer is set to " .. host .. ":" .. port)
ngx.log(
ngx.INFO,
"current peer is set to " .. host .. ":" .. port .. " using lb_alg " .. tostring(get_current_lb_alg())
)
else
ngx.log(ngx.ERR, "error while setting current upstream peer to: " .. tostring(err))
end
141 changes: 141 additions & 0 deletions rootfs/etc/nginx/lua/balancer/ewma.lua
@@ -0,0 +1,141 @@
-- Original Authors: Shiv Nagarajan & Scott Francis
-- Accessed: March 12, 2018
-- Inspiration drawn from:
-- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala


local resty_lock = require("resty.lock")
local util = require("util")

local DECAY_TIME = 10 -- this value is in seconds
local LOCK_KEY = ":ewma_key"
local PICK_SET_SIZE = 2

local ewma_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})

local _M = {}

local function lock(upstream)
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
if err then
if err ~= "timeout" then
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
end
end

return err
end

local function unlock()
local ok, err = ewma_lock:unlock()
if not ok then
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
end

return err
end

local function decay_ewma(ewma, last_touched_at, rtt, now)
local td = now - last_touched_at
td = (td > 0) and td or 0
local weight = math.exp(-td/DECAY_TIME)

ewma = ewma * weight + rtt * (1.0 - weight)
return ewma
end
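-- Worked example (illustrative, not part of the PR): with DECAY_TIME = 10,
-- an EWMA of 0.200s last touched 5s ago and a fresh RTT sample of 0.050s give
--   weight = exp(-5/10) ~= 0.6065
--   ewma   = 0.200 * 0.6065 + 0.050 * (1 - 0.6065) ~= 0.141
-- so the longer the gap since the last sample, the more weight the new
-- RTT observation receives.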

local function get_or_update_ewma(upstream, rtt, update)
local lock_err = nil
if update then
lock_err = lock(upstream)
end
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
if lock_err ~= nil then
return ewma, lock_err
end

local now = ngx.now()
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)

if not update then
return ewma, nil
end

local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now)
if not success then
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set valid items forcibly overwritten")
end

success, err, forcible = ngx.shared.balancer_ewma:set(upstream, ewma)
if not success then
ngx.log(ngx.WARN, "balancer_ewma:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten")
end

unlock()
return ewma, nil
end


local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(upstream_name, 0, false)
end

-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
-- or https://en.wikipedia.org/wiki/Random_permutation
-- loop from 1 .. k
-- pick a random value r from the remaining set of unpicked values (i .. n)
-- swap the value at position i with the value at position r
local function shuffle_peers(peers, k)
for i=1, k do
local rand_index = math.random(i,#peers)
peers[i], peers[rand_index] = peers[rand_index], peers[i]
end
-- peers[1 .. k] will now contain a randomly selected k from #peers
end

local function pick_and_score(peers, k)
shuffle_peers(peers, k)
local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index])
for i = 2, k do
local new_score = score(peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
end
return peers[lowest_score_index]
end
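-- Note (added commentary): with PICK_SET_SIZE = 2 this is the classic
-- "power of two choices" strategy: sample two random peers and keep the
-- one with the lower decayed EWMA score, which avoids scanning every
-- peer on each request while still steering traffic away from slow ones.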

function _M.balance(peers)
if #peers == 1 then
return peers[1]
end
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
return pick_and_score(peer_copy, k)
end

function _M.after_balance()
local response_time = tonumber(util.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(util.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
local upstream = util.get_first_value(ngx.var.upstream_addr)

if util.is_blank(upstream) then
return
end
get_or_update_ewma(upstream, rtt, true)
end

return _M
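
To see the two pieces interact, here is a hypothetical, self-contained sketch (plain Lua, no ngx dependencies; `decay_ewma` mirrors the module above, and plain tables stand in for the shared dicts):

```lua
local DECAY_TIME = 10 -- seconds, as in the module above

local function decay_ewma(ewma, last_touched_at, rtt, now)
  local td = now - last_touched_at
  td = (td > 0) and td or 0
  local weight = math.exp(-td / DECAY_TIME)
  return ewma * weight + rtt * (1.0 - weight)
end

local scores, touched = {}, {} -- toy stand-ins for the ngx.shared dicts

local function observe(peer, rtt, now)
  scores[peer] = decay_ewma(scores[peer] or 0, touched[peer] or 0, rtt, now)
  touched[peer] = now
end

-- two-choice pick: lower decayed score wins (PICK_SET_SIZE = 2)
local function pick(a, b)
  return (scores[a] or 0) <= (scores[b] or 0) and a or b
end

for t = 1, 5 do
  observe("slow", 0.300, t) -- peer answering in 300ms
  observe("fast", 0.030, t) -- peer answering in 30ms
end
print(pick("slow", "fast")) --> fast
```

Note that a never-sampled peer scores 0 and therefore wins any pick, which matches the module's behavior of routing to fresh endpoints until real RTT data accumulates.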
47 changes: 47 additions & 0 deletions rootfs/etc/nginx/lua/util.lua
@@ -1,4 +1,5 @@
local _M = {}
local string_len = string.len

-- this implementation is taken from
-- https://web.archive.org/web/20131225070434/http://snippets.luacode.org/snippets/Deep_Comparison_of_Two_Values_3
@@ -24,4 +25,50 @@ local function deep_compare(t1, t2, ignore_mt)
end
_M.deep_compare = deep_compare

function _M.is_blank(str)
return str == nil or string_len(str) == 0
end

-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
-- 200 : 200 , ngx.var.upstream_status
-- 0.00 : 0.00, ngx.var.upstream_response_time
function _M.split_upstream_var(var)
if not var then
return nil, nil
end
local t = {}
for v in var:gmatch("[^%s|,]+") do
if v ~= ":" then
t[#t+1] = v
end
end
return t
end

function _M.get_first_value(var)
local t = _M.split_upstream_var(var) or {}
if #t == 0 then return nil end
return t[1]
end

-- this implementation is taken from:
-- https://github.com/luafun/luafun/blob/master/fun.lua#L33
-- SHA: 04c99f9c393e54a604adde4b25b794f48104e0d0
local function deepcopy(orig)
local orig_type = type(orig)
local copy
if orig_type == 'table' then
copy = {}
for orig_key, orig_value in next, orig, nil do
copy[deepcopy(orig_key)] = deepcopy(orig_value)
end
else
copy = orig
end
return copy
end
_M.deepcopy = deepcopy

return _M
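
A quick sanity check of the upstream-variable parsing (hypothetical standalone snippet; the gmatch pattern is copied from `split_upstream_var` above):

```lua
-- ngx.var.upstream_addr with two upstream attempts, colon-separated as
-- the CAVEAT comment describes
local var = "127.0.0.1:26157 : 127.0.0.1:26158"
local t = {}
for v in var:gmatch("[^%s|,]+") do
  if v ~= ":" then t[#t + 1] = v end
end
-- t = { "127.0.0.1:26157", "127.0.0.1:26158" }
print(t[1]) --> 127.0.0.1:26157, i.e. what get_first_value(var) returns
```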
7 changes: 7 additions & 0 deletions rootfs/etc/nginx/template/nginx.tmpl
@@ -42,6 +42,8 @@ http {
lua_shared_dict configuration_data 5M;
lua_shared_dict round_robin_state 1M;
lua_shared_dict locks 512k;
lua_shared_dict balancer_ewma 1M;
lua_shared_dict balancer_ewma_last_touched_at 1M;

aledbf (Member) commented, Mar 21, 2018:

Is this size enough?
Do you have an estimation of how many endpoints fit in 1M?
What happens if the dict is full?

Contributor (PR author) replied:

@shivnagarajan Do you know the answer to any of these questions?

ElvinEfendi (Member) replied, Mar 21, 2018:

@zrdaley you can do some back-of-the-napkin math here. balancer_ewma holds key-value data where the keys are upstream names (strings) and the values are EWMAs (floats, i.e. Lua numbers). The same goes for balancer_ewma_last_touched_at, where the keys are identical and the values are timestamps (I think these will be stored as strings; refer to https://github.com/openresty/lua-nginx-module#ngxshareddictset).

> What happens if the dict is full?

When the dict is full we fail to store the EWMA values, which means load balancing will start misbehaving.

Contributor (PR author) replied:

balancer_ewma and balancer_ewma_last_touched_at are both dictionaries with string keys and floating-point values.

> Do you have an estimation of how many endpoints fit in 1M?

Each dict entry: ~10-char key + float = 4 B * 10 + 4 B = 44 B = 0.000044 MB
Total entries: 1 MB / 0.000044 MB = 22,727

Contributor (PR author) replied:

> What happens if the dict is full?

When the dict is full, values will either be forcibly overwritten or fail to be written entirely. I created nginx logging around round_robin_state, balancer_ewma and balancer_ewma_last_touched_at for both of these cases (see the round_robin_state example).

The full ngx.shared.DICT.set docs can be found here.
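
On recent lua-nginx-module versions (0.10.11+, an assumption worth verifying against the bundled module), occupancy can also be checked directly instead of being inferred from failed writes; a hypothetical sketch:

```lua
local dict = ngx.shared.balancer_ewma
-- free_space() returns the free page size in bytes; 0 means the next
-- set() may evict valid items or fail outright
if dict:free_space() == 0 then
  ngx.log(ngx.WARN, "balancer_ewma is full; EWMA scores may be evicted")
end
```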


init_by_lua_block {
require("resty.core")
@@ -976,6 +978,11 @@ stream {
{{ end }}

{{ if not (empty $location.Backend) }}
{{ if $all.DynamicConfigurationEnabled}}
log_by_lua_block {
balancer.call()
}
{{ end }}
{{ buildProxyPass $server.Hostname $all.Backends $location $all.DynamicConfigurationEnabled }}
{{ if (or (eq $location.Proxy.ProxyRedirectFrom "default") (eq $location.Proxy.ProxyRedirectFrom "off")) }}
proxy_redirect {{ $location.Proxy.ProxyRedirectFrom }};
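
Rendered, the template wires `balancer.call()` into two nginx phases: peer selection (balancer phase) and RTT collection (log phase). A rough sketch of what the generated configuration might look like under dynamic configuration (simplified; the exact output depends on the template context):

```nginx
upstream upstream_balancer {
    server 0.0.0.1; # placeholder; the real peer is chosen in Lua
    balancer_by_lua_block {
        balancer.call() -- balancer phase: picks address and port
    }
}

server {
    location / {
        log_by_lua_block {
            balancer.call() -- log phase: feeds the request RTT into the EWMA
        }
        proxy_pass http://upstream_balancer;
    }
}
```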