Skip to content

Commit

Permalink
Reuse dimensions and prefixes for this use-case
Browse files Browse the repository at this point in the history
  • Loading branch information
t00mas committed Apr 12, 2024
1 parent 6600e48 commit cc7f069
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 71 deletions.
96 changes: 49 additions & 47 deletions modules/generator/processor/servicegraphs/servicegraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,29 @@ type Processor struct {

func New(cfg Config, tenant string, registry registry.Registry, logger log.Logger) gen.Processor {
labels := []string{"client", "server", "connection_type"}

if cfg.EnableExtraUninstrumentedServicesLabels {
cfg.Dimensions = append(cfg.Dimensions, "virtual_node", "db.system", "messaging.system")
}

for _, d := range cfg.Dimensions {
if cfg.EnableClientServerPrefix {
if cfg.EnableExtraUninstrumentedServicesLabels {
// leave the extra label for this feature as-is
if d == "virtual_node" {
labels = append(labels, strutil.SanitizeLabelName(d))
continue
}
}
labels = append(labels, strutil.SanitizeLabelName("client_"+d), strutil.SanitizeLabelName("server_"+d))
} else {
if cfg.EnableExtraUninstrumentedServicesLabels {
// prefix the extra labels for this feature anyway
if d == "db.system" || d == "messaging.system" {
labels = append(labels, strutil.SanitizeLabelName("client_"+d), strutil.SanitizeLabelName("server_"+d))
continue
}
}
labels = append(labels, strutil.SanitizeLabelName(d))
}
}
Expand Down Expand Up @@ -189,16 +208,14 @@ func (p *Processor) consume(resourceSpans []*v1_trace.ResourceSpans) (err error)
e.ConnectionType = store.Database
e.ServerService = dbName
e.ServerLatencySec = spanDurationSec(span)
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
e.Dimensions["server_db_system"] = e.ServerService
}
}

if p.Cfg.EnableExtraUninstrumentedServicesLabels {
switch e.ConnectionType {
case store.Database:
e.ServerDbSystem = e.ServerService
case store.MessagingSystem:
if messagingSystem, ok := processor_util.FindAttributeValue("messaging.system", rs.Resource.Attributes, span.Attributes); ok {
e.ServerMessagingSystem = messagingSystem
}
if messagingSystem, ok := processor_util.FindAttributeValue("messaging.system", rs.Resource.Attributes, span.Attributes); ok {
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
e.Dimensions["server_messaging_system"] = messagingSystem
}
}
})
Expand All @@ -219,12 +236,9 @@ func (p *Processor) consume(resourceSpans []*v1_trace.ResourceSpans) (err error)
e.SpanMultiplier = spanMultiplier
p.upsertPeerNode(e, span.Attributes)

if p.Cfg.EnableExtraUninstrumentedServicesLabels {
switch e.ConnectionType {
case store.MessagingSystem:
if messagingSystem, ok := processor_util.FindAttributeValue("messaging.system", rs.Resource.Attributes, span.Attributes); ok {
e.ClientMessagingSystem = messagingSystem
}
if messagingSystem, ok := processor_util.FindAttributeValue("messaging.system", rs.Resource.Attributes, span.Attributes); ok {
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
e.Dimensions["client_messaging_system"] = messagingSystem
}
}
})
Expand Down Expand Up @@ -286,45 +300,33 @@ func (p *Processor) Shutdown(_ context.Context) {
}

func (p *Processor) onComplete(e *store.Edge) {
var labelValues []string
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
labelValues = make([]string, 0, 7+len(p.Cfg.Dimensions))
} else {
labelValues = make([]string, 0, 2+len(p.Cfg.Dimensions))
}
labelValues := make([]string, 0, 2+len(p.Cfg.Dimensions))
labelValues = append(labelValues, e.ClientService, e.ServerService, string(e.ConnectionType))

for _, dimension := range p.Cfg.Dimensions {
for _, d := range p.Cfg.Dimensions {
if p.Cfg.EnableClientServerPrefix {
labelValues = append(labelValues, e.Dimensions["client_"+dimension], e.Dimensions["server_"+dimension])
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
// leave the extra label for this feature as-is
if d == "virtual_node" {
labelValues = append(labelValues, e.Dimensions[d])
continue
}
}
labelValues = append(labelValues, e.Dimensions["client_"+d], e.Dimensions["server_"+d])
} else {
labelValues = append(labelValues, e.Dimensions[dimension])
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
// prefix the extra labels for this feature anyway
if d == "db.system" || d == "messaging.system" {
clientDimLabel, serverDimLabel := strutil.SanitizeLabelName("client_"+d), strutil.SanitizeLabelName("server_"+d)
labelValues = append(labelValues, e.Dimensions[clientDimLabel], e.Dimensions[serverDimLabel])
continue
}
}
labelValues = append(labelValues, e.Dimensions[d])
}
}

labels := append([]string{}, p.labels...)
if p.Cfg.EnableExtraUninstrumentedServicesLabels {
if e.VirtualNode != "" {
labelValues = append(labelValues, e.VirtualNode)
labels = append(p.labels, "virtual_node")
}
if e.ClientDbSystem != "" {
labelValues = append(labelValues, e.ClientDbSystem)
labels = append(p.labels, "client_db_system")
}
if e.ClientMessagingSystem != "" {
labelValues = append(labelValues, e.ClientMessagingSystem)
labels = append(p.labels, "client_messaging_system")
}
if e.ServerDbSystem != "" {
labelValues = append(labelValues, e.ServerDbSystem)
labels = append(p.labels, "server_db_system")
}
if e.ServerMessagingSystem != "" {
labelValues = append(labelValues, e.ServerMessagingSystem)
labels = append(p.labels, "server_messaging_system")
}
}

