diff --git a/go.mod b/go.mod index 8d91351a..482fb322 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/prometheus/client_golang v1.15.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.42.0 + github.com/samber/lo v1.38.1 github.com/shirou/gopsutil/v3 v3.23.4 github.com/spf13/cast v1.5.0 github.com/spf13/viper v1.15.0 diff --git a/go.sum b/go.sum index 5e689bbf..0217744c 100644 --- a/go.sum +++ b/go.sum @@ -288,6 +288,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= github.com/shirou/gopsutil/v3 v3.23.4 h1:hZwmDxZs7Ewt75DV81r4pFMqbq+di2cbt9FsQBqLD2o= github.com/shirou/gopsutil/v3 v3.23.4/go.mod h1:ZcGxyfzAMRevhUR2+cfhXDH6gQdFYE/t8j1nsU4mPI8= diff --git a/sync/limiter.go b/sync/limiter.go index a8b53785..7ff1272a 100644 --- a/sync/limiter.go +++ b/sync/limiter.go @@ -3,11 +3,13 @@ package sync import ( "container/heap" "context" + "fmt" "sync" "time" "github.com/rudderlabs/rudder-go-kit/queue" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/samber/lo" ) // LimiterPriorityValue defines the priority values supported by Limiter. @@ -67,6 +69,9 @@ var WithLimiterTags = func(tags stats.Tags) func(*limiter) { // NewLimiter creates a new limiter func NewLimiter(ctx context.Context, wg *sync.WaitGroup, name string, limit int, statsf stats.Stats, opts ...func(*limiter)) Limiter { + if limit <= 0 { + panic(fmt.Errorf("limit for %q must be greater than 0", name)) + } l := &limiter{ name: name, limit: limit, @@ -141,10 +146,11 @@ func (l *limiter) Begin(key string) (end func()) { func (l *limiter) BeginWithPriority(key string, priority LimiterPriorityValue) (end func()) { start := time.Now() l.wait(priority) - l.stats.stat.NewTaggedStat(l.name+"_limiter_waiting", stats.TimerType, stats.Tags{"key": key}).Since(start) + tags := lo.Assign(l.tags, stats.Tags{"key": key}) + l.stats.stat.NewTaggedStat(l.name+"_limiter_waiting", stats.TimerType, tags).Since(start) start = time.Now() end = func() { - defer l.stats.stat.NewTaggedStat(l.name+"_limiter_working", stats.TimerType, stats.Tags{"key": key}).Since(start) + defer l.stats.stat.NewTaggedStat(l.name+"_limiter_working", stats.TimerType, tags).Since(start) l.mu.Lock() l.count-- if len(l.waitList) == 0 { diff --git a/sync/limiter_test.go b/sync/limiter_test.go index 38de5d81..93875b88 100644 --- a/sync/limiter_test.go +++ b/sync/limiter_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/stats/memstats" miscsync "github.com/rudderlabs/rudder-go-kit/sync" "github.com/stretchr/testify/require" @@ -149,4 +150,16 @@ func TestLimiter(t *testing.T) { require.EqualValues(t, 1000, counterHigh, "high priority counter should be 1000") require.EqualValues(t, 10, counterLow, "low priority counter should be 10") }) + + t.Run("invalid limit", func(t *testing.T) { + require.Panics(t, func() { + var wg sync.WaitGroup + _ = miscsync.NewLimiter(context.Background(), &wg, "zerolimit", 0, stats.Default) + }) + + require.Panics(t, func() { + var wg sync.WaitGroup + _ = miscsync.NewLimiter(context.Background(), &wg, "negativelimit", -1, stats.Default) + }) + }) }