metrics-generator: use Prometheus Agent WAL and remote storage #1323

Merged
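This PR replaces the metrics-generator's bespoke remote-write client with the Prometheus Agent write-ahead log (WAL) and the upstream remote-write machinery: generated series are appended to a local WAL under `storage.path` and shipped to the configured `remote_write` endpoints. The sketch below illustrates the general wiring using the Prometheus `tsdb/agent` and `storage/remote` packages; it is an assumption-laden illustration (names such as `walDir`, the flush deadline, and the empty remote-write config are made up for the example), not the actual Tempo implementation.

```go
// Rough sketch of wiring a Prometheus Agent WAL to remote-write storage.
// Illustrative only; the real Tempo code in modules/generator/storage may differ.
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	prometheus_config "github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage/remote"
	"github.com/prometheus/prometheus/tsdb/agent"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()
	walDir := "/tmp/tempo/wal" // assumption: matches storage.path in the example config

	// Remote storage ships samples read from the WAL to the remote_write endpoints.
	// No scrape manager is needed for this sketch, so nil is passed.
	startTime := func() (int64, error) { return 0, nil }
	remoteStorage := remote.NewStorage(logger, reg, startTime, walDir, 30*time.Second, nil)
	_ = remoteStorage.ApplyConfig(&prometheus_config.Config{
		RemoteWriteConfigs: []*prometheus_config.RemoteWriteConfig{ /* from storage.remote_write */ },
	})

	// The agent WAL only supports appending samples; there is no local querying.
	db, err := agent.Open(logger, reg, remoteStorage, walDir, agent.DefaultOptions())
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Processors would append generated series through a storage.Appender.
	app := db.Appender(context.Background())
	_, _ = app.Append(0, labels.FromStrings("__name__", "traces_spanmetrics_calls_total"), time.Now().UnixMilli(), 1)
	_ = app.Commit()
}
```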
9 changes: 5 additions & 4 deletions example/docker-compose/distributed/tempo-distributed.yaml
@@ -48,10 +48,11 @@ querier:
frontend_address: query-frontend:9095

metrics_generator:
-  remote_write:
-    enabled: true
-    client:
-      url: http://prometheus:9090/api/v1/write
+  storage:
+    path: /tmp/tempo/wal
+    remote_write:
+      - url: http://prometheus:9090/api/v1/write
+        send_exemplars: true

storage:
trace:
1 change: 1 addition & 0 deletions go.mod
@@ -171,6 +171,7 @@ require (
	github.com/mostynb/go-grpc-compression v1.1.15 // indirect
	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
	github.com/oklog/run v1.1.0 // indirect
	github.com/oklog/ulid v1.3.1 // indirect
	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.41.0 // indirect
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.41.0 // indirect
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.41.0 // indirect
45 changes: 45 additions & 0 deletions integration/e2e/config-metrics-generator.yaml
@@ -0,0 +1,45 @@
metrics_generator_enabled: true

server:
http_listen_port: 3200

distributor:
receivers:
jaeger:
protocols:
grpc:
log_received_traces: true

ingester:
lifecycler:
ring:
replication_factor: 1

metrics_generator:
collection_interval: 1s
storage:
path: /var/tempo-metrics-generator
remote_write:
- url: http://tempo_e2e-prometheus:9090/api/v1/write
send_exemplars: true
processor:
service_graphs:
histogram_buckets: [1, 2] # seconds
span_metrics:
histogram_buckets: [1, 2] # seconds

storage:
trace:
backend: local
local:
path: /var/tempo

memberlist:
bind_port: 7946
join_members:
- tempo_e2e-distributor:7946
- tempo_e2e-ingester-1:7946
- tempo_e2e-metrics-generator:7946

overrides:
metrics_generator_processors: [service-graphs, span-metrics]
225 changes: 225 additions & 0 deletions integration/e2e/metrics_generator_test.go
@@ -0,0 +1,225 @@
package e2e

import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"strings"
"testing"
"time"

"github.com/grafana/e2e"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

util "github.com/grafana/tempo/integration"
)

const (
configMetricsGenerator = "config-metrics-generator.yaml"
prometheusImage = "prom/prometheus:latest"
)

func TestMetricsGenerator(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configMetricsGenerator, "config.yaml"))
tempoDistributor := util.NewTempoDistributor()
tempoIngester := util.NewTempoIngester(1)
tempoMetricsGenerator := util.NewTempoMetricsGenerator()
prometheus := newPrometheus()
require.NoError(t, s.StartAndWaitReady(tempoDistributor, tempoIngester, tempoMetricsGenerator, prometheus))

// Wait until ingester and metrics-generator are active
isServiceActiveMatcher := func(service string) []*labels.Matcher {
return []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "name", service),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"),
}
}
require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{`cortex_ring_members`}, e2e.WithLabelMatchers(isServiceActiveMatcher("ingester")...), e2e.WaitMissingMetrics))
require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{`cortex_ring_members`}, e2e.WithLabelMatchers(isServiceActiveMatcher("metrics-generator")...), e2e.WaitMissingMetrics))

// Get port for the Jaeger gRPC receiver endpoint
c, err := util.NewJaegerGRPCClient(tempoDistributor.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)

// Send two spans that have a client-server relationship
r := rand.New(rand.NewSource(time.Now().UnixMilli()))
traceIDLow := r.Int63()
traceIDHigh := r.Int63()
parentSpanID := r.Int63()

err = c.EmitBatch(context.Background(), &thrift.Batch{
Process: &thrift.Process{ServiceName: "lb"},
Spans: []*thrift.Span{
{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: parentSpanID,
ParentSpanId: 0,
OperationName: "lb-get",
StartTime: time.Now().UnixMicro(),
Duration: int64(2 * time.Second / time.Microsecond),
Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("client")}},
},
},
})
require.NoError(t, err)

err = c.EmitBatch(context.Background(), &thrift.Batch{
Process: &thrift.Process{ServiceName: "app"},
Spans: []*thrift.Span{
{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: r.Int63(),
ParentSpanId: parentSpanID,
OperationName: "app-handle",
StartTime: time.Now().UnixMicro(),
Duration: int64(1 * time.Second / time.Microsecond),
Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}},
},
},
})
require.NoError(t, err)

// Fetch metrics from Prometheus once they are received
var metricFamilies map[string]*io_prometheus_client.MetricFamily
for {
metricFamilies, err = extractMetricsFromPrometheus(prometheus, `{__name__=~"traces_.+"}`)
require.NoError(t, err)
if len(metricFamilies) > 0 {
break
}
time.Sleep(time.Second)
}

// Print collected metrics for easier debugging
fmt.Println()
for key, family := range metricFamilies {
fmt.Println(key)
for _, metric := range family.Metric {
fmt.Println(metric)
}
}
fmt.Println()

// Service graphs
lbls := []string{"client", "lb", "server", "app"}
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_total", lbls))

assert.Equal(t, 0.0, sumValues(metricFamilies, "traces_service_graph_request_client_seconds_bucket", append(lbls, "le", "1")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_client_seconds_bucket", append(lbls, "le", "2")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_client_seconds_bucket", append(lbls, "le", "+Inf")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_client_seconds_count", lbls))
assert.Equal(t, 2.0, sumValues(metricFamilies, "traces_service_graph_request_client_seconds_sum", lbls))

assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_server_seconds_bucket", append(lbls, "le", "1")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_server_seconds_bucket", append(lbls, "le", "2")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_server_seconds_bucket", append(lbls, "le", "+Inf")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_server_seconds_count", lbls))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_service_graph_request_server_seconds_sum", lbls))

assert.Equal(t, 0.0, sumValues(metricFamilies, "traces_service_graph_request_failed_total", nil))
assert.Equal(t, 0.0, sumValues(metricFamilies, "traces_service_graph_unpaired_spans_total", nil))
assert.Equal(t, 0.0, sumValues(metricFamilies, "traces_service_graph_dropped_spans_total", nil))

// Span metrics
lbls = []string{"service", "lb", "span_name", "lb-get", "span_kind", "SPAN_KIND_CLIENT", "span_status", "STATUS_CODE_UNSET"}
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_calls_total", lbls))
assert.Equal(t, 0.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "1")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "2")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "+Inf")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_count", lbls))
assert.Equal(t, 2.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_sum", lbls))

lbls = []string{"service", "app", "span_name", "app-handle", "span_kind", "SPAN_KIND_SERVER", "span_status", "STATUS_CODE_UNSET"}
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_calls_total", lbls))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "1")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "2")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "+Inf")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_count", lbls))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_sum", lbls))
}

func newPrometheus() *e2e.HTTPService {
return e2e.NewHTTPService(
"prometheus",
prometheusImage,
e2e.NewCommandWithoutEntrypoint("/bin/prometheus", "--config.file=/etc/prometheus/prometheus.yml", "--web.enable-remote-write-receiver"),
e2e.NewHTTPReadinessProbe(9090, "/-/ready", 200, 299),
9090,
)
}

// extractMetricsFromPrometheus extracts metrics stored in Prometheus using the /federate endpoint.
func extractMetricsFromPrometheus(prometheus *e2e.HTTPService, matcher string) (map[string]*io_prometheus_client.MetricFamily, error) {
url := fmt.Sprintf("http://%s/federate?match[]=%s", prometheus.HTTPEndpoint(), url.QueryEscape(matcher))

res, err := http.Get(url)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode < 200 || res.StatusCode >= 300 {
return nil, fmt.Errorf("unexpected status code %d while fetching federate metrics", res.StatusCode)
}

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

var tp expfmt.TextParser
return tp.TextToMetricFamilies(strings.NewReader(string(body)))
}

// sumValues calculates the sum of all metrics in the metricFamily that contain the given labels.
// filterLabels must be key-value pairs.
func sumValues(metricFamily map[string]*io_prometheus_client.MetricFamily, metric string, filterLabels []string) float64 {
if len(filterLabels)%2 != 0 {
panic(fmt.Sprintf("filterLabels must be pairs: %v", filterLabels))
}
filterLabelsMap := map[string]string{}
for i := 0; i < len(filterLabels); i += 2 {
filterLabelsMap[filterLabels[i]] = filterLabels[i+1]
}

sum := 0.0

outer:
for _, metric := range metricFamily[metric].GetMetric() {
labelMap := map[string]string{}
for _, label := range metric.GetLabel() {
labelMap[label.GetName()] = label.GetValue()
}

for key, expectedValue := range filterLabelsMap {
value, ok := labelMap[key]
if !ok || value != expectedValue {
continue outer
}
}

// since we fetch metrics using /federate they are all untyped
sum += metric.GetUntyped().GetValue()
}

return sum
}

func stringPtr(s string) *string {
return &s
}
17 changes: 17 additions & 0 deletions integration/util.go
@@ -102,6 +102,23 @@ func NewTempoIngester(replica int) *e2e.HTTPService {
return s
}

func NewTempoMetricsGenerator() *e2e.HTTPService {
args := []string{"-config.file=" + filepath.Join(e2e.ContainerSharedDir, "config.yaml"), "-target=metrics-generator"}
args = buildArgsWithExtra(args)

s := e2e.NewHTTPService(
"metrics-generator",
image,
e2e.NewCommandWithoutEntrypoint("/tempo", args...),
e2e.NewHTTPReadinessProbe(3200, "/ready", 200, 299),
3200,
)

s.SetBackoff(TempoBackoff())

return s
}

func NewTempoQueryFrontend() *e2e.HTTPService {
args := []string{"-config.file=" + filepath.Join(e2e.ContainerSharedDir, "config.yaml"), "-target=query-frontend"}
args = buildArgsWithExtra(args)
6 changes: 3 additions & 3 deletions modules/generator/config.go
@@ -6,7 +6,7 @@ import (

"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/remotewrite"
"github.com/grafana/tempo/modules/generator/storage"
)

const (
@@ -34,7 +34,7 @@ type Config struct {
// instances of the metrics-generator as each instance will push the same time series.
AddInstanceIDLabel bool `yaml:"add_instance_id_label"`

-	RemoteWrite remotewrite.Config `yaml:"remote_write,omitempty"`
+	Storage storage.Config `yaml:"storage"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
@@ -45,7 +45,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.CollectionInterval = 15 * time.Second
cfg.AddInstanceIDLabel = true

-	cfg.RemoteWrite.RegisterFlagsAndApplyDefaults(prefix, f)
+	cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
}

type ProcessorConfig struct {
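For reference, the `storage` block used throughout this PR (`path` plus a list of Prometheus `remote_write` endpoints with options such as `send_exemplars`) implies a config shape roughly like the sketch below. Field names and types are assumptions inferred from the example YAML in this diff, not the actual `modules/generator/storage` source.

```go
// Hypothetical sketch of the storage.Config shape implied by the YAML in this PR.
// Field names and types are assumptions, not the actual Tempo source.
package storage

import (
	"flag"

	prometheus_config "github.com/prometheus/prometheus/config"
)

type Config struct {
	// Path is the directory where the Prometheus Agent WAL is written,
	// e.g. /tmp/tempo/wal in the docker-compose example.
	Path string `yaml:"path"`

	// RemoteWrite lists the Prometheus remote-write endpoints the WAL is shipped to.
	// Each entry supports the upstream options, such as url and send_exemplars.
	RemoteWrite []prometheus_config.RemoteWriteConfig `yaml:"remote_write,omitempty"`
}

// RegisterFlagsAndApplyDefaults mirrors the call made from modules/generator/config.go.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
	// Defaults shown here are assumptions for illustration only.
	cfg.Path = ""
}
```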