Skip to content

Commit

Permalink
chore: race support (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Feb 21, 2024
1 parent 793799f commit f8cb291
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 39 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ test: install-tools test-run test-teardown
test-run: ## Run all unit tests
ifeq ($(filter 1,$(debug) $(RUNNER_DEBUG)),)
$(eval TEST_CMD = SLOW=0 gotestsum --format pkgname-and-test-fails --)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
$(eval TEST_OPTIONS = -race -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=15m)
else
$(eval TEST_CMD = SLOW=0 go test)
$(eval TEST_OPTIONS = -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=count -coverpkg=./... -vet=all --timeout=15m)
$(eval TEST_OPTIONS = -race -p=1 -v -failfast -shuffle=on -coverprofile=profile.out -covermode=atomic -coverpkg=./... -vet=all --timeout=15m)
endif
ifdef package
ifdef exclude
Expand Down
29 changes: 20 additions & 9 deletions stats/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,18 @@ func (s *statsdStats) Start(ctx context.Context, goFactory GoRoutineFactory) err
goFactory.Go(func() {
if err != nil {
s.logger.Info("retrying StatsD client creation in the background...")
s.state.client.statsdMu.Lock()
s.state.client.statsd, err = s.getNewStatsdClientWithExpoBackoff(ctx, s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags())
s.state.client.statsdMu.Unlock()
if err != nil {
s.config.enabled.Store(false)
s.logger.Errorf("error while creating new StatsD client, giving up: %v", err)
} else {
s.state.clientsLock.Lock()
for _, client := range s.state.pendingClients {
client.statsdMu.Lock()
client.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(client.tags...), statsd.SampleRate(client.samplingRate))
client.statsdMu.Unlock()
}

s.logger.Info("StatsD client setup succeeded.")
Expand Down Expand Up @@ -195,7 +199,9 @@ func (s *statsdStats) internalNewTaggedStat(name, statType string, tags Tags, sa
tagVals := newTags.Strings()
taggedClient = &statsdClient{samplingRate: samplingRate, tags: tagVals}
if s.state.connEstablished {
taggedClient.statsdMu.Lock()
taggedClient.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(tagVals...), statsd.SampleRate(samplingRate))
taggedClient.statsdMu.Unlock()
} else {
// new statsd clients will be created when connection is established for all pending clients
s.state.pendingClients[taggedClientKey] = taggedClient
Expand Down Expand Up @@ -268,15 +274,15 @@ func (c *statsdConfig) statsdTagsFormat() statsd.Option {
}

type statsdState struct {
conn statsd.Option
client *statsdClient
connEstablished bool
rc runtimeStatsCollector
mc metricStatsCollector
conn statsd.Option
client *statsdClient
rc runtimeStatsCollector
mc metricStatsCollector

clientsLock sync.RWMutex
clients map[string]*statsdClient
pendingClients map[string]*statsdClient
clientsLock sync.RWMutex // protects the following
connEstablished bool
clients map[string]*statsdClient
pendingClients map[string]*statsdClient
}

// statsdClient is a wrapper around statsd.Client.
Expand All @@ -285,10 +291,15 @@ type statsdState struct {
type statsdClient struct {
samplingRate float32
tags []string
statsd *statsd.Client

statsdMu sync.RWMutex // protects the following
statsd *statsd.Client
}

// ready returns true if the statsd client is ready to be used (not nil).
func (sc *statsdClient) ready() bool {
sc.statsdMu.RLock()
defer sc.statsdMu.RUnlock()

return sc.statsd != nil
}
46 changes: 27 additions & 19 deletions stats/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -103,8 +105,8 @@ func TestStatsdMeasurementInvalidOperations(t *testing.T) {
}

func TestStatsdMeasurementOperations(t *testing.T) {
var lastReceived string
server := newStatsdServer(t, func(s string) { lastReceived = s })
var lastReceived atomic.Value
server := newStatsdServer(t, func(s string) { lastReceived.Store(s) })
defer server.Close()

c := config.New()
Expand All @@ -128,39 +130,39 @@ func TestStatsdMeasurementOperations(t *testing.T) {
s.NewStat("test-counter", stats.CountType).Increment()

require.Eventually(t, func() bool {
return lastReceived == "test-counter,instanceName=test:1|c"
return lastReceived.Load() == "test-counter,instanceName=test:1|c"
}, 2*time.Second, time.Millisecond)
})

t.Run("counter count", func(t *testing.T) {
s.NewStat("test-counter", stats.CountType).Count(10)

require.Eventually(t, func() bool {
return lastReceived == "test-counter,instanceName=test:10|c"
return lastReceived.Load() == "test-counter,instanceName=test:10|c"
}, 2*time.Second, time.Millisecond)
})

t.Run("gauge", func(t *testing.T) {
s.NewStat("test-gauge", stats.GaugeType).Gauge(1234)

require.Eventually(t, func() bool {
return lastReceived == "test-gauge,instanceName=test:1234|g"
return lastReceived.Load() == "test-gauge,instanceName=test:1234|g"
}, 2*time.Second, time.Millisecond)
})

t.Run("timer send timing", func(t *testing.T) {
s.NewStat("test-timer-1", stats.TimerType).SendTiming(10 * time.Second)

require.Eventually(t, func() bool {
return lastReceived == "test-timer-1,instanceName=test:10000|ms"
return lastReceived.Load() == "test-timer-1,instanceName=test:10000|ms"
}, 2*time.Second, time.Millisecond)
})

t.Run("timer since", func(t *testing.T) {
s.NewStat("test-timer-2", stats.TimerType).Since(time.Now())

require.Eventually(t, func() bool {
return lastReceived == "test-timer-2,instanceName=test:0|ms"
return lastReceived.Load() == "test-timer-2,instanceName=test:0|ms"
}, 2*time.Second, time.Millisecond)
})

Expand All @@ -170,44 +172,44 @@ func TestStatsdMeasurementOperations(t *testing.T) {
}()

require.Eventually(t, func() bool {
return lastReceived == "test-timer-4,instanceName=test:0|ms"
return lastReceived.Load() == "test-timer-4,instanceName=test:0|ms"
}, 2*time.Second, time.Millisecond)
})

t.Run("histogram", func(t *testing.T) {
s.NewStat("test-hist-1", stats.HistogramType).Observe(1.2)
require.Eventually(t, func() bool {
return lastReceived == "test-hist-1,instanceName=test:1.2|h"
return lastReceived.Load() == "test-hist-1,instanceName=test:1.2|h"
}, 2*time.Second, time.Millisecond)
})

t.Run("tagged stats", func(t *testing.T) {
s.NewTaggedStat("test-tagged", stats.CountType, stats.Tags{"key": "value"}).Increment()
require.Eventually(t, func() bool {
return lastReceived == "test-tagged,instanceName=test,key=value:1|c"
return lastReceived.Load() == "test-tagged,instanceName=test,key=value:1|c"
}, 2*time.Second, time.Millisecond)

// same measurement name, different measurement type
s.NewTaggedStat("test-tagged", stats.GaugeType, stats.Tags{"key": "value"}).Gauge(22)
require.Eventually(t, func() bool {
return lastReceived == "test-tagged,instanceName=test,key=value:22|g"
return lastReceived.Load() == "test-tagged,instanceName=test,key=value:22|g"
}, 2*time.Second, time.Millisecond)
})

t.Run("sampled stats", func(t *testing.T) {
lastReceived = ""
lastReceived.Store("")
// use the same, non-sampled counter first to make sure we don't get it from cache when we request the sampled one
counter := s.NewTaggedStat("test-tagged-sampled", stats.CountType, stats.Tags{"key": "value"})
counter.Increment()

require.Eventually(t, func() bool {
return lastReceived == "test-tagged-sampled,instanceName=test,key=value:1|c"
return lastReceived.Load() == "test-tagged-sampled,instanceName=test,key=value:1|c"
}, 2*time.Second, time.Millisecond)

counterSampled := s.NewSampledTaggedStat("test-tagged-sampled", stats.CountType, stats.Tags{"key": "value"})
counterSampled.Increment()
require.Eventually(t, func() bool {
if lastReceived == "test-tagged-sampled,instanceName=test,key=value:1|c|@0.5" {
if lastReceived.Load() == "test-tagged-sampled,instanceName=test,key=value:1|c|@0.5" {
return true
}
// playing with probabilities, we might or might not get the sample (0.5 -> 50% chance)
Expand All @@ -220,27 +222,30 @@ func TestStatsdMeasurementOperations(t *testing.T) {
s.NewStat("", stats.CountType).Increment()

require.Eventually(t, func() bool {
return lastReceived == "novalue,instanceName=test:1|c"
return lastReceived.Load() == "novalue,instanceName=test:1|c"
}, 2*time.Second, time.Millisecond)
})

t.Run("measurement with empty name and empty tag key", func(t *testing.T) {
s.NewTaggedStat(" ", stats.GaugeType, stats.Tags{"key": "value", "": "value2"}).Gauge(22)

require.Eventually(t, func() bool {
return lastReceived == "novalue,instanceName=test,key=value:22|g"
return lastReceived.Load() == "novalue,instanceName=test,key=value:22|g"
}, 2*time.Second, time.Millisecond)
})
}

func TestStatsdPeriodicStats(t *testing.T) {
runTest := func(t *testing.T, prepareFunc func(c *config.Config, m metric.Manager), expected []string) {
var received []string
var receivedMu sync.RWMutex
server := newStatsdServer(t, func(s string) {
if i := strings.Index(s, ":"); i > 0 {
s = s[:i]
}
receivedMu.Lock()
received = append(received, s)
receivedMu.Unlock()
})
defer server.Close()

Expand All @@ -264,6 +269,9 @@ func TestStatsdPeriodicStats(t *testing.T) {
defer s.Stop()

require.Eventually(t, func() bool {
receivedMu.RLock()
defer receivedMu.RUnlock()

if len(received) != len(expected) {
return false
}
Expand Down Expand Up @@ -358,8 +366,8 @@ func TestStatsdPeriodicStats(t *testing.T) {
}

func TestStatsdExcludedTags(t *testing.T) {
var lastReceived string
server := newStatsdServer(t, func(s string) { lastReceived = s })
var lastReceived atomic.Value
server := newStatsdServer(t, func(s string) { lastReceived.Store(s) })
defer server.Close()

c := config.New()
Expand All @@ -382,7 +390,7 @@ func TestStatsdExcludedTags(t *testing.T) {
c.Set("statsExcludedTags", []string{"workspaceId"})
s.NewTaggedStat("test-workspaceId", stats.CountType, stats.Tags{"workspaceId": "value"}).Increment()
require.Eventually(t, func() bool {
return lastReceived == "test-workspaceId,instanceName=test:1|c"
return lastReceived.Load() == "test-workspaceId,instanceName=test:1|c"
}, 2*time.Second, time.Millisecond)
}

Expand Down
9 changes: 5 additions & 4 deletions sync/plocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync_test

import (
gsync "sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -45,16 +46,16 @@ func TestPartitionLocker(t *testing.T) {
}
var s l
s.locker = *sync.NewPartitionLocker()
var locks int
var locks atomic.Int64
const id = "id"
s.locker.Lock(id)
go func() {
s.locker.Lock(id)
locks = locks + 1
locks.Store(locks.Add(1))
s.locker.Unlock(id)
}()
require.Never(t, func() bool { return locks == 1 }, 100*time.Millisecond, 1*time.Millisecond)
require.Never(t, func() bool { return locks.Load() == 1 }, 100*time.Millisecond, 1*time.Millisecond)
s.locker.Unlock(id)
require.Eventually(t, func() bool { return locks == 1 }, 100*time.Millisecond, 1*time.Millisecond)
require.Eventually(t, func() bool { return locks.Load() == 1 }, 100*time.Millisecond, 1*time.Millisecond)
})
}
12 changes: 7 additions & 5 deletions throttling/memory_gcra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ func TestMemoryGCRA(t *testing.T) {
rate := int64(1)
period := int64(1)

limit, err := l.limit(context.Background(), "key", burst+rate, burst, rate, period)
allowed, err := l.limit(context.Background(), "key", burst+rate, burst, rate, period)
require.NoError(t, err)
require.True(t, limit, "it should be able to fill the bucket (burst)")
require.True(t, allowed, "it should be able to fill the bucket (burst)")

// next request should be allowed after 5 seconds
start := time.Now()
var allowed bool

require.Eventually(t, func() bool {
allowed, err = l.limit(context.Background(), "key", burst, burst, rate, period)
require.NoError(t, err)
allowed, err := l.limit(context.Background(), "key", burst, burst, rate, period)
if err != nil {
t.Logf("Memory GCRA error: %v", err)
return false
}
return allowed
}, 10*time.Second, 1*time.Second, "next request should be eventually allowed")

Expand Down

0 comments on commit f8cb291

Please sign in to comment.