diff --git a/fullrt/dht.go b/fullrt/dht.go index 3b0cd3e9..6cdde4cb 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -425,7 +425,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) { } func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - _, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key))) + _, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() kbID := kb.ConvertKey(key) @@ -554,7 +554,7 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt // SearchValue searches for the value corresponding to given Key and streams the results. func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) var good bool defer func() { if !good { diff --git a/internal/tracing.go b/internal/tracing.go index 2a2f1864..6b707f9c 100644 --- a/internal/tracing.go +++ b/internal/tracing.go @@ -3,11 +3,30 @@ package internal import ( "context" "fmt" + "unicode/utf8" + "github.com/multiformats/go-multibase" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("KademliaDHT.%s", name), opts...) } + +// KeyAsAttribute format a DHT key into a suitable tracing attribute. +// DHT keys can be either valid utf-8 or binary, when they are derived from, for example, a multihash. +// Tracing (and notably OpenTelemetry+grpc exporter) requires valid utf-8 for string attributes. +func KeyAsAttribute(name string, key string) attribute.KeyValue { + b := []byte(key) + if utf8.Valid(b) { + return attribute.String(name, key) + } + encoded, err := multibase.Encode(multibase.Base58BTC, b) + if err != nil { + // should be unreachable + panic(err) + } + return attribute.String(name, encoded) +} diff --git a/lookup.go b/lookup.go index 563ed6be..86877781 100644 --- a/lookup.go +++ b/lookup.go @@ -22,7 +22,7 @@ import ( // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if key == "" { diff --git a/query.go b/query.go index 524269ae..7c01a2af 100644 --- a/query.go +++ b/query.go @@ -81,7 +81,7 @@ type lookupWithFollowupResult struct { // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(internal.KeyAsAttribute("Target", target))) defer span.End() // run the query diff --git a/routing.go b/routing.go index 20490041..26c07976 100644 --- a/routing.go +++ b/routing.go @@ -34,7 +34,7 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if !dht.enableValues { @@ -109,7 +109,7 @@ type recvdVal struct { // GetValue searches for the value corresponding to given Key. func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if !dht.enableValues { @@ -146,7 +146,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op // SearchValue searches for the value corresponding to given Key and streams the results. func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) var good bool defer func() { if !good { diff --git a/version.json b/version.json index 48cf18fa..336b08fb 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.24.2" + "version": "v0.24.3" }