Skip to content

Commit

Permalink
Surface new labels for uninstrumented services and systems (#3543)
Browse files Browse the repository at this point in the history
* Surface new labels for uninstrumented services and systems

* Update CHANGELOG.md

* remove unnecessary Println in test

* Reuse dimensions and prefixes for this use-case

* Add docs

* keep only virtual_node behind the new feature

* add overrides

* Update docs/sources/tempo/metrics-generator/service_graphs/_index.md

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>

* forgot this test (fixed)

* add benchmarks

* add edge pooling/reuse

* update flag

* move label out of dimensions to avoid prefix logic

* lint

* Update modules/generator/processor/servicegraphs/config.go

Co-authored-by: Mario <mariorvinas@gmail.com>

* minor amends to names and docs

* leave the new virtual_node label as an extra dimension

* keep edge sync.Pool ops inside store

* Update modules/generator/processor/servicegraphs/store/store.go

The edge is not expired here, so it shouldn't be returned to the pool.

Co-authored-by: Mario <mariorvinas@gmail.com>

* leave the new label un-prefixed

---------

Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com>
Co-authored-by: Mario <mariorvinas@gmail.com>
  • Loading branch information
3 people authored Apr 30, 2024
1 parent 11e315c commit 251bf5a
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Surface new labels for uninstrumented services and systems [#3543](https://github.com/grafana/tempo/pull/3543) (@t00mas)
* [FEATURE] Add TLS support for Memcached Client [#3585](https://github.com/grafana/tempo/pull/3585) (@sonisr)
* [ENHANCEMENT] Add querier metrics for requests executed [#3524](https://github.com/grafana/tempo/pull/3524) (@electron0zero)
* [FEATURE] Added gRPC streaming endpoints for Tempo APIs.
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ metrics_generator:
# Attribute Key to multiply span metrics
[span_multiplier_key: <string> | default = ""]
# Enables additional labels for services and virtual nodes.
[enable_virtual_node_label: <bool> | default = false]
span_metrics:
# Buckets for the latency histogram in seconds.
Expand Down
12 changes: 10 additions & 2 deletions docs/sources/tempo/metrics-generator/service_graphs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,18 @@ The following metrics are exported:

Duration is measured both from the client and the server sides.

Possible values for `connection_type`: unset, `messaging_system`, or `database`.
Possible values for `connection_type`: unset, `virtual_node`, `messaging_system`, or `database`.

Additional labels can be included using the `dimensions` configuration option.
Additional labels can be included using the `dimensions` configuration option, or the `enable_virtual_node_label` option.

Since the service graph processor has to process both sides of an edge,
it needs to process all spans of a trace to function properly.
If spans of a trace are spread out over multiple instances, spans are not paired up reliably.

#### Activate `enable_virtual_node_label`

Activating this feature adds the following label and corresponding values:

| Label | Possible Values | Description |
|-------------------------|-----------------------------|--------------------------------------------------------------------------|
| virtual_node | `unset`, `client`, `server` | Explicitly indicates the side that is uninstrumented |
2 changes: 2 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,7 @@ func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userI

copyCfg.ServiceGraphs.EnableClientServerPrefix = o.MetricsGeneratorProcessorServiceGraphsEnableClientServerPrefix(userID)

copyCfg.ServiceGraphs.EnableVirtualNodeLabel = o.MetricsGeneratorProcessorServiceGraphsEnableVirtualNodeLabel(userID)

return copyCfg, nil
}
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorSpanMetricsDimensionMappings(userID string) []sharedconfig.DimensionMappings
MetricsGeneratorProcessorSpanMetricsEnableTargetInfo(userID string) bool
MetricsGeneratorProcessorServiceGraphsEnableClientServerPrefix(userID string) bool
MetricsGeneratorProcessorServiceGraphsEnableVirtualNodeLabel(userID string) bool
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxBytesPerTrace(userID string) int
Expand Down
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type mockOverrides struct {
serviceGraphsDimensions []string
serviceGraphsPeerAttributes []string
serviceGraphsEnableClientServerPrefix bool
serviceGraphsEnableVirtualNodeLabel bool
spanMetricsHistogramBuckets []float64
spanMetricsDimensions []string
spanMetricsIntrinsicDimensions map[string]bool
Expand Down Expand Up @@ -128,6 +129,10 @@ func (m *mockOverrides) MetricsGeneratorProcessorServiceGraphsEnableClientServer
return m.serviceGraphsEnableClientServerPrefix
}

func (m *mockOverrides) MetricsGeneratorProcessorServiceGraphsEnableVirtualNodeLabel(string) bool {
return m.serviceGraphsEnableVirtualNodeLabel
}

func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(string) []string {
return m.spanMetricsTargetInfoExcludedDimensions
}
Expand Down
3 changes: 3 additions & 0 deletions modules/generator/processor/servicegraphs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Config struct {

// If enabled attribute value will be used for metric calculation
SpanMultiplierKey string `yaml:"span_multiplier_key"`

// EnableVirtualNodeLabel enables additional labels for uninstrumented services
EnableVirtualNodeLabel bool `yaml:"enable_virtual_node_label"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
Expand Down
35 changes: 34 additions & 1 deletion modules/generator/processor/servicegraphs/servicegraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
metricRequestClientSeconds = "traces_service_graph_request_client_seconds"
)

const virtualNodeLabel = "virtual_node"

var defaultPeerAttributes = []attribute.Key{
semconv.PeerServiceKey, semconv.DBNameKey, semconv.DBSystemKey,
}
Expand Down Expand Up @@ -86,8 +88,20 @@ type Processor struct {

func New(cfg Config, tenant string, registry registry.Registry, logger log.Logger) gen.Processor {
labels := []string{"client", "server", "connection_type"}

if cfg.EnableVirtualNodeLabel {
cfg.Dimensions = append(cfg.Dimensions, virtualNodeLabel)
}

for _, d := range cfg.Dimensions {
if cfg.EnableClientServerPrefix {
if cfg.EnableVirtualNodeLabel {
// leave the extra label for this feature as-is
if d == virtualNodeLabel {
labels = append(labels, strutil.SanitizeLabelName(d))
continue
}
}
labels = append(labels, strutil.SanitizeLabelName("client_"+d), strutil.SanitizeLabelName("server_"+d))
} else {
labels = append(labels, strutil.SanitizeLabelName(d))
Expand Down Expand Up @@ -271,13 +285,22 @@ func (p *Processor) onComplete(e *store.Edge) {

for _, dimension := range p.Cfg.Dimensions {
if p.Cfg.EnableClientServerPrefix {
if p.Cfg.EnableVirtualNodeLabel {
// leave the extra label for this feature as-is
if dimension == virtualNodeLabel {
labelValues = append(labelValues, e.Dimensions[dimension])
continue
}
}
labelValues = append(labelValues, e.Dimensions["client_"+dimension], e.Dimensions["server_"+dimension])
} else {
labelValues = append(labelValues, e.Dimensions[dimension])
}
}

registryLabelValues := p.registry.NewLabelValueCombo(p.labels, labelValues)
labels := append([]string{}, p.labels...)

registryLabelValues := p.registry.NewLabelValueCombo(labels, labelValues)

p.serviceGraphRequestTotal.Inc(registryLabelValues, 1*e.SpanMultiplier)
if e.Failed {
Expand All @@ -301,12 +324,22 @@ func (p *Processor) onExpire(e *store.Edge) {
// We check if the span we have is the root span, and if so, we set the client service to "user".
if _, parentSpan := parseKey(e.Key()); len(parentSpan) == 0 {
e.ClientService = "user"

if p.Cfg.EnableVirtualNodeLabel {
e.Dimensions[virtualNodeLabel] = "client"
}

p.onComplete(e)
}
} else if len(e.ServerService) == 0 && len(e.PeerNode) > 0 {
// If client span does not have its matching server span, but has a peer attribute present,
// we make the assumption that a call was made to an external service, for which Tempo won't receive spans.
e.ServerService = e.PeerNode

if p.Cfg.EnableVirtualNodeLabel {
e.Dimensions[virtualNodeLabel] = "server"
}

p.onComplete(e)
}
}
Expand Down
117 changes: 110 additions & 7 deletions modules/generator/processor/servicegraphs/servicegraphs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package servicegraphs
import (
"context"
"errors"
"fmt"
"math"
"os"
"strconv"
Expand Down Expand Up @@ -59,8 +58,6 @@ func TestServiceGraphs(t *testing.T) {
"god": "",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))
Expand Down Expand Up @@ -131,8 +128,6 @@ func TestServiceGraphs_prefixDimensions(t *testing.T) {
"server_god": "zeus",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
}
Expand Down Expand Up @@ -162,8 +157,6 @@ func TestServiceGraphs_failedRequests(t *testing.T) {
"connection_type": "database",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))
Expand Down Expand Up @@ -228,6 +221,116 @@ func TestServiceGraphs_virtualNodes(t *testing.T) {
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, clientToVirtualPeerLabels))
}

func TestServiceGraphs_virtualNodesExtraLabelsForUninstrumentedServices(t *testing.T) {
testRegistry := registry.NewTestRegistry()

cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

cfg.EnableVirtualNodeLabel = true
cfg.Wait = time.Nanosecond

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

request, err := loadTestData("testdata/trace-with-virtual-nodes.json")
require.NoError(t, err)

p.PushSpans(context.Background(), request)

p.(*Processor).store.Expire()

userToServerLabels := labels.FromMap(map[string]string{
"client": "user",
"server": "mythical-server",
"connection_type": "virtual_node",
virtualNodeLabel: "client",
})

clientToVirtualPeerLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"server": "external-payments-platform",
"connection_type": "virtual_node",
virtualNodeLabel: "server",
})

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, userToServerLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, userToServerLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, clientToVirtualPeerLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, clientToVirtualPeerLabels))
}

func TestServiceGraphs_prefixDimensionsAndEnableExtraLabels(t *testing.T) {
testRegistry := registry.NewTestRegistry()

cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

cfg.HistogramBuckets = []float64{0.04}
cfg.Dimensions = []string{"db.system", "messaging.system"}
cfg.EnableClientServerPrefix = true
cfg.EnableVirtualNodeLabel = true

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

request, err := loadTestData("testdata/trace-with-queue-database.json")
require.NoError(t, err)

p.PushSpans(context.Background(), request)

messagingSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"client_db_system": "",
"client_messaging_system": "rabbitmq",
"connection_type": "messaging_system",
"server_db_system": "",
"server_messaging_system": "rabbitmq",
"server": "mythical-recorder",
virtualNodeLabel: "",
})

dbSystemSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-server",
"client_db_system": "postgresql",
"client_messaging_system": "",
"connection_type": "database",
"server_db_system": "",
"server_messaging_system": "",
"server": "postgres",
virtualNodeLabel: "",
})

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, messagingSystemLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, messagingSystemLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, dbSystemSystemLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, dbSystemSystemLabels))
}

func BenchmarkServiceGraphs(b *testing.B) {
testRegistry := registry.NewTestRegistry()

cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

cfg.HistogramBuckets = []float64{0.04}
cfg.Dimensions = []string{"beast", "god"}

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

request, err := loadTestData("testdata/trace-with-queue-database.json")
require.NoError(b, err)

for i := 0; i < b.N; i++ {
p.PushSpans(context.Background(), request)
}
}

func loadTestData(path string) (*tempopb.PushSpansRequest, error) {
f, err := os.Open(path)
if err != nil {
Expand Down
19 changes: 13 additions & 6 deletions modules/generator/processor/servicegraphs/store/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,19 @@ type Edge struct {
SpanMultiplier float64
}

func newEdge(key string, ttl time.Duration) *Edge {
return &Edge{
key: key,
Dimensions: make(map[string]string),
expiration: time.Now().Add(ttl).Unix(),
}
// resetEdge resets the Edge to its zero state.
// Useful for reusing an Edge without allocating a new one.
func resetEdge(e *Edge) {
e.TraceID = ""
e.ConnectionType = Unknown
e.ServerService = ""
e.ClientService = ""
e.ServerLatencySec = 0
e.ClientLatencySec = 0
e.Failed = false
clear(e.Dimensions)
e.PeerNode = ""
e.SpanMultiplier = 1
}

// isComplete returns true if the corresponding client and server
Expand Down
Loading

0 comments on commit 251bf5a

Please sign in to comment.