Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race when reporting internal cardinality metrics #257

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"runtime"
"sync"
"unsafe"

"go.uber.org/atomic"
)

var (
Expand Down Expand Up @@ -304,41 +302,50 @@ func (r *scopeRegistry) reportInternalMetrics() {
return
}

counters, gauges, histograms, scopes := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
rootCounters, rootGauges, rootHistograms := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
scopes.Inc() // Account for root scope.
var counters, gauges, histograms int64
var rootCounters, rootGauges, rootHistograms int64
scopes := 1 // Account for root scope.
r.ForEachScope(
func(ss *scope) {
ss.cm.RLock()
defer ss.cm.RUnlock()
counterSliceLen, gaugeSliceLen, histogramSliceLen := int64(len(ss.countersSlice)), int64(len(ss.gaugesSlice)), int64(len(ss.histogramsSlice))
counterSliceLen := int64(len(ss.countersSlice))
ss.cm.RUnlock()

ss.gm.RLock()
gaugeSliceLen := int64(len(ss.gaugesSlice))
ss.gm.RUnlock()

ss.hm.RLock()
histogramSliceLen := int64(len(ss.histogramsSlice))
ss.hm.RUnlock()

if ss.root { // Root scope is referenced across all buckets.
rootCounters.Store(counterSliceLen)
rootGauges.Store(gaugeSliceLen)
rootHistograms.Store(histogramSliceLen)
rootCounters = counterSliceLen
rootGauges = gaugeSliceLen
rootHistograms = histogramSliceLen
return
}
counters.Add(counterSliceLen)
gauges.Add(gaugeSliceLen)
histograms.Add(histogramSliceLen)
scopes.Inc()
counters += counterSliceLen
gauges += gaugeSliceLen
histograms += histogramSliceLen
scopes++
},
)

counters.Add(rootCounters.Load())
gauges.Add(rootGauges.Load())
histograms.Add(rootHistograms.Load())
counters += rootCounters
gauges += rootGauges
histograms += rootHistograms
if r.root.reporter != nil {
r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters.Load()))
r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges.Load()))
r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms.Load()))
r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes.Load()))
r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters))
r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges))
r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms))
r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes))
}

if r.root.cachedReporter != nil {
r.cachedCounterCardinalityGauge.ReportGauge(float64(counters.Load()))
r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges.Load()))
r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms.Load()))
r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes.Load()))
r.cachedCounterCardinalityGauge.ReportGauge(float64(counters))
r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges))
r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms))
r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes))
}
}
66 changes: 66 additions & 0 deletions scope_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ package tally

import (
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -257,3 +260,66 @@ func TestCachedReporterInternalMetricsAlloc(t *testing.T) {
)
}
}

func TestCachedReporterInternalMetricsConcurrent(t *testing.T) {
tr := newTestStatsReporter()
root, closer := NewRootScope(ScopeOptions{
CachedReporter: tr,
OmitCardinalityMetrics: false,
}, 0)
s := root.(*scope)

var wg sync.WaitGroup

done := make(chan struct{})
time.AfterFunc(time.Second, func() {
close(done)
})

wg.Add(1)
go func() {
defer wg.Done()
var i int
for {
select {
case <-done:
return
default:
}
suffix := strconv.Itoa(i)
tr.gg.Add(1)
tr.tg.Add(1)
tr.cg.Add(1)
s.Gauge("gauge-foo" + suffix).Update(42)
s.Timer("timer-foo" + suffix).Record(42)
s.Counter("counter-foo" + suffix).Inc(42)
i++
time.Sleep(time.Microsecond)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

for {
select {
case <-done:
return
case <-ticker.C:
// kick off report loop manually, so we can keep track of how many internal metrics
// we emitted.
tr.gg.Add(numInternalMetrics)
s.reportLoopRun()
}
}
}()
wg.Wait()

// Close should also trigger internal metric report.
tr.gg.Add(numInternalMetrics)
closer.Close()
}
38 changes: 38 additions & 0 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func newTestHistogramValue() *testHistogramValue {
}

type testStatsReporter struct {
mtx sync.Mutex

cg sync.WaitGroup
gg sync.WaitGroup
tg sync.WaitGroup
Expand All @@ -125,6 +127,9 @@ func newTestStatsReporter() *testStatsReporter {
}

func (r *testStatsReporter) getCounters() map[string]*testIntValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testIntValue, len(r.counters))
for k, v := range r.counters {
var (
Expand All @@ -142,6 +147,9 @@ func (r *testStatsReporter) getCounters() map[string]*testIntValue {
}

func (r *testStatsReporter) getGauges() map[string]*testFloatValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testFloatValue, len(r.gauges))
for k, v := range r.gauges {
var (
Expand All @@ -159,6 +167,9 @@ func (r *testStatsReporter) getGauges() map[string]*testFloatValue {
}

func (r *testStatsReporter) getTimers() map[string]*testIntValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testIntValue, len(r.timers))
for k, v := range r.timers {
var (
Expand All @@ -176,6 +187,9 @@ func (r *testStatsReporter) getTimers() map[string]*testIntValue {
}

func (r *testStatsReporter) getHistograms() map[string]*testHistogramValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testHistogramValue, len(r.histograms))
for k, v := range r.histograms {
var (
Expand All @@ -202,6 +216,9 @@ func (r *testStatsReporter) WaitAll() {
func (r *testStatsReporter) AllocateCounter(
name string, tags map[string]string,
) CachedCount {
r.mtx.Lock()
defer r.mtx.Unlock()

counter := &testIntValue{
val: 0,
tags: tags,
Expand All @@ -212,6 +229,9 @@ func (r *testStatsReporter) AllocateCounter(
}

func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, value int64) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.counters[name] = &testIntValue{
val: value,
tags: tags,
Expand All @@ -222,6 +242,9 @@ func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, v
func (r *testStatsReporter) AllocateGauge(
name string, tags map[string]string,
) CachedGauge {
r.mtx.Lock()
defer r.mtx.Unlock()

gauge := &testFloatValue{
val: 0,
tags: tags,
Expand All @@ -232,6 +255,9 @@ func (r *testStatsReporter) AllocateGauge(
}

func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, value float64) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.gauges[name] = &testFloatValue{
val: value,
tags: tags,
Expand All @@ -242,6 +268,9 @@ func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, val
func (r *testStatsReporter) AllocateTimer(
name string, tags map[string]string,
) CachedTimer {
r.mtx.Lock()
defer r.mtx.Unlock()

timer := &testIntValue{
val: 0,
tags: tags,
Expand All @@ -252,6 +281,9 @@ func (r *testStatsReporter) AllocateTimer(
}

func (r *testStatsReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.timers[name] = &testIntValue{
val: int64(interval),
tags: tags,
Expand Down Expand Up @@ -320,6 +352,9 @@ func (r *testStatsReporter) ReportHistogramValueSamples(
bucketUpperBound float64,
samples int64,
) {
r.mtx.Lock()
defer r.mtx.Unlock()

key := KeyForPrefixedStringMap(name, tags)
value, ok := r.histograms[key]
if !ok {
Expand All @@ -339,6 +374,9 @@ func (r *testStatsReporter) ReportHistogramDurationSamples(
bucketUpperBound time.Duration,
samples int64,
) {
r.mtx.Lock()
defer r.mtx.Unlock()

key := KeyForPrefixedStringMap(name, tags)
value, ok := r.histograms[key]
if !ok {
Expand Down
Loading