Skip to content

Commit

Permalink
Merge pull request #859 from MichaelMure/fix-tracing
Browse files Browse the repository at this point in the history
tracing: fix DHT keys as string attribute not being valid utf-8
  • Loading branch information
Jorropo committed Jul 28, 2023
2 parents ee95d1a + 43b0581 commit fa4953c
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 8 deletions.
4 changes: 2 additions & 2 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
}

func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

kbID := kb.ConvertKey(key)
Expand Down Expand Up @@ -554,7 +554,7 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
var good bool
defer func() {
if !good {
Expand Down
19 changes: 19 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ package internal
import (
"context"
"fmt"
"unicode/utf8"

"github.com/multiformats/go-multibase"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("KademliaDHT.%s", name), opts...)
}

// KeyAsAttribute format a DHT key into a suitable tracing attribute.
// DHT keys can be either valid utf-8 or binary, when they are derived from, for example, a multihash.
// Tracing (and notably OpenTelemetry+grpc exporter) requires valid utf-8 for string attributes.
func KeyAsAttribute(name string, key string) attribute.KeyValue {
b := []byte(key)
if utf8.Valid(b) {
return attribute.String(name, key)
}
encoded, err := multibase.Encode(multibase.Base58BTC, b)
if err != nil {
// should be unreachable
panic(err)
}
return attribute.String(name, encoded)
}
2 changes: 1 addition & 1 deletion lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if key == "" {
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type lookupWithFollowupResult struct {
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(internal.KeyAsAttribute("Target", target)))
defer span.End()

// run the query
Expand Down
6 changes: 3 additions & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if !dht.enableValues {
Expand Down Expand Up @@ -109,7 +109,7 @@ type recvdVal struct {

// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if !dht.enableValues {
Expand Down Expand Up @@ -146,7 +146,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
var good bool
defer func() {
if !good {
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.24.2"
"version": "v0.24.3"
}

0 comments on commit fa4953c

Please sign in to comment.