Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: indefinite function runs inside mutex lock #372

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions stats/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,28 @@
goFactory.Go(func() {
if err != nil {
s.logger.Info("retrying StatsD client creation in the background...")
s.state.client.statsdMu.Lock()
s.state.client.statsd, err = s.getNewStatsdClientWithExpoBackoff(ctx, s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags())
s.state.client.statsdMu.Unlock()
var c *statsd.Client
c, err = s.getNewStatsdClientWithExpoBackoff(
ctx,
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
)

Check warning on line 59 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L53-L59

Added lines #L53 - L59 were not covered by tests
if err != nil {
s.config.enabled.Store(false)
s.logger.Errorf("error while creating new StatsD client, giving up: %v", err)
} else {
s.state.clientsLock.Lock()
s.state.client.statsd = c

Check warning on line 65 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L65

Added line #L65 was not covered by tests
for _, client := range s.state.pendingClients {
client.statsdMu.Lock()
client.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(client.tags...), statsd.SampleRate(client.samplingRate))
client.statsd = s.state.client.statsd.Clone(
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
statsd.Tags(client.tags...),
statsd.SampleRate(client.samplingRate),
)

Check warning on line 74 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L68-L74

Added lines #L68 - L74 were not covered by tests
client.statsdMu.Unlock()
}

Expand Down Expand Up @@ -104,7 +115,7 @@

func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) {
gaugeFunc := func(key string, val uint64) {
s.state.client.statsd.Gauge("runtime_"+key, val)
s.NewStat("runtime_"+key, GaugeType).Gauge(val)
}
s.state.rc = newRuntimeStatsCollector(gaugeFunc)
s.state.rc.PauseDur = time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second
Expand Down Expand Up @@ -152,7 +163,7 @@

// NewStat creates a new Measurement with provided Name and Type
func (s *statsdStats) NewStat(name, statType string) (m Measurement) {
return s.newStatsdMeasurement(name, statType, s.state.client)
return s.internalNewTaggedStat(name, statType, nil, 1)
}

func (s *statsdStats) NewTaggedStat(Name, StatType string, tags Tags) (m Measurement) {
Expand Down Expand Up @@ -200,7 +211,13 @@
taggedClient = &statsdClient{samplingRate: samplingRate, tags: tagVals}
if s.state.connEstablished {
taggedClient.statsdMu.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to lock here given that we just created the client and it's not a key on any of the maps, right?

taggedClient.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(tagVals...), statsd.SampleRate(samplingRate))
taggedClient.statsd = s.state.client.statsd.Clone(
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
statsd.Tags(tagVals...),
statsd.SampleRate(samplingRate),
)
taggedClient.statsdMu.Unlock()
} else {
// new statsd clients will be created when connection is established for all pending clients
Expand Down Expand Up @@ -297,9 +314,8 @@
}

// ready returns true if the statsd client is ready to be used (not nil).
//
// statsdMu.RLock should be held when calling this method.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment might be useful on skip().

func (sc *statsdClient) ready() bool {
sc.statsdMu.RLock()
defer sc.statsdMu.RUnlock()

return sc.statsd != nil
}
14 changes: 14 additions & 0 deletions stats/statsd_measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
}

func (c *statsdCounter) Count(n int) {
c.client.statsdMu.RLock()
defer c.client.statsdMu.RUnlock()
if c.skip() {
return
}
Expand All @@ -33,6 +35,8 @@

// Increment increases the stat by 1. Is the Equivalent of Count(1). Only applies to CountType stats
func (c *statsdCounter) Increment() {
c.client.statsdMu.RLock()
defer c.client.statsdMu.RUnlock()
if c.skip() {
return
}
Expand All @@ -46,6 +50,8 @@

// Gauge records an absolute value for this stat. Only applies to GaugeType stats
func (g *statsdGauge) Gauge(value interface{}) {
g.client.statsdMu.RLock()
defer g.client.statsdMu.RUnlock()
if g.skip() {
return
}
Expand All @@ -61,6 +67,8 @@
// Start starts a new timing for this stat. Only applies to TimerType stats
// Deprecated: Use concurrent safe SendTiming() instead
func (t *statsdTimer) Start() {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()

Check warning on line 71 in stats/statsd_measurement.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd_measurement.go#L70-L71

Added lines #L70 - L71 were not covered by tests
if t.skip() {
return
}
Expand All @@ -71,6 +79,8 @@
// End send the time elapsed since the Start() call of this stat. Only applies to TimerType stats
// Deprecated: Use concurrent safe SendTiming() instead
func (t *statsdTimer) End() {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()

Check warning on line 83 in stats/statsd_measurement.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd_measurement.go#L82-L83

Added lines #L82 - L83 were not covered by tests
if t.skip() || t.timing == nil {
return
}
Expand All @@ -84,6 +94,8 @@

// SendTiming sends a timing for this stat. Only applies to TimerType stats
func (t *statsdTimer) SendTiming(duration time.Duration) {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()
if t.skip() {
return
}
Expand All @@ -107,6 +119,8 @@

// Observe sends an observation
func (h *statsdHistogram) Observe(value float64) {
h.client.statsdMu.RLock()
defer h.client.statsdMu.RUnlock()
if h.skip() {
return
}
Expand Down
Loading