Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: indefinite function runs inside mutex lock #372

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions stats/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,28 @@
goFactory.Go(func() {
if err != nil {
s.logger.Info("retrying StatsD client creation in the background...")
s.state.client.statsdMu.Lock()
s.state.client.statsd, err = s.getNewStatsdClientWithExpoBackoff(ctx, s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags())
s.state.client.statsdMu.Unlock()
var c *statsd.Client
c, err = s.getNewStatsdClientWithExpoBackoff(
ctx,
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
)

Check warning on line 59 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L53-L59

Added lines #L53 - L59 were not covered by tests
if err != nil {
s.config.enabled.Store(false)
s.logger.Errorf("error while creating new StatsD client, giving up: %v", err)
} else {
s.state.clientsLock.Lock()
s.state.client.statsd = c

Check warning on line 65 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L65

Added line #L65 was not covered by tests
for _, client := range s.state.pendingClients {
client.statsdMu.Lock()
client.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(client.tags...), statsd.SampleRate(client.samplingRate))
client.statsd = s.state.client.statsd.Clone(
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
statsd.Tags(client.tags...),
statsd.SampleRate(client.samplingRate),
)

Check warning on line 74 in stats/statsd.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd.go#L68-L74

Added lines #L68 - L74 were not covered by tests
client.statsdMu.Unlock()
}

Expand Down Expand Up @@ -104,7 +115,7 @@

func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) {
gaugeFunc := func(key string, val uint64) {
s.state.client.statsd.Gauge("runtime_"+key, val)
s.NewStat("runtime_"+key, GaugeType).Gauge(val)
}
s.state.rc = newRuntimeStatsCollector(gaugeFunc)
s.state.rc.PauseDur = time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second
Expand Down Expand Up @@ -152,7 +163,7 @@

// NewStat creates a new Measurement with provided Name and Type
func (s *statsdStats) NewStat(name, statType string) (m Measurement) {
return s.newStatsdMeasurement(name, statType, s.state.client)
return s.internalNewTaggedStat(name, statType, nil, 1)
}

func (s *statsdStats) NewTaggedStat(Name, StatType string, tags Tags) (m Measurement) {
Expand Down Expand Up @@ -199,9 +210,13 @@
tagVals := newTags.Strings()
taggedClient = &statsdClient{samplingRate: samplingRate, tags: tagVals}
if s.state.connEstablished {
taggedClient.statsdMu.Lock()
taggedClient.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(tagVals...), statsd.SampleRate(samplingRate))
taggedClient.statsdMu.Unlock()
taggedClient.statsd = s.state.client.statsd.Clone(
s.state.conn,
s.statsdConfig.statsdTagsFormat(),
s.statsdConfig.statsdDefaultTags(),
statsd.Tags(tagVals...),
statsd.SampleRate(samplingRate),
)
} else {
// new statsd clients will be created when connection is established for all pending clients
s.state.pendingClients[taggedClientKey] = taggedClient
Expand Down Expand Up @@ -297,9 +312,8 @@
}

// ready returns true if the statsd client is ready to be used (not nil).
//
// sc.statsdMu.RLock should be held when calling this method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand: why are we removing the lock from here and asking the caller to hold the lock instead? Isn't that the same?

Copy link
Contributor

@mihir20 mihir20 Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Besides calling this function through the skip() call, the callers also record the stat there, so they would need to acquire the lock again before recording it. Acquiring the lock multiple times per call can slow the application down, so we removed the lock from here and have the caller hold it once instead.

func (sc *statsdClient) ready() bool {
sc.statsdMu.RLock()
defer sc.statsdMu.RUnlock()

return sc.statsd != nil
}
16 changes: 16 additions & 0 deletions stats/statsd_measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
}

// skip reports whether this measurement should be dropped instead of recorded.
// It returns true when m.enabled is false or when m.client.ready() returns
// false (stats disabled or client not ready).
//
// m.client.statsdMu.RLock should be held when calling this method.
func (m *statsdMeasurement) skip() bool {
return !m.enabled || !m.client.ready()
}
Expand All @@ -25,6 +27,8 @@
}

func (c *statsdCounter) Count(n int) {
c.client.statsdMu.RLock()
defer c.client.statsdMu.RUnlock()
if c.skip() {
return
}
Expand All @@ -33,6 +37,8 @@

// Increment increases the stat by 1. Is the Equivalent of Count(1). Only applies to CountType stats
func (c *statsdCounter) Increment() {
c.client.statsdMu.RLock()
defer c.client.statsdMu.RUnlock()
if c.skip() {
return
}
Expand All @@ -46,6 +52,8 @@

// Gauge records an absolute value for this stat. Only applies to GaugeType stats
func (g *statsdGauge) Gauge(value interface{}) {
g.client.statsdMu.RLock()
defer g.client.statsdMu.RUnlock()
if g.skip() {
return
}
Expand All @@ -61,6 +69,8 @@
// Start starts a new timing for this stat. Only applies to TimerType stats
// Deprecated: Use concurrent safe SendTiming() instead
func (t *statsdTimer) Start() {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()

Check warning on line 73 in stats/statsd_measurement.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd_measurement.go#L72-L73

Added lines #L72 - L73 were not covered by tests
if t.skip() {
return
}
Expand All @@ -71,6 +81,8 @@
// End send the time elapsed since the Start() call of this stat. Only applies to TimerType stats
// Deprecated: Use concurrent safe SendTiming() instead
func (t *statsdTimer) End() {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()

Check warning on line 85 in stats/statsd_measurement.go

View check run for this annotation

Codecov / codecov/patch

stats/statsd_measurement.go#L84-L85

Added lines #L84 - L85 were not covered by tests
if t.skip() || t.timing == nil {
return
}
Expand All @@ -84,6 +96,8 @@

// SendTiming sends a timing for this stat. Only applies to TimerType stats
func (t *statsdTimer) SendTiming(duration time.Duration) {
t.client.statsdMu.RLock()
defer t.client.statsdMu.RUnlock()
if t.skip() {
return
}
Expand All @@ -107,6 +121,8 @@

// Observe sends an observation
func (h *statsdHistogram) Observe(value float64) {
h.client.statsdMu.RLock()
defer h.client.statsdMu.RUnlock()
if h.skip() {
return
}
Expand Down
Loading