Use simpler locking in the Go 1.17 collector
A previous PR made the Go 1.17 collector lock only around uses of
rmSampleBuf, but that means Metric values sent over the channel may
contain some values from later metrics.Read calls made by concurrent
callers. While this generally isn't a problem, we lose any
consistency guarantees provided by the runtime/metrics package.

Also, the optimization of not simply locking around all of Collect was
premature. Collect is called relatively infrequently, and its
critical path is fairly fast (tens of µs). To demonstrate this, this
change also adds a benchmark.

Signed-off-by: Michael Anthony Knyszek <mknyszek@google.com>
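
The consistency argument above can be illustrated with a minimal sketch. This is not the collector's code: snapshotCollector, refresh, collect, and consistentUnderContention are hypothetical names, and a generation counter stands in for metrics.Read. The sketch assumes only that a shared buffer is overwritten on every call, as rmSampleBuf is, and shows that holding one mutex for the whole call keeps every value sent by that call in the same generation.

```go
package main

import (
	"fmt"
	"sync"
)

// snapshotCollector is a hypothetical stand-in for a collector that reuses
// one shared buffer across calls, as rmSampleBuf is reused in the commit.
type snapshotCollector struct {
	mu  sync.Mutex // held for the whole of collect
	gen int        // generation counter, bumped once per refresh
	buf []int      // shared buffer, overwritten on every refresh
}

// refresh overwrites every slot of buf with a new generation number.
// Callers must hold mu.
func (c *snapshotCollector) refresh() {
	c.gen++
	for i := range c.buf {
		c.buf[i] = c.gen
	}
}

// collect refreshes the buffer and sends every value on ch while holding mu,
// so all values sent by one call come from the same refresh.
func (c *snapshotCollector) collect(ch chan<- int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.refresh()
	for _, v := range c.buf {
		ch <- v
	}
}

// consistentUnderContention runs several concurrent collect calls and reports
// whether each call emitted values from a single generation. size must be > 0.
func consistentUnderContention(workers, size int) bool {
	c := &snapshotCollector{buf: make([]int, size)}
	results := make([]bool, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			ch := make(chan int, size) // buffered so collect never blocks
			c.collect(ch)
			close(ch)

			first, ok := <-ch
			same := ok
			for v := range ch {
				if v != first {
					same = false
				}
			}
			results[w] = same
		}(w)
	}
	wg.Wait()

	for _, ok := range results {
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(consistentUnderContention(8, 16))
}
```

If the unlock were moved to just after refresh, a concurrent caller could bump the generation while the first caller is still sending, which is the kind of mixing the message describes.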
mknyszek committed Jan 24, 2022
1 parent f63e219 commit 7a60951
Showing 3 changed files with 35 additions and 17 deletions.
33 changes: 17 additions & 16 deletions prometheus/go_collector_go117.go
@@ -31,8 +31,11 @@ import (
 type goCollector struct {
 	base baseGoCollector

+	// mu protects updates to all fields ensuring a consistent
+	// snapshot is always produced by Collect.
+	mu sync.Mutex
+
 	// rm... fields all pertain to the runtime/metrics package.
-	rmSampleMu sync.Mutex
 	rmSampleBuf []metrics.Sample
 	rmSampleMap map[string]*metrics.Sample
 	rmMetrics []collectorMetric
@@ -135,10 +138,16 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 	// rmSampleBuf. Just read into rmSampleBuf but write all the data
 	// we get into our Metrics or MemStats.
 	//
-	// Note that we cannot simply read and then clone rmSampleBuf
-	// because we'd need to perform a deep clone of it, which is likely
-	// not worth it.
-	c.rmSampleMu.Lock()
+	// This lock also ensures that the Metrics we send out are all from
+	// the same updates, ensuring their mutual consistency insofar as
+	// is guaranteed by the runtime/metrics package.
+	//
+	// N.B. This locking is heavy-handed, but Collect is expected to be called
+	// relatively infrequently. Also the core operation here, metrics.Read,
+	// is fast (O(tens of microseconds)) so contention should certainly be
+	// low, though channel operations and any allocations may add to that.
+	c.mu.Lock()
+	defer c.mu.Unlock()

 	// Populate runtime/metrics sample buffer.
 	metrics.Read(c.rmSampleBuf)
@@ -157,10 +166,13 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 			if v1 > v0 {
 				m.Add(unwrapScalarRMValue(sample.Value) - m.get())
 			}
+			m.Collect(ch)
 		case *gauge:
 			m.Set(unwrapScalarRMValue(sample.Value))
+			m.Collect(ch)
 		case *batchHistogram:
 			m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
+			m.Collect(ch)
 		default:
 			panic("unexpected metric type")
 		}
@@ -169,17 +181,6 @@ func (c *goCollector) Collect(ch chan<- Metric) {
 	// populate the old metrics from it.
 	var ms runtime.MemStats
 	memStatsFromRM(&ms, c.rmSampleMap)
-
-	c.rmSampleMu.Unlock()
-
-	// Export all the metrics to ch.
-	// At this point we must not access rmSampleBuf or rmSampleMap, because
-	// a concurrent caller could use it. It's safe to Collect all our Metrics,
-	// however, because they're updated in a thread-safe way while MemStats
-	// is local to this call of Collect.
-	for _, m := range c.rmMetrics {
-		m.Collect(ch)
-	}
 	for _, i := range c.msMetrics {
 		ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
 	}
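
For readers unfamiliar with runtime/metrics, the read-then-export pattern this file relies on can be sketched roughly as follows. The sampler type and its newSampler and snapshot functions are hypothetical and much simpler than goCollector: histograms are skipped and values are copied into a map instead of being sent as Metric values. The runtime/metrics calls used (metrics.All, metrics.Read, Value.Kind, Value.Uint64, Value.Float64) are part of the standard library.

```go
package main

import (
	"fmt"
	"runtime/metrics"
	"sync"
)

// sampler is a hypothetical, stripped-down analogue of goCollector: one
// reusable sample buffer guarded by one mutex.
type sampler struct {
	mu  sync.Mutex
	buf []metrics.Sample
}

// newSampler prepares a sample buffer with one entry per supported metric.
func newSampler() *sampler {
	descs := metrics.All()
	buf := make([]metrics.Sample, len(descs))
	for i, d := range descs {
		buf[i].Name = d.Name
	}
	return &sampler{buf: buf}
}

// snapshot reads all metrics and copies the scalar ones out while holding
// the lock, so every returned value comes from the same metrics.Read call.
func (s *sampler) snapshot() map[string]float64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Populate the shared buffer; this overwrites values from earlier calls.
	metrics.Read(s.buf)

	out := make(map[string]float64, len(s.buf))
	for _, sample := range s.buf {
		switch sample.Value.Kind() {
		case metrics.KindUint64:
			out[sample.Name] = float64(sample.Value.Uint64())
		case metrics.KindFloat64:
			out[sample.Name] = sample.Value.Float64()
		default:
			// Histograms and unsupported kinds are skipped in this sketch.
		}
	}
	return out
}

func main() {
	snap := newSampler().snapshot()
	fmt.Println(len(snap), "scalar metrics read")
	fmt.Println("/memory/classes/total:bytes =", snap["/memory/classes/total:bytes"])
}
```

Because the buffer is reused, any work that touches it after the unlock could observe a later caller's Read; copying out (or, as in the commit, sending) before unlocking avoids that.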
2 changes: 1 addition & 1 deletion prometheus/go_collector_go117_test.go
@@ -292,7 +292,7 @@ func TestGoCollectorConcurrency(t *testing.T) {
 		go func() {
 			ch := make(chan Metric)
 			go func() {
-				// Drain all metrics recieved until the
+				// Drain all metrics received until the
 				// channel is closed.
 				for range ch {
 				}
17 changes: 17 additions & 0 deletions prometheus/go_collector_test.go
@@ -154,3 +154,20 @@ func TestGoCollectorGC(t *testing.T) {
 		break
 	}
 }
+
+func BenchmarkGoCollector(b *testing.B) {
+	c := NewGoCollector().(*goCollector)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		ch := make(chan Metric, 8)
+		go func() {
+			// Drain all metrics received until the
+			// channel is closed.
+			for range ch {
+			}
+		}()
+		c.Collect(ch)
+		close(ch)
+	}
+}
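
The shape of this benchmark can be tried outside the repository with a self-contained sketch. Everything here is an assumption for illustration: lockedCollector is a hypothetical stand-in that sends 64 zero values under a mutex, not the real goCollector, and the done channel that waits for the drain goroutine is an addition of this sketch that the committed benchmark does not have. testing.Benchmark is the standard-library entry point for running a benchmark function from ordinary code.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// lockedCollector is a hypothetical stand-in for a collector whose Collect
// holds one mutex for the whole call.
type lockedCollector struct {
	mu   sync.Mutex
	vals []float64
}

// Collect sends every value on ch while holding the lock.
func (c *lockedCollector) Collect(ch chan<- float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, v := range c.vals {
		ch <- v
	}
}

// benchmarkCollect mirrors the structure of BenchmarkGoCollector: a fresh
// channel and a drain goroutine per iteration, with setup excluded from the
// measured time by ResetTimer.
func benchmarkCollect(b *testing.B) {
	c := &lockedCollector{vals: make([]float64, 64)}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch := make(chan float64, 8)
		done := make(chan struct{})
		go func() {
			defer close(done)
			// Drain all values received until the channel is closed.
			for range ch {
			}
		}()
		c.Collect(ch)
		close(ch)
		<-done // sketch-only: wait so drain goroutines do not pile up
	}
}

func main() {
	res := testing.Benchmark(benchmarkCollect)
	fmt.Println(res)
}
```

In the repository itself the committed benchmark would normally be run with the go tool, for example `go test -run '^$' -bench BenchmarkGoCollector ./prometheus/`.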