registryLabelValues := p.registry.NewLabelValueCombo(labels, labelValues)

Expand Down Expand Up @@ -352,7 +354,7 @@ func (p *Processor) onExpire(e *store.Edge) {
e.ClientService = "user"

if p.Cfg.EnableExtraUninstrumentedServicesLabels {
e.VirtualNode = "client"
e.Dimensions["virtual_node"] = "client"
}

p.onComplete(e)
Expand All @@ -363,7 +365,7 @@ func (p *Processor) onExpire(e *store.Edge) {
e.ServerService = e.PeerNode

if p.Cfg.EnableExtraUninstrumentedServicesLabels {
e.VirtualNode = "server"
e.Dimensions["virtual_node"] = "server"
}

p.onComplete(e)
Expand Down
100 changes: 81 additions & 19 deletions modules/generator/processor/servicegraphs/servicegraphs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package servicegraphs
import (
"context"
"errors"
"fmt"
"math"
"os"
"strconv"
Expand Down Expand Up @@ -59,8 +58,6 @@ func TestServiceGraphs(t *testing.T) {
"god": "",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))
Expand Down Expand Up @@ -131,8 +128,6 @@ func TestServiceGraphs_prefixDimensions(t *testing.T) {
"server_god": "zeus",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
}
Expand Down Expand Up @@ -162,8 +157,6 @@ func TestServiceGraphs_failedRequests(t *testing.T) {
"connection_type": "database",
})

fmt.Println(testRegistry)

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))
Expand Down Expand Up @@ -248,17 +241,25 @@ func TestServiceGraphs_virtualNodesExtraLabelsForUninstrumentedServices(t *testi
p.(*Processor).store.Expire()

userToServerLabels := labels.FromMap(map[string]string{
"client": "user",
"server": "mythical-server",
"connection_type": "virtual_node",
"virtual_node": "client",
"client": "user",
"server": "mythical-server",
"connection_type": "virtual_node",
"virtual_node": "client",
"client_db_system": "",
"client_messaging_system": "",
"server_db_system": "",
"server_messaging_system": "",
})

clientToVirtualPeerLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"server": "external-payments-platform",
"connection_type": "virtual_node",
"virtual_node": "server",
"client": "mythical-requester",
"server": "external-payments-platform",
"connection_type": "virtual_node",
"virtual_node": "server",
"client_db_system": "",
"client_messaging_system": "",
"server_db_system": "",
"server_messaging_system": "",
})

// counters
Expand Down Expand Up @@ -286,16 +287,24 @@ func TestServiceGraphs_enableExtraLabelsForUninstrumentedServicesWithQueueAndDat
p.PushSpans(context.Background(), request)

serverDbSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-server",
"connection_type": "database",
"server": "postgres",
"server_db_system": "postgres",
"client": "mythical-server",
"connection_type": "database",
"server": "postgres",
"virtual_node": "",
"client_db_system": "",
"client_messaging_system": "",
"server_db_system": "postgres",
"server_messaging_system": "",
})

serverMsgSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"connection_type": "messaging_system",
"server": "mythical-recorder",
"virtual_node": "",
"client_db_system": "",
"client_messaging_system": "rabbitmq",
"server_db_system": "",
"server_messaging_system": "rabbitmq",
})

Expand All @@ -307,6 +316,59 @@ func TestServiceGraphs_enableExtraLabelsForUninstrumentedServicesWithQueueAndDat
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, serverMsgSystemLabels))
}

func TestServiceGraphs_prefixDimensionsAndEnableExtraLabels(t *testing.T) {
testRegistry := registry.NewTestRegistry()

cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

cfg.HistogramBuckets = []float64{0.04}
cfg.Dimensions = []string{"net.peer.port"}
cfg.EnableClientServerPrefix = true
cfg.EnableExtraUninstrumentedServicesLabels = true

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

request, err := loadTestData("testdata/trace-with-queue-database.json")
require.NoError(t, err)

p.PushSpans(context.Background(), request)

messagingSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"client_db_system": "",
"client_messaging_system": "rabbitmq",
"client_net_peer_port": "5672",
"connection_type": "messaging_system",
"server_db_system": "",
"server_messaging_system": "rabbitmq",
"server_net_peer_port": "5672",
"server": "mythical-recorder",
"virtual_node": "",
})

dbSystemSystemLabels := labels.FromMap(map[string]string{
"client": "mythical-server",
"client_db_system": "postgresql",
"client_messaging_system": "",
"client_net_peer_port": "5432",
"connection_type": "database",
"server_db_system": "",
"server_messaging_system": "",
"server_net_peer_port": "",
"server": "postgres",
"virtual_node": "",
})

// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, messagingSystemLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, messagingSystemLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, dbSystemSystemLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, dbSystemSystemLabels))
}

func loadTestData(path string) (*tempopb.PushSpansRequest, error) {
f, err := os.Open(path)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions modules/generator/processor/servicegraphs/store/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ type Edge struct {
// Additional dimension to add to the metrics
Dimensions map[string]string

// Additional labels for uninstrumented services
VirtualNode string // Indicates which service is the virtual node (client or server)
ClientDbSystem, ClientMessagingSystem string // If the client is a virtual node, indicates the client's db or messaging system
ServerDbSystem, ServerMessagingSystem string // If the server is a virtual node, indicates the server's db or messaging system

// PeerNode is the attribute that will be used to create a peer edge
PeerNode string

Expand Down

0 comments on commit cc7f069

Please sign in to comment.