Skip to content

Commit

Permalink
Allow disabling throttling when limit value is negative (#511)
Browse files Browse the repository at this point in the history
* Allow disabling throttling when limit value is negative

* Add atomic ops for limiter value
  • Loading branch information
HeadHunter483 authored Oct 4, 2023
1 parent 1b30680 commit 769f106
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 7 deletions.
6 changes: 6 additions & 0 deletions plugin/action/throttle/in_memory_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package throttle

import (
"sync"
"sync/atomic"
"time"

"github.com/ozontech/file.d/logger"
Expand Down Expand Up @@ -39,6 +40,11 @@ func (l *inMemoryLimiter) sync() {
}

func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool {
// limit value fast check without races
if atomic.LoadInt64(&l.limit.value) < 0 {
return true
}

l.mu.Lock()
defer l.mu.Unlock()

Expand Down
6 changes: 4 additions & 2 deletions plugin/action/throttle/redis_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/ozontech/file.d/logger"
Expand Down Expand Up @@ -244,8 +245,9 @@ func (l *redisLimiter) updateKeyLimit() error {
return fmt.Errorf("failed to convert redis value to int64: %w", err)
}
}
l.totalLimiter.limit.value = limitVal
l.incrementLimiter.limit.value = limitVal
// atomic store to prevent races with limit value fast check
atomic.StoreInt64(&l.totalLimiter.limit.value, limitVal)
atomic.StoreInt64(&l.incrementLimiter.limit.value, limitVal)
return nil
}

Expand Down
55 changes: 50 additions & 5 deletions plugin/action/throttle/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ func (c *testConfig) runPipeline() {

limMap := limiters[p.Name]

outEvents := make([]*pipeline.Event, 0)
inEventsCnt := 0
input.SetInFn(func() {
inEventsCnt++
})

outEventsCnt := 0
output.SetOutFn(func(e *pipeline.Event) {
outEvents = append(outEvents, e)
outEventsCnt++
})

sourceNames := []string{
Expand All @@ -55,12 +60,15 @@ func (c *testConfig) runPipeline() {
}

// generating much more events per iteration than we need so that all buckets are filled
genEventsCnt := 10 * c.eventsTotal
genEventsCnt := 10
if c.eventsTotal >= 0 {
genEventsCnt = 10 * c.eventsTotal
}

bucketIntervalNS := c.config.BucketInterval_.Nanoseconds()
startTime := time.Now()
if startTime.UnixNano()%bucketIntervalNS > bucketIntervalNS/2 {
startTime.Add(c.config.BucketInterval_ / 2)
startTime = startTime.Add(c.config.BucketInterval_ / 2)
}
for i := 0; i < c.iterations; i++ {
curTime := startTime.Add(time.Duration(i) * c.config.BucketInterval_)
Expand All @@ -81,7 +89,11 @@ func (c *testConfig) runPipeline() {
p.Stop()

// check that we passed expected amount of events
assert.Equal(c.t, c.eventsTotal, len(outEvents), "wrong out events count")
if c.eventsTotal >= 0 {
assert.Equal(c.t, c.eventsTotal, outEventsCnt, "wrong out events count")
} else {
assert.Equal(c.t, inEventsCnt, outEventsCnt, "wrong out events count")
}
}

func TestThrottle(t *testing.T) {
Expand Down Expand Up @@ -118,6 +130,39 @@ func TestThrottle(t *testing.T) {
})
}

func TestThrottleNoLimit(t *testing.T) {
buckets := 2
limitA := -2
limitB := -3
defaultLimit := -20

iterations := 5

eventsTotal := -1

config := &Config{
Rules: []RuleConfig{
{Limit: int64(limitA), Conditions: map[string]string{"k8s_ns": "ns_1"}},
{Limit: int64(limitB), Conditions: map[string]string{"k8s_ns": "ns_2"}},
},
BucketsCount: buckets,
BucketInterval: "100ms",
ThrottleField: "k8s_pod",
TimeField: "",
DefaultLimit: int64(defaultLimit),
}
err := cfg.Parse(config, nil)
if err != nil {
logger.Panic(err.Error())
}

tconf := testConfig{t, config, eventsTotal, iterations}
tconf.runPipeline()
t.Cleanup(func() {
throttleMapsCleanup()
})
}

func TestSizeThrottle(t *testing.T) {
buckets := 4
sizeFraction := 100
Expand Down

0 comments on commit 769f106

Please sign in to comment.