diff --git a/plugin/action/throttle/in_memory_limiter.go b/plugin/action/throttle/in_memory_limiter.go index c64df86d3..dc7fde458 100644 --- a/plugin/action/throttle/in_memory_limiter.go +++ b/plugin/action/throttle/in_memory_limiter.go @@ -2,6 +2,7 @@ package throttle import ( "sync" + "sync/atomic" "time" "github.com/ozontech/file.d/logger" @@ -39,6 +40,11 @@ func (l *inMemoryLimiter) sync() { } func (l *inMemoryLimiter) isAllowed(event *pipeline.Event, ts time.Time) bool { + // limit value fast check without races + if atomic.LoadInt64(&l.limit.value) < 0 { + return true + } + l.mu.Lock() defer l.mu.Unlock() diff --git a/plugin/action/throttle/redis_limiter.go b/plugin/action/throttle/redis_limiter.go index 2439cc8ab..fbf44451d 100644 --- a/plugin/action/throttle/redis_limiter.go +++ b/plugin/action/throttle/redis_limiter.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "time" "github.com/ozontech/file.d/logger" @@ -244,8 +245,9 @@ func (l *redisLimiter) updateKeyLimit() error { return fmt.Errorf("failed to convert redis value to int64: %w", err) } } - l.totalLimiter.limit.value = limitVal - l.incrementLimiter.limit.value = limitVal + // atomic store to prevent races with limit value fast check + atomic.StoreInt64(&l.totalLimiter.limit.value, limitVal) + atomic.StoreInt64(&l.incrementLimiter.limit.value, limitVal) return nil } diff --git a/plugin/action/throttle/throttle_test.go b/plugin/action/throttle/throttle_test.go index 94e10568a..b13c857fa 100644 --- a/plugin/action/throttle/throttle_test.go +++ b/plugin/action/throttle/throttle_test.go @@ -43,9 +43,14 @@ func (c *testConfig) runPipeline() { limMap := limiters[p.Name] - outEvents := make([]*pipeline.Event, 0) + inEventsCnt := 0 + input.SetInFn(func() { + inEventsCnt++ + }) + + outEventsCnt := 0 output.SetOutFn(func(e *pipeline.Event) { - outEvents = append(outEvents, e) + outEventsCnt++ }) sourceNames := []string{ @@ -55,12 +60,15 @@ func (c *testConfig) runPipeline() { } // generating much more events per iteration than we need so that all buckets are filled - genEventsCnt := 10 * c.eventsTotal + genEventsCnt := 10 + if c.eventsTotal >= 0 { + genEventsCnt = 10 * c.eventsTotal + } bucketIntervalNS := c.config.BucketInterval_.Nanoseconds() startTime := time.Now() if startTime.UnixNano()%bucketIntervalNS > bucketIntervalNS/2 { - startTime.Add(c.config.BucketInterval_ / 2) + startTime = startTime.Add(c.config.BucketInterval_ / 2) } for i := 0; i < c.iterations; i++ { curTime := startTime.Add(time.Duration(i) * c.config.BucketInterval_) @@ -81,7 +89,11 @@ func (c *testConfig) runPipeline() { p.Stop() // check that we passed expected amount of events - assert.Equal(c.t, c.eventsTotal, len(outEvents), "wrong out events count") + if c.eventsTotal >= 0 { + assert.Equal(c.t, c.eventsTotal, outEventsCnt, "wrong out events count") + } else { + assert.Equal(c.t, inEventsCnt, outEventsCnt, "wrong out events count") + } } func TestThrottle(t *testing.T) { @@ -118,6 +130,39 @@ func TestThrottle(t *testing.T) { }) } +func TestThrottleNoLimit(t *testing.T) { + buckets := 2 + limitA := -2 + limitB := -3 + defaultLimit := -20 + + iterations := 5 + + eventsTotal := -1 + + config := &Config{ + Rules: []RuleConfig{ + {Limit: int64(limitA), Conditions: map[string]string{"k8s_ns": "ns_1"}}, + {Limit: int64(limitB), Conditions: map[string]string{"k8s_ns": "ns_2"}}, + }, + BucketsCount: buckets, + BucketInterval: "100ms", + ThrottleField: "k8s_pod", + TimeField: "", + DefaultLimit: int64(defaultLimit), + } + err := cfg.Parse(config, nil) + if err != nil { + logger.Panic(err.Error()) + } + + tconf := testConfig{t, config, eventsTotal, iterations} + tconf.runPipeline() + t.Cleanup(func() { + throttleMapsCleanup() + }) +} + func TestSizeThrottle(t *testing.T) { buckets := 4 sizeFraction := 100