Return previous memstats if Go collector needs longer than 1s #568

Merged · 3 commits · May 6, 2019
89 changes: 77 additions & 12 deletions prometheus/go_collector.go
@@ -14,9 +14,9 @@
package prometheus

import (
"fmt"
"runtime"
"runtime/debug"
"sync"
"time"
)

@@ -26,16 +26,41 @@ type goCollector struct {
gcDesc *Desc
goInfoDesc *Desc

// metrics to describe and collect
metrics memStatsMetrics
// ms... are memstats related.
msLast *runtime.MemStats // Previously collected memstats.
msLastTimestamp time.Time
msMtx sync.Mutex // Protects msLast and msLastTimestamp.
msMetrics memStatsMetrics
msRead func(*runtime.MemStats) // For mocking in tests.
msMaxWait time.Duration // Wait time for fresh memstats.
msMaxAge time.Duration // Maximum allowed age of old memstats.
}

// NewGoCollector returns a collector which exports metrics about the current Go
// process. This includes memory stats. To collect those, runtime.ReadMemStats
// is called. This causes a stop-the-world, which is very short with Go1.9+
// (~25µs). However, with older Go versions, the stop-the-world duration depends
// on the heap size and can be quite significant (~1.7 ms/GiB as per
// is called. This requires “stopping the world”, which usually only happens for
// garbage collection (GC). Take the following implications into account when
// deciding whether to use the Go collector:
//
// 1. The performance impact of stopping the world becomes more relevant the
// more frequently metrics are collected. With Go1.9 or later, the
// stop-the-world time per metrics collection is very short (~25µs), so the
// performance impact only matters in rare cases. With older Go versions,
// however, the stop-the-world duration depends on the heap size and can be
// quite significant (~1.7 ms/GiB as per
// https://go-review.googlesource.com/c/go/+/34937).
//
// 2. During an ongoing GC, nothing else can stop the world. Therefore, if the
// metrics collection happens to coincide with GC, it will only complete after
// GC has finished. Usually, GC is fast enough to not cause problems. However,
// with a very large heap, GC might take multiple seconds, which is enough to
// cause scrape timeouts in common setups. To avoid this problem, the Go
// collector will use the memstats from a previous collection if
// runtime.ReadMemStats takes more than 1s. However, if there are no previously
// collected memstats, or if they were collected more than 5m ago, the
// collection will block until runtime.ReadMemStats succeeds. (The problem might be solved
// in Go1.13, see https://github.com/golang/go/issues/19812 for the related Go
// issue.)
func NewGoCollector() Collector {
return &goCollector{
goroutinesDesc: NewDesc(
@@ -54,7 +79,11 @@ func NewGoCollector() Collector {
"go_info",
"Information about the Go environment.",
nil, Labels{"version": runtime.Version()}),
metrics: memStatsMetrics{
msLast: &runtime.MemStats{},
msRead: runtime.ReadMemStats,
msMaxWait: time.Second,
msMaxAge: 5 * time.Minute,
msMetrics: memStatsMetrics{
{
desc: NewDesc(
memstatNamespace("alloc_bytes"),
@@ -253,7 +282,7 @@
}

func memstatNamespace(s string) string {
return fmt.Sprintf("go_memstats_%s", s)
return "go_memstats_" + s
}

// Describe returns all descriptions of the collector.
@@ -262,13 +291,27 @@ func (c *goCollector) Describe(ch chan<- *Desc) {
ch <- c.threadsDesc
ch <- c.gcDesc
ch <- c.goInfoDesc
for _, i := range c.metrics {
for _, i := range c.msMetrics {
ch <- i.desc
}
}

// Collect returns the current state of all metrics of the collector.
func (c *goCollector) Collect(ch chan<- Metric) {
var (
ms = &runtime.MemStats{}
done = make(chan struct{})
)
// Start reading memstats first as it might take a while.
go func() {
c.msRead(ms)
c.msMtx.Lock()
c.msLast = ms
c.msLastTimestamp = time.Now()
c.msMtx.Unlock()
close(done)
}()

ch <- MustNewConstMetric(c.goroutinesDesc, GaugeValue, float64(runtime.NumGoroutine()))
n, _ := runtime.ThreadCreateProfile(nil)
ch <- MustNewConstMetric(c.threadsDesc, GaugeValue, float64(n))
@@ -286,9 +329,31 @@ func (c *goCollector) Collect(ch chan<- Metric) {

ch <- MustNewConstMetric(c.goInfoDesc, GaugeValue, 1)

ms := &runtime.MemStats{}
runtime.ReadMemStats(ms)
for _, i := range c.metrics {
timer := time.NewTimer(c.msMaxWait)
select {
case <-done: // Our own ReadMemStats succeeded in time. Use it.
timer.Stop() // Important for high collection frequencies to not pile up timers.
c.msCollect(ch, ms)
return
case <-timer.C: // Timed out; use the last memstats if possible. Continue below.
}
c.msMtx.Lock()
if time.Since(c.msLastTimestamp) < c.msMaxAge {
// Last memstats are recent enough. Collect from them under the lock.
c.msCollect(ch, c.msLast)
c.msMtx.Unlock()
return
}
// If we are here, the last memstats are too old or don't exist. We have
// to wait until our own ReadMemStats finally completes. For that to
// happen, we have to release the lock.
c.msMtx.Unlock()
<-done
c.msCollect(ch, ms)
}

func (c *goCollector) msCollect(ch chan<- Metric, ms *runtime.MemStats) {
for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(ms))
}
}
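
In essence, Collect implements a bounded wait with a bounded-staleness cache. The following condensed sketch (my own illustrative names and signature, not part of this PR; it uses time.After where the real code reuses a stopped timer) shows the same decision flow:

```go
package main

import (
	"runtime"
	"time"
)

// readWithFallback condenses the strategy from the NewGoCollector doc
// comment: wait up to maxWait for a fresh read, fall back to cached stats
// no older than maxAge, otherwise block until the read completes.
func readWithFallback(read func(*runtime.MemStats), last *runtime.MemStats,
	lastTS time.Time, maxWait, maxAge time.Duration) *runtime.MemStats {

	fresh := &runtime.MemStats{}
	done := make(chan struct{})
	go func() {
		read(fresh) // May block for a long time during an ongoing GC.
		close(done)
	}()

	select {
	case <-done: // Fresh memstats arrived within maxWait: use them.
		return fresh
	case <-time.After(maxWait): // Timed out: consider the cached stats.
	}
	if time.Since(lastTS) < maxAge {
		return last // The cached memstats are recent enough.
	}
	<-done // No usable cache: block until the read finally completes.
	return fresh
}

func main() {
	// Zero lastTS means the cache is always too old, so this blocks on the read.
	ms := readWithFallback(runtime.ReadMemStats, &runtime.MemStats{},
		time.Time{}, time.Second, 5*time.Minute)
	_ = ms.Alloc
}
```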
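For context (not part of this diff), this is roughly how the Go collector is wired into an application, so that each scrape of /metrics runs the Collect path above:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Register the Go collector with a fresh registry.
	reg := prometheus.NewRegistry()
	reg.MustRegister(prometheus.NewGoCollector())

	// Each scrape calls Collect and therefore runtime.ReadMemStats
	// (or falls back to cached memstats, per this PR).
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```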
171 changes: 136 additions & 35 deletions prometheus/go_collector_test.go
@@ -21,28 +21,40 @@ import (
dto "github.com/prometheus/client_model/go"
)

func TestGoCollector(t *testing.T) {
func TestGoCollectorGoroutines(t *testing.T) {
var (
c = NewGoCollector()
ch = make(chan Metric)
waitc = make(chan struct{})
closec = make(chan struct{})
old = -1
c = NewGoCollector()
metricCh = make(chan Metric)
waitCh = make(chan struct{})
endGoroutineCh = make(chan struct{})
endCollectionCh = make(chan struct{})
old = -1
)
defer close(closec)
defer func() {
close(endGoroutineCh)
// Drain the collect channel to prevent a goroutine leak.
for {
select {
case <-metricCh:
case <-endCollectionCh:
return
}
}
}()

go func() {
c.Collect(ch)
c.Collect(metricCh)
go func(c <-chan struct{}) {
<-c
}(closec)
<-waitc
c.Collect(ch)
}(endGoroutineCh)
<-waitCh
c.Collect(metricCh)
close(endCollectionCh)
}()

for {
select {
case m := <-ch:
case m := <-metricCh:
// m can be a Gauge or a Counter; currently we only
// test the go_goroutines Gauge
// and ignore the others.
@@ -57,51 +69,55 @@

if old == -1 {
old = int(pb.GetGauge().GetValue())
close(waitc)
close(waitCh)
continue
}

if diff := int(pb.GetGauge().GetValue()) - old; diff != 1 {
// TODO: This is flaky in highly concurrent situations.
t.Errorf("want 1 new goroutine, got %d", diff)
}

// GoCollector performs three sends per call.
// On line 27 we need to receive three more sends
// to shut down cleanly.
<-ch
<-ch
<-ch
return
case <-time.After(1 * time.Second):
t.Fatalf("expected collect timed out")
}
break
}
}

func TestGCCollector(t *testing.T) {
func TestGoCollectorGC(t *testing.T) {
var (
c = NewGoCollector()
ch = make(chan Metric)
waitc = make(chan struct{})
closec = make(chan struct{})
oldGC uint64
oldPause float64
c = NewGoCollector()
metricCh = make(chan Metric)
waitCh = make(chan struct{})
endCollectionCh = make(chan struct{})
oldGC uint64
oldPause float64
)
defer close(closec)

go func() {
c.Collect(ch)
c.Collect(metricCh)
// force GC
runtime.GC()
<-waitc
c.Collect(ch)
<-waitCh
c.Collect(metricCh)
close(endCollectionCh)
}()

defer func() {
// Drain the collect channel to prevent a goroutine leak.
for {
select {
case <-metricCh:
case <-endCollectionCh:
return
}
}
}()

first := true
for {
select {
case metric := <-ch:
case metric := <-metricCh:
pb := &dto.Metric{}
metric.Write(pb)
if pb.GetSummary() == nil {
@@ -119,7 +135,7 @@ func TestGCCollector(t *testing.T) {
first = false
oldGC = *pb.GetSummary().SampleCount
oldPause = *pb.GetSummary().SampleSum
close(waitc)
close(waitCh)
continue
}
if diff := *pb.GetSummary().SampleCount - oldGC; diff != 1 {
@@ -128,9 +144,94 @@ func TestGCCollector(t *testing.T) {
if diff := *pb.GetSummary().SampleSum - oldPause; diff <= 0 {
t.Errorf("want moar pause, got %f", diff)
}
return
case <-time.After(1 * time.Second):
t.Fatalf("expected collect timed out")
}
break
}
}

func TestGoCollectorMemStats(t *testing.T) {
var (
c = NewGoCollector().(*goCollector)
got uint64
)

checkCollect := func(want uint64) {
metricCh := make(chan Metric)
endCh := make(chan struct{})

go func() {
c.Collect(metricCh)
close(endCh)
}()
Collect:
for {
select {
case metric := <-metricCh:
if metric.Desc().fqName != "go_memstats_alloc_bytes" {
continue Collect
}
pb := &dto.Metric{}
metric.Write(pb)
got = uint64(pb.GetGauge().GetValue())
case <-endCh:
break Collect
}
}
if want != got {
t.Errorf("unexpected value of go_memstats_alloc_bytes, want %d, got %d", want, got)
}
}

// Speed up the timing to make the test faster.
c.msMaxWait = time.Millisecond
c.msMaxAge = 10 * time.Millisecond

// Scenario 1: msRead responds slowly, no previous memstats available;
// the collection blocks until msRead completes anyway.
c.msRead = func(ms *runtime.MemStats) {
time.Sleep(3 * time.Millisecond)
ms.Alloc = 1
}
checkCollect(1)
// Now msLast is set.
if want, got := uint64(1), c.msLast.Alloc; want != got {
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
}

// Scenario 2: msRead responds fast, previous memstats available, new
// value collected.
c.msRead = func(ms *runtime.MemStats) {
ms.Alloc = 2
}
checkCollect(2)
// msLast is set, too.
if want, got := uint64(2), c.msLast.Alloc; want != got {
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
}

// Scenario 3: msRead responds slowly, previous memstats available, old
// value collected.
c.msRead = func(ms *runtime.MemStats) {
time.Sleep(3 * time.Millisecond)
ms.Alloc = 3
}
checkCollect(2)
// After waiting, the slow read has completed and set the new value in msLast.
time.Sleep(12 * time.Millisecond)
if want, got := uint64(3), c.msLast.Alloc; want != got {
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
}

// Scenario 4: msRead responds slowly, previous memstats are too old, new
// value collected.
c.msRead = func(ms *runtime.MemStats) {
time.Sleep(3 * time.Millisecond)
ms.Alloc = 4
}
checkCollect(4)
if want, got := uint64(4), c.msLast.Alloc; want != got {
t.Errorf("unexpected of msLast.Alloc, want %d, got %d", want, got)
}
}