metrics-generator: emit exemplars
Koenraad Verheyden committed Apr 5, 2022
1 parent 464dcbf commit d18c5ff
Showing 11 changed files with 154 additions and 86 deletions.
7 changes: 5 additions & 2 deletions modules/generator/processor/servicegraphs/servicegraphs.go
@@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
tempo_util "github.com/grafana/tempo/pkg/util"
)

var (
@@ -146,13 +147,15 @@ func (p *processor) consume(resourceSpans []*v1.ResourceSpans) error {
case v1.Span_SPAN_KIND_CLIENT:
k = key(hex.EncodeToString(span.TraceId), hex.EncodeToString(span.SpanId))
edge, err = p.store.UpsertEdge(k, func(e *store.Edge) {
e.TraceID = tempo_util.TraceIDToHexString(span.TraceId)
e.ClientService = svcName
e.ClientLatencySec = spanDurationSec(span)
e.Failed = e.Failed || p.spanFailed(span)
})
case v1.Span_SPAN_KIND_SERVER:
k = key(hex.EncodeToString(span.TraceId), hex.EncodeToString(span.ParentSpanId))
edge, err = p.store.UpsertEdge(k, func(e *store.Edge) {
e.TraceID = tempo_util.TraceIDToHexString(span.TraceId)
e.ServerService = svcName
e.ServerLatencySec = spanDurationSec(span)
e.Failed = e.Failed || p.spanFailed(span)
@@ -203,8 +206,8 @@ func (p *processor) collectEdge(e *store.Edge) {
p.serviceGraphRequestFailedTotal.Inc(clientServerLabelValues, 1)
}

p.serviceGraphRequestServerSecondsHistogram.Observe(clientServerLabelValues, e.ServerLatencySec)
p.serviceGraphRequestClientSecondsHistogram.Observe(clientServerLabelValues, e.ClientLatencySec)
p.serviceGraphRequestServerSecondsHistogram.Observe(clientServerLabelValues, e.ServerLatencySec, e.TraceID)
p.serviceGraphRequestClientSecondsHistogram.Observe(clientServerLabelValues, e.ClientLatencySec, e.TraceID)
} else if e.IsExpired() {
p.metricUnpairedEdges.Inc()
}
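Note: the two upsert callbacks above now record the trace ID on the edge, and collectEdge forwards e.TraceID together with each latency observation. A minimal, self-contained sketch of that flow (not the Tempo code itself; metric names are illustrative and the observe helper stands in for the registry histogram's new Observe(labelValues, value, traceID)):

package main

import "fmt"

// edge mirrors the fields touched in store/edge.go.
type edge struct {
	TraceID                            string
	ClientService, ServerService       string
	ClientLatencySec, ServerLatencySec float64
}

// observe stands in for the registry histogram's Observe: the trace ID rides
// along with the value so it can be emitted as an exemplar at collection time.
func observe(metric string, labelValues []string, value float64, traceID string) {
	fmt.Printf("%s%v = %g, exemplar traceID=%s\n", metric, labelValues, value, traceID)
}

func main() {
	e := edge{
		TraceID:          "0123456789abcdef0123456789abcdef",
		ClientService:    "frontend",
		ServerService:    "backend",
		ClientLatencySec: 0.120,
		ServerLatencySec: 0.095,
	}
	lv := []string{e.ClientService, e.ServerService}
	observe("traces_service_graph_request_server_seconds", lv, e.ServerLatencySec, e.TraceID)
	observe("traces_service_graph_request_client_seconds", lv, e.ClientLatencySec, e.TraceID)
}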
1 change: 1 addition & 0 deletions modules/generator/processor/servicegraphs/store/edge.go
@@ -6,6 +6,7 @@ import "time"
type Edge struct {
key string

TraceID string
ServerService, ClientService string
ServerLatencySec, ClientLatencySec float64

4 changes: 2 additions & 2 deletions modules/generator/processor/spanmetrics/spanmetrics.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/pkg/tempopb"
v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
)

const (
@@ -86,6 +87,5 @@ func (p *processor) aggregateMetricsForSpan(svcName string, span *v1_trace.Span)
registrylabelValues := registry.NewLabelValues(labelValues)

p.spanMetricsCallsTotal.Inc(registrylabelValues, 1)
// TODO observe exemplar prometheus.Labels{"traceID": tempo_util.TraceIDToHexString(span.TraceId)}
p.spanMetricsDurationSeconds.Observe(registrylabelValues, latencySeconds)
p.spanMetricsDurationSeconds.Observe(registrylabelValues, latencySeconds, util.TraceIDToHexString(span.TraceId))
}
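Note: util.TraceIDToHexString converts the span's raw trace ID bytes into the hex string that becomes the exemplar's traceID label value. A rough standard-library illustration of that conversion only (the actual helper in pkg/util may normalize the ID further, e.g. trim or pad it):

package main

import (
	"encoding/hex"
	"fmt"
)

func main() {
	// A raw 16-byte trace ID as it appears on the protobuf span (span.TraceId).
	traceID := []byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef,
		0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}

	// Hex-encode it for use as the exemplar's traceID label value.
	fmt.Println(hex.EncodeToString(traceID)) // 0123456789abcdef0123456789abcdef
}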
22 changes: 18 additions & 4 deletions modules/generator/registry/appender_test.go
@@ -36,6 +36,7 @@ func (n noopAppender) Rollback() error {

type capturingAppender struct {
samples []sample
exemplars []exemplarSample
isCommitted bool
isRolledback bool
}
@@ -46,6 +47,11 @@ type sample struct {
v float64
}

type exemplarSample struct {
l labels.Labels
e exemplar.Exemplar
}

func newSample(lbls map[string]string, t int64, v float64) sample {
return sample{
l: labels.FromMap(lbls),
@@ -54,6 +60,13 @@ func newSample(lbls map[string]string, t int64, v float64) sample {
}
}

func newExemplar(lbls map[string]string, e exemplar.Exemplar) exemplarSample {
return exemplarSample{
l: labels.FromMap(lbls),
e: e,
}
}

func (s sample) String() string {
return fmt.Sprintf("%s %d %g", s.l, s.t, s.v)
}
@@ -70,6 +83,11 @@ func (c *capturingAppender) Append(ref storage.SeriesRef, l labels.Labels, t int
return ref, nil
}

func (c *capturingAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
c.exemplars = append(c.exemplars, exemplarSample{l, e})
return ref, nil
}

func (c *capturingAppender) Commit() error {
c.isCommitted = true
return nil
@@ -79,7 +97,3 @@ func (c *capturingAppender) Rollback() error {
c.isRolledback = true
return nil
}

func (c *capturingAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
panic("AppendExemplar is not supported")
}
19 changes: 10 additions & 9 deletions modules/generator/registry/counter_test.go
@@ -29,7 +29,7 @@ func Test_counter(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

c.Inc(NewLabelValues([]string{"value-2"}), 2.0)
c.Inc(NewLabelValues([]string{"value-3"}), 3.0)
@@ -42,7 +42,7 @@ func Test_counter(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
newSample(map[string]string{"__name__": "my_counter", "label": "value-3"}, collectionTimeMs, 3),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 3, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 3, expectedSamples, nil)
}

func Test_counter_invalidLabelValues(t *testing.T) {
@@ -76,7 +76,7 @@ func Test_counter_cantAdd(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

// block new series - existing series can still be updated
canAdd = false
@@ -89,7 +89,7 @@ func Test_counter_cantAdd(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)
}

func Test_counter_removeStaleSeries(t *testing.T) {
@@ -114,7 +114,7 @@ func Test_counter_removeStaleSeries(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 2, expectedSamples, nil)

time.Sleep(10 * time.Millisecond)
timeMs = time.Now().UnixMilli()
@@ -130,7 +130,7 @@ func Test_counter_removeStaleSeries(t *testing.T) {
expectedSamples = []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-2"}, collectionTimeMs, 4),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples, nil)
}

func Test_counter_externalLabels(t *testing.T) {
@@ -144,7 +144,7 @@ func Test_counter_externalLabels(t *testing.T) {
newSample(map[string]string{"__name__": "my_counter", "label": "value-1", "external_label": "external_value"}, collectionTimeMs, 1),
newSample(map[string]string{"__name__": "my_counter", "label": "value-2", "external_label": "external_value"}, collectionTimeMs, 2),
}
collectMetricAndAssert(t, c, collectionTimeMs, map[string]string{"external_label": "external_value"}, 2, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, map[string]string{"external_label": "external_value"}, 2, expectedSamples, nil)
}

func Test_counter_concurrencyDataRace(t *testing.T) {
@@ -226,10 +226,10 @@ func Test_counter_concurrencyCorrectness(t *testing.T) {
expectedSamples := []sample{
newSample(map[string]string{"__name__": "my_counter", "label": "value-1"}, collectionTimeMs, float64(totalCount.Load())),
}
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples)
collectMetricAndAssert(t, c, collectionTimeMs, nil, 1, expectedSamples, nil)
}

func collectMetricAndAssert(t *testing.T, m metric, collectionTimeMs int64, externalLabels map[string]string, expectedActiveSeries int, expectedSamples []sample) {
func collectMetricAndAssert(t *testing.T, m metric, collectionTimeMs int64, externalLabels map[string]string, expectedActiveSeries int, expectedSamples []sample, expectedExemplars []exemplarSample) {
appender := &capturingAppender{}

activeSeries, err := m.collectMetrics(appender, collectionTimeMs, externalLabels)
@@ -239,4 +239,5 @@ func collectMetricAndAssert(t *testing.T, m metric, collectionTimeMs int64, exte
assert.False(t, appender.isCommitted)
assert.False(t, appender.isRolledback)
assert.ElementsMatch(t, expectedSamples, appender.samples)
assert.ElementsMatch(t, expectedExemplars, appender.exemplars)
}
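With the added expectedExemplars parameter, a histogram test can assert the exemplars captured by capturingAppender. A hedged sketch of the values involved (series label names and numbers here are illustrative, not taken from the real histogram tests); in a test these would be wrapped with the newExemplar helper from appender_test.go and passed as expectedExemplars to collectMetricAndAssert:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// The series labels the appender saw for one histogram bucket (illustrative).
	seriesLabels := labels.FromMap(map[string]string{
		"__name__": "my_histogram_bucket",
		"label":    "value-1",
		"le":       "1",
	})

	// The exemplar the registry is expected to append alongside that bucket:
	// a single "traceID" label, the observed value, and the collection timestamp.
	ex := exemplar.Exemplar{
		Labels: []labels.Label{{Name: "traceID", Value: "0123456789abcdef0123456789abcdef"}},
		Value:  0.5,
		Ts:     1649145600000,
	}

	fmt.Println(seriesLabels, ex)
}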
92 changes: 58 additions & 34 deletions modules/generator/registry/histogram.go
@@ -2,10 +2,13 @@ package registry

import (
"fmt"
"math"
"sort"
"strconv"
"sync"
"time"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
@@ -32,9 +35,12 @@ type histogramSeries struct {
labelValues []string
count *atomic.Float64
sum *atomic.Float64
buckets []*atomic.Float64
bucketInf *atomic.Float64
lastUpdated *atomic.Int64
// buckets includes the +Inf bucket
buckets []*atomic.Float64
// exemplar is stored as a single traceID
exemplars []*atomic.String
exemplarValues []*atomic.Float64
lastUpdated *atomic.Int64
}

var _ Histogram = (*histogram)(nil)
@@ -50,6 +56,9 @@ func newHistogram(name string, labels []string, buckets []float64, onAddSeries f
onRemoveSeries = func(uint32) {}
}

// add +Inf bucket
buckets = append(buckets, math.Inf(1))

bucketLabels := make([]string, len(buckets))
for i, bucket := range buckets {
bucketLabels[i] = formatFloat(bucket)
@@ -68,7 +77,7 @@ func newHistogram(name string, labels []string, buckets []float64, onAddSeries f
}
}

func (h *histogram) Observe(labelValues *LabelValues, value float64) {
func (h *histogram) Observe(labelValues *LabelValues, value float64, traceID string) {
if len(h.labels) != len(labelValues.getValues()) {
panic(fmt.Sprintf("length of given label values does not match with labels, labels: %v, label values: %v", h.labels, labelValues))
}
@@ -80,62 +89,65 @@ func (h *histogram) Observe(labelValues *LabelValues, value float64) {
h.seriesMtx.RUnlock()

if ok {
h.updateSeries(s, value)
h.updateSeries(s, value, traceID)
return
}

if !h.onAddSerie(h.activeSeriesPerHistogramSerie()) {
return
}

newSeries := h.newSeries(labelValues, value)
newSeries := h.newSeries(labelValues, value, traceID)

h.seriesMtx.Lock()
defer h.seriesMtx.Unlock()

s, ok = h.series[hash]
if ok {
h.updateSeries(s, value)
h.updateSeries(s, value, traceID)
return
}
h.series[hash] = newSeries
}

func (h *histogram) newSeries(labelValues *LabelValues, value float64) *histogramSeries {
func (h *histogram) newSeries(labelValues *LabelValues, value float64, traceID string) *histogramSeries {
newSeries := &histogramSeries{
labelValues: labelValues.getValuesCopy(),
count: atomic.NewFloat64(1),
sum: atomic.NewFloat64(value),
count: atomic.NewFloat64(0),
sum: atomic.NewFloat64(0),
buckets: nil,
bucketInf: atomic.NewFloat64(1),
lastUpdated: atomic.NewInt64(time.Now().UnixMilli()),
exemplars: nil,
lastUpdated: atomic.NewInt64(0),
}
for _, bucket := range h.buckets {
if value <= bucket {
newSeries.buckets = append(newSeries.buckets, atomic.NewFloat64(1))
} else {
newSeries.buckets = append(newSeries.buckets, atomic.NewFloat64(0))
}
for i := 0; i < len(h.buckets); i++ {
newSeries.buckets = append(newSeries.buckets, atomic.NewFloat64(0))
newSeries.exemplars = append(newSeries.exemplars, atomic.NewString(""))
newSeries.exemplarValues = append(newSeries.exemplarValues, atomic.NewFloat64(0))
}

h.updateSeries(newSeries, value, traceID)

return newSeries
}

func (h *histogram) updateSeries(s *histogramSeries, value float64) {
func (h *histogram) updateSeries(s *histogramSeries, value float64, traceID string) {
s.count.Add(1)
s.sum.Add(value)

for i, bucket := range h.buckets {
if value <= bucket {
s.buckets[i].Add(1)
}
}
s.bucketInf.Add(1)

bucket := sort.SearchFloat64s(h.buckets, value)
s.exemplars[bucket].Store(traceID)
s.exemplarValues[bucket].Store(value)

s.lastUpdated.Store(time.Now().UnixMilli())
}

func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) {
h.seriesMtx.RLock()
defer h.seriesMtx.RUnlock()

activeSeries = len(h.series) * int(h.activeSeriesPerHistogramSerie())

lbls := make(labels.Labels, 1+len(externalLabels)+len(h.labels)+1)
@@ -146,6 +158,9 @@ func (h *histogram) collectMetrics(appender storage.Appender, timeMs int64, exte
lb.Set(name, value)
}

h.seriesMtx.RLock()
defer h.seriesMtx.RUnlock()

for _, s := range h.series {
// set series-specific labels
for i, name := range h.labels {
@@ -171,20 +186,29 @@

for i, bucketLabel := range h.bucketLabels {
lb.Set(labels.BucketLabel, bucketLabel)
_, err = appender.Append(0, lb.Labels(), timeMs, s.buckets[i].Load())
ref, err := appender.Append(0, lb.Labels(), timeMs, s.buckets[i].Load())
if err != nil {
return
return activeSeries, err
}
}

lb.Set(labels.BucketLabel, "+Inf")
_, err = appender.Append(0, lb.Labels(), timeMs, s.bucketInf.Load())
if err != nil {
return
ex := s.exemplars[i].Load()
if ex != "" {
_, err = appender.AppendExemplar(ref, lb.Labels(), exemplar.Exemplar{
Labels: []labels.Label{{
Name: "traceID",
Value: ex,
}},
Value: s.exemplarValues[i].Load(),
Ts: timeMs,
})
if err != nil {
return activeSeries, err
}
}
// clear the exemplar so we don't emit it again
s.exemplars[i].Store("")
}

// TODO support exemplars

lb.Del(labels.BucketLabel)
}

@@ -204,8 +228,8 @@ func (h *histogram) removeStaleSeries(staleTimeMs int64) {
}

func (h *histogram) activeSeriesPerHistogramSerie() uint32 {
// sum + count + +Inf + #buckets
return uint32(3 + len(h.buckets))
// sum + count + #buckets
return uint32(2 + len(h.buckets))
}

func formatFloat(value float64) string {
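A note on the bucket selection in updateSeries: because newHistogram now appends an explicit +Inf bucket, sort.SearchFloat64s always returns a valid index, namely the first bucket whose upper bound is at least the observed value, and the exemplar is stored against that bucket. A small standalone illustration:

package main

import (
	"fmt"
	"math"
	"sort"
)

func main() {
	// Upper bounds in ascending order, with +Inf appended as in newHistogram.
	buckets := []float64{0.1, 1, 10, math.Inf(1)}

	for _, v := range []float64{0.05, 0.1, 0.5, 42} {
		// Index of the first bucket whose bound is >= v, i.e. the "le" bucket
		// the observation (and its exemplar) belongs to.
		i := sort.SearchFloat64s(buckets, v)
		fmt.Printf("value %-5g -> bucket le=%g\n", v, buckets[i])
	}
}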