sd: allow to tune degraded point #264

Merged (5 commits, Feb 20, 2024)
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -97,8 +97,9 @@ jobs:

- name: Install packaging dependencies
run: |
+ gem install dotenv -v 2.8.1 # workaround for ruby version 2.7.8.225
gem install fpm package_cloud
- GO111MODULE=off go get github.com/mitchellh/gox
+ go install github.com/mitchellh/gox@latest

- name: Check packaging
run: |
20 changes: 14 additions & 6 deletions config/config.go
@@ -104,12 +104,14 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

- BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
- SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
- SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
- SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
- SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
- SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`
+ BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
+ DegragedMultiply float64 `toml:"degraged_multiply" json:"degraged_multiply" comment:"service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)"`
+ DegragedLoad float64 `toml:"degraged_load_avg" json:"degraged_load_avg" comment:"service discovery normalized load avg degraded point (default 1.0)"`
+ SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
+ SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
+ SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
+ SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
+ SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

@@ -732,6 +734,12 @@ func Unmarshal(body []byte, exactConfig bool) (cfg *Config, warns []zap.Field, e
// NeedLoadAvgColect checks if load avg collection is needed
func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SD != "" {
+ if c.Common.DegragedMultiply <= 0 {
+ c.Common.DegragedMultiply = 4.0
+ }
+ if c.Common.DegragedLoad <= 0 {
+ c.Common.DegragedLoad = 1.0
+ }
if c.Common.BaseWeight <= 0 {
c.Common.BaseWeight = 100
}
4 changes: 4 additions & 0 deletions doc/config.md
@@ -194,6 +194,10 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
headers-to-log = []
# service discovery base weight (on idle)
base_weight = 0
+ # service discovery degraded load avg multiplier (if normalized load avg > degraged_load_avg) (default 4.0)
+ degraged_multiply = 0.0
+ # service discovery normalized load avg degraded point (default 1.0)
+ degraged_load_avg = 0.0
# service discovery type
service-discovery-type = 0
# service discovery address (consul)
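For illustration, a sketch of how the new knobs could be tuned in the common section. The key names come from the diff above; the values are made up for the example, and per NeedLoadAvgColect leaving them at 0 falls back to the defaults (4.0 and 1.0):

base_weight = 100
# treat this instance as degraded once normalized load avg exceeds 0.8
degraged_load_avg = 0.8
# multiplier applied to the load penalty while above the degraded point
degraged_multiply = 6.0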
2 changes: 1 addition & 1 deletion issues/daytime/test.toml
@@ -2,7 +2,7 @@
precision = "10s"

[[test.clickhouse]]
- version = "latest"
+ version = "23.12"
dir = "tests/clickhouse/rollup"

[test.carbon_clickhouse]
63 changes: 32 additions & 31 deletions limiter/alimiter.go
@@ -13,6 +13,7 @@ var (
checkDelay = time.Second * 60
)

+ // calc reserved slots count based on load average (to protect against overload)
func getWeighted(n, max int) int {
if n <= 0 {
return 0
@@ -35,31 +36,31 @@

// ALimiter provides a limiter for the amount of requests/concurrently executing requests (adaptive with load avg)
type ALimiter struct {
- l limiter
- cL limiter
- c int
- n int
+ limiter limiter
+ concurrentLimiter limiter
+ concurrent int
+ n int

m metrics.WaitMetric
}

// NewALimiter creates a limiter for a specific servers list (adaptive with load avg).
- func NewALimiter(l, c, n int, enableMetrics bool, scope, sub string) ServerLimiter {
- if l <= 0 && c <= 0 {
+ func NewALimiter(capacity, concurrent, n int, enableMetrics bool, scope, sub string) ServerLimiter {
+ if capacity <= 0 && concurrent <= 0 {
return NoopLimiter{}
}
- if n >= c {
- n = c - 1
+ if n >= concurrent {
+ n = concurrent - 1
}
if n <= 0 {
- return NewWLimiter(l, c, enableMetrics, scope, sub)
+ return NewWLimiter(capacity, concurrent, enableMetrics, scope, sub)
}

a := &ALimiter{
- m: metrics.NewWaitMetric(enableMetrics, scope, sub), c: c, n: n,
+ m: metrics.NewWaitMetric(enableMetrics, scope, sub), concurrent: concurrent, n: n,
}
- a.cL.ch = make(chan struct{}, c)
- a.cL.cap = c
+ a.concurrentLimiter.ch = make(chan struct{}, concurrent)
+ a.concurrentLimiter.cap = concurrent

go a.balance()

@@ -70,17 +71,17 @@ func (sl *ALimiter) balance() int {
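// Each pass recomputes the weighted reservation with getWeighted and
// pre-claims (enter) or releases (leave) slots on concurrentLimiter to
// match it; reserved slots are invisible to real requests, so effective
// concurrency shrinks as load average grows.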
var last int
for {
start := time.Now()
- n := getWeighted(sl.n, sl.c)
+ n := getWeighted(sl.n, sl.concurrent)
if n > last {
for i := 0; i < n-last; i++ {
- if sl.cL.enter(ctxMain, "balance") != nil {
+ if sl.concurrentLimiter.enter(ctxMain, "balance") != nil {
break
}
}
last = n
} else if n < last {
for i := 0; i < last-n; i++ {
- sl.cL.leave(ctxMain, "balance")
+ sl.concurrentLimiter.leave(ctxMain, "balance")
}
last = n
}
@@ -92,20 +93,20 @@
}

func (sl *ALimiter) Capacity() int {
- return sl.l.capacity()
+ return sl.limiter.capacity()
}

func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {
- if sl.l.cap > 0 {
- if err = sl.l.tryEnter(ctx, s); err != nil {
+ if sl.limiter.cap > 0 {
+ if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
- if sl.cL.cap > 0 {
- if sl.cL.enter(ctx, s) != nil {
- if sl.l.cap > 0 {
- sl.l.leave(ctx, s)
+ if sl.concurrentLimiter.cap > 0 {
+ if sl.concurrentLimiter.enter(ctx, s) != nil {
+ if sl.limiter.cap > 0 {
+ sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
@@ -117,16 +118,16 @@ func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {

// TryEnter claims one of the free slots without blocking.
func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {
- if sl.l.cap > 0 {
- if err = sl.l.tryEnter(ctx, s); err != nil {
+ if sl.limiter.cap > 0 {
+ if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
- if sl.cL.cap > 0 {
- if sl.cL.tryEnter(ctx, s) != nil {
- if sl.l.cap > 0 {
- sl.l.leave(ctx, s)
+ if sl.concurrentLimiter.cap > 0 {
+ if sl.concurrentLimiter.tryEnter(ctx, s) != nil {
+ if sl.limiter.cap > 0 {
+ sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
@@ -138,10 +139,10 @@ func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {

// Leave frees a slot in the limiter
func (sl *ALimiter) Leave(ctx context.Context, s string) {
- if sl.l.cap > 0 {
- sl.l.leave(ctx, s)
+ if sl.limiter.cap > 0 {
+ sl.limiter.leave(ctx, s)
}
- sl.cL.leave(ctx, s)
+ sl.concurrentLimiter.leave(ctx, s)
}

// SendDuration sends StatsD duration timing
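getWeighted's body is collapsed in the hunk above, but the Test_getWeighted table in the next file pins down its behavior. Below is a minimal sketch consistent with that table — the 0.6 cut-off, the proportional middle band, and the keep-one-slot cap are inferred from the tests, not copied from the source; getWeightedSketch and the assumption that load_avg is a package-level atomic float64 are mine:

// getWeightedSketch is a reconstruction for illustration only. It returns
// how many of max slots to reserve at the current load average: nothing
// below 0.6, a share proportional to load in the middle band, and almost
// everything beyond the degraded point.
func getWeightedSketch(n, max int) int {
	if n <= 0 {
		return 0
	}
	loadAvg := load_avg.Load() // assumed atomic float64, as the tests Store() into it
	if loadAvg < 0.6 {
		return 0 // comfortable load: reserve nothing
	}
	w := int(float64(n) * loadAvg) // reservation grows with load average
	if w >= max {
		w = max
		if w > 1 {
			w-- // keep at least one slot available for real requests
		}
	}
	return w
}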
59 changes: 31 additions & 28 deletions limiter/alimiter_test.go
@@ -15,99 +15,102 @@ import (
func Test_getWeighted(t *testing.T) {
tests := []struct {
loadAvg float64
- c int
n int
+ max int
want int
}{
- {loadAvg: 0, c: 100, n: 100, want: 0},
- {loadAvg: 0.2, c: 100, n: 100, want: 0},
- {loadAvg: 0.999, c: 100, n: 1, want: 0},
- {loadAvg: 1, c: 1, n: 100, want: 1},
- {loadAvg: 1, c: 100, n: 100, want: 99},
- {loadAvg: 1, c: 101, n: 100, want: 100},
- {loadAvg: 1, c: 200, n: 100, want: 100},
- {loadAvg: 2, c: 100, n: 200, want: 99},
- {loadAvg: 2, c: 200, n: 200, want: 199},
- {loadAvg: 2, c: 300, n: 200, want: 299},
- {loadAvg: 2, c: 400, n: 200, want: 399},
- {loadAvg: 2, c: 401, n: 200, want: 400},
- {loadAvg: 2, c: 402, n: 200, want: 400},
+ {loadAvg: 0, max: 100, n: 100, want: 0},
+ {loadAvg: 0.2, max: 100, n: 100, want: 0},
+ {loadAvg: 0.7, max: 100, n: 100, want: 70},
+ {loadAvg: 0.8, max: 100, n: 100, want: 80},
+ {loadAvg: 0.999, max: 100, n: 100, want: 99},
+ {loadAvg: 0.999, max: 100, n: 1, want: 0},
+ {loadAvg: 1, max: 1, n: 100, want: 1},
+ {loadAvg: 1, max: 100, n: 100, want: 99},
+ {loadAvg: 1, max: 101, n: 100, want: 100},
+ {loadAvg: 1, max: 200, n: 100, want: 100},
+ {loadAvg: 2, max: 100, n: 200, want: 99},
+ {loadAvg: 2, max: 200, n: 200, want: 199},
+ {loadAvg: 2, max: 300, n: 200, want: 299},
+ {loadAvg: 2, max: 400, n: 200, want: 399},
+ {loadAvg: 2, max: 401, n: 200, want: 400},
+ {loadAvg: 2, max: 402, n: 200, want: 400},
}
for n, tt := range tests {
t.Run(strconv.Itoa(n), func(t *testing.T) {
load_avg.Store(tt.loadAvg)
- if got := getWeighted(tt.n, tt.c); got != tt.want {
- t.Errorf("load avg = %f getWeighted(%d) = %v, want %v", tt.loadAvg, tt.n, got, tt.want)
+ if got := getWeighted(tt.n, tt.max); got != tt.want {
+ t.Errorf("load avg = %f getWeighted(%d, %d) = %v, want %v", tt.loadAvg, tt.n, tt.max, got, tt.want)
}
})
}
}

func TestNewALimiter(t *testing.T) {
- l := 14
- c := 12
+ capacity := 14
+ concurrent := 12
n := 10
checkDelay = time.Millisecond * 10
- limiter := NewALimiter(l, c, n, false, "", "")
+ limiter := NewALimiter(capacity, concurrent, n, false, "", "")

// initial - load not collected
load_avg.Store(0)

var i int
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)

- for i = 0; i < c; i++ {
+ for i = 0; i < concurrent; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

- for i = 0; i < c; i++ {
+ for i = 0; i < concurrent; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// load_avg 0.5
load_avg.Store(0.5)
- k := getWeighted(n, c)
+ k := getWeighted(n, concurrent)
require.Equal(t, 0, k)

// load_avg 0.6
load_avg.Store(0.6)
- k = getWeighted(n, c)
+ k = getWeighted(n, concurrent)
require.Equal(t, n*6/10, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
- for i = 0; i < c-k; i++ {
+ for i = 0; i < concurrent-k; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0.5 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

- for i = 0; i < c-k; i++ {
+ for i = 0; i < concurrent-k; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// load_avg 1
load_avg.Store(1)
- k = getWeighted(n, c)
+ k = getWeighted(n, concurrent)
require.Equal(t, n, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
- for i = 0; i < c-n; i++ {
+ for i = 0; i < concurrent-n; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 1 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

- for i = 0; i < c-n; i++ {
+ for i = 0; i < concurrent-n; i++ {
limiter.Leave(ctx, "render")
}

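Finally, a short usage sketch of the adaptive limiter as TestNewALimiter exercises it. It is written as if it lived inside the limiter package so no module path has to be guessed; exampleUse and the "render" scope label are illustrative:

package limiter

import (
	"context"
	"time"
)

// exampleUse demonstrates the Enter/Leave contract seen in the tests:
// Enter blocks until a slot frees up or the context expires; Leave
// returns the slot.
func exampleUse(parent context.Context) error {
	// capacity 14, concurrency 12, reserve up to 10 slots under high load
	lim := NewALimiter(14, 12, 10, false, "", "")

	ctx, cancel := context.WithTimeout(parent, 100*time.Millisecond)
	defer cancel()

	if err := lim.Enter(ctx, "render"); err != nil {
		return err // e.g. ErrTimeout: no slot within the deadline
	}
	defer lim.Leave(ctx, "render")

	// ...rate-limited work goes here...
	return nil
}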