Skip to content

Commit

Permalink
feat: add basic tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored and Jorropo committed Mar 28, 2023
1 parent 471e058 commit b07488e
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 48 deletions.
17 changes: 15 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
Expand Down Expand Up @@ -647,6 +649,9 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
Expand All @@ -656,14 +661,17 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():
case <-ctx.Done():
return
}
}
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
Expand All @@ -678,7 +686,10 @@ func (dht *IpfsDHT) fixRTIfNeeded() {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
_, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand Down Expand Up @@ -827,6 +838,8 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
// It returns whatever error the underlying protocol messenger reports
// (including ctx cancellation/expiry while waiting).
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	// Record the target peer on the span for consistency with the other
	// IpfsDHT spans (peerFound, peerStoppedDHT, FindLocal, dialPeer),
	// which all attach a "PeerID" attribute.
	ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p)))
	defer span.End()
	return dht.protoMessenger.Ping(ctx, p)
}

Expand Down
40 changes: 28 additions & 12 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,10 @@ import (
"github.com/libp2p/go-libp2p-xor/kademlia"
kadkey "github.com/libp2p/go-libp2p-xor/key"
"github.com/libp2p/go-libp2p-xor/trie"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var Tracer = otel.Tracer("")

var logger = logging.Logger("fullrtdht")

const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.`
Expand Down Expand Up @@ -363,6 +360,9 @@ func (dht *FullRT) Bootstrap(ctx context.Context) error {

// CheckPeers return (success, total)
func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int) {
ctx, span := internal.StartSpan(ctx, "FullRT.CheckPeers", trace.WithAttributes(attribute.Int("NumPeers", len(peers))))
defer span.End()

var peerAddrs chan interface{}
var total int
if len(peers) == 0 {
Expand Down Expand Up @@ -420,8 +420,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
}

func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := Tracer.Start(ctx, "GetClosestPeers")
_ = ctx // not used, but we want to assign it _just_ in case we use it.
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

kbID := kb.ConvertKey(key)
Expand Down Expand Up @@ -550,6 +549,9 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

if !dht.enableValues {
return nil, routing.ErrNotSupported
}
Expand All @@ -570,6 +572,9 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
out := make(chan []byte)
go func() {
defer close(out)
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker")
defer span.End()

best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
return
Expand All @@ -591,7 +596,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
dht.updatePeerValues(ctx, key, best, updatePeers)
cancel()
}()
Expand Down Expand Up @@ -775,7 +780,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str

// Provide makes this node announce that it can provide a value for the given key
func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
ctx, span := Tracer.Start(ctx, "Provide")
ctx, span := internal.StartSpan(ctx, "FullRT.Provide", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Bool("Broadcast", brdcst)))
defer span.End()

if !dht.enableProviders {
Expand Down Expand Up @@ -853,9 +858,6 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
// a panic instead of just returning an error).
func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
ctx, span := Tracer.Start(ctx, "execOnMany")
defer span.End()

if len(peers) == 0 {
return 0
}
Expand Down Expand Up @@ -919,7 +921,7 @@ func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer
}

func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
ctx, span := Tracer.Start(ctx, "ProvideMany")
ctx, span := internal.StartSpan(ctx, "FullRT.ProvideMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()

if !dht.enableProviders {
Expand Down Expand Up @@ -955,6 +957,9 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
}

func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
ctx, span := internal.StartSpan(ctx, "FullRT.PutMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()

if !dht.enableValues {
return routing.ErrNotSupported
}
Expand Down Expand Up @@ -983,7 +988,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
}

func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
ctx, span := Tracer.Start(ctx, "bulkMessageSend", trace.WithAttributes(attribute.Int("numKeys", len(keys))))
ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend")
defer span.End()

if len(keys) == 0 {
Expand Down Expand Up @@ -1185,6 +1190,9 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c)))
defer span.End()

if !dht.enableProviders {
return nil, routing.ErrNotSupported
} else if !c.Defined() {
Expand All @@ -1204,6 +1212,9 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count)))
defer span.End()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
Expand All @@ -1225,6 +1236,8 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting")
defer span.End()

findAll := count == 0
ps := make(map[peer.ID]struct{})
Expand Down Expand Up @@ -1324,6 +1337,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.

// FindPeer searches for a peer with given ID.
func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

if err := id.Validate(); err != nil {
return peer.AddrInfo{}, err
}
Expand Down
13 changes: 13 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// StartSpan begins a new OpenTelemetry span derived from ctx using this
// package's tracer. The given name is prefixed with "KademliaDHT." so that
// every span emitted by this module groups together in trace viewers.
// Callers must End() the returned span.
func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	tracer := otel.Tracer("go-libp2p-kad-dht")
	spanName := fmt.Sprintf("KademliaDHT.%s", name)
	return tracer.Start(ctx, spanName, opts...)
}
9 changes: 7 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"

kb "github.com/libp2p/go-libp2p-kbucket"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
Expand All @@ -17,6 +19,9 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()

if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down
14 changes: 10 additions & 4 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (
"strings"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
peerstoreImpl "github.com/libp2p/go-libp2p/p2p/host/peerstore"

lru "github.com/hashicorp/golang-lru/simplelru"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
dsq "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
peerstoreImpl "github.com/libp2p/go-libp2p/p2p/host/peerstore"
"github.com/multiformats/go-base32"
)

Expand Down Expand Up @@ -240,6 +240,9 @@ func (pm *ProviderManager) run(ctx context.Context, proc goprocess.Process) {

// AddProvider adds a provider
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
ctx, span := internal.StartSpan(ctx, "ProviderManager.AddProvider")
defer span.End()

if provInfo.ID != pm.self { // don't add own addrs.
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, ProviderAddrTTL)
}
Expand Down Expand Up @@ -287,6 +290,9 @@ func mkProvKey(k []byte) string {
// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "ProviderManager.GetProviders")
defer span.End()

gp := &getProv{
ctx: ctx,
key: k,
Expand Down
30 changes: 29 additions & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
)
Expand Down Expand Up @@ -77,6 +80,9 @@ type lookupWithFollowupResult struct {
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target)))
defer span.End()

// run the query
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
if err != nil {
Expand Down Expand Up @@ -146,6 +152,9 @@ processFollowUp:
}

func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunQuery")
defer span.End()

// pick the K closest peers to the key in our Routing table.
targetKadID := kb.ConvertKey(target)
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
Expand Down Expand Up @@ -259,7 +268,10 @@ type queryUpdate struct {
}

func (q *query) run() {
pathCtx, cancelPath := context.WithCancel(q.ctx)
ctx, span := internal.StartSpan(q.ctx, "IpfsDHT.Query.Run")
defer span.End()

pathCtx, cancelPath := context.WithCancel(ctx)
defer cancelPath()

alpha := q.dht.alpha
Expand Down Expand Up @@ -303,6 +315,12 @@ func (q *query) run() {

// spawnQuery starts one query, if an available heard peer is found
func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SpawnQuery", trace.WithAttributes(
attribute.String("Cause", cause.String()),
attribute.String("QueryPeer", queryPeer.String()),
))
defer span.End()

PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
Expand Down Expand Up @@ -369,6 +387,9 @@ func (q *query) isStarvationTermination() bool {
}

func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason LookupTerminationReason) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Query.Terminate", trace.WithAttributes(attribute.Stringer("Reason", reason)))
defer span.End()

if q.terminated {
return
}
Expand All @@ -391,6 +412,10 @@ func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason
// queryPeer does not access the query state in queryPeers!
func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID) {
defer q.waitGroup.Done()

ctx, span := internal.StartSpan(ctx, "IpfsDHT.QueryPeer")
defer span.End()

dialCtx, queryCtx := ctx, ctx

// dial the peer
Expand Down Expand Up @@ -497,6 +522,9 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
}

func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.DialPeer", trace.WithAttributes(attribute.String("PeerID", p.String())))
defer span.End()

// short-circuit if we're already connected.
if dht.host.Network().Connectedness(p) == network.Connected {
return nil
Expand Down
Loading

0 comments on commit b07488e

Please sign in to comment.