diff --git a/dht.go b/dht.go
index 830be3dd6..a7d4f6c69 100644
--- a/dht.go
+++ b/dht.go
@@ -14,6 +14,8 @@ import (
 	"github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/protocol"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"

 	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
@@ -636,6 +638,9 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
 // If we connect to a peer we already have in the RT but do not exchange a query (rare)
 // Do Nothing.
 func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
+	defer span.End()
+
 	if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
 		c.Write(zap.String("peer", p.String()))
 	}
@@ -645,7 +650,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
 	} else if b {
 		select {
 		case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
-		case <-dht.ctx.Done():
+		case <-ctx.Done():
 			return
 		}
 	}
@@ -653,6 +658,9 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {

 // peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
 func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
+	_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
+	defer span.End()
+
 	logger.Debugw("peer stopped dht", "peer", p)
 	// A peer that does not support the DHT protocol is dead for us.
 	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
@@ -667,7 +675,10 @@ func (dht *IpfsDHT) fixRTIfNeeded() {
 }

 // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
+func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
+	_, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id)))
+	defer span.End()
+
 	switch dht.host.Network().Connectedness(id) {
 	case network.Connected, network.CanConnect:
 		return dht.peerstore.PeerInfo(id)
@@ -816,6 +827,8 @@ func (dht *IpfsDHT) Host() host.Host {

 // Ping sends a ping message to the passed peer and waits for a response.
 func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping")
+	defer span.End()
 	return dht.protoMessenger.Ping(ctx, p)
 }

diff --git a/fullrt/dht.go b/fullrt/dht.go
index c4c0b43a7..8f1cc3169 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -42,13 +42,10 @@ import (
 	"github.com/libp2p/go-libp2p-xor/kademlia"
 	kadkey "github.com/libp2p/go-libp2p-xor/key"
 	"github.com/libp2p/go-libp2p-xor/trie"
-	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 )

-var Tracer = otel.Tracer("")
-
 var logger = logging.Logger("fullrtdht")

 // FullRT is an experimental DHT client that is under development. Expect breaking changes to occur in this client
@@ -338,6 +335,9 @@ func (dht *FullRT) Bootstrap(ctx context.Context) error {

 // CheckPeers return (success, total)
 func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int) {
+	ctx, span := internal.StartSpan(ctx, "FullRT.CheckPeers", trace.WithAttributes(attribute.Int("NumPeers", len(peers))))
+	defer span.End()
+
 	var peerAddrs chan interface{}
 	var total int
 	if len(peers) == 0 {
@@ -395,8 +395,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
 }

 func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
-	ctx, span := Tracer.Start(ctx, "GetClosestPeers")
-	_ = ctx // not used, but we want to assign it _just_ in case we use it.
+	_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
 	defer span.End()

 	kbID := kb.ConvertKey(key)
@@ -525,6 +524,9 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt

 // SearchValue searches for the value corresponding to given Key and streams the results.
 func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
+	ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
+	defer span.End()
+
 	if !dht.enableValues {
 		return nil, routing.ErrNotSupported
 	}
@@ -545,6 +547,9 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
 	out := make(chan []byte)
 	go func() {
 		defer close(out)
+		ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker")
+		defer span.End()
+
 		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
 		if best == nil || aborted {
 			return
@@ -566,7 +571,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
 			return
 		}

-		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		ctx, cancel := context.WithTimeout(ctx, time.Second*5)
 		dht.updatePeerValues(ctx, key, best, updatePeers)
 		cancel()
 	}()
@@ -750,7 +755,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str

 // Provide makes this node announce that it can provide a value for the given key
 func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
-	ctx, span := Tracer.Start(ctx, "Provide")
+	ctx, span := internal.StartSpan(ctx, "FullRT.Provide", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Bool("Broadcast", brdcst)))
 	defer span.End()

 	if !dht.enableProviders {
@@ -828,9 +833,6 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
 // execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
 // a panic instead of just returning an error).
 func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
-	ctx, span := Tracer.Start(ctx, "execOnMany")
-	defer span.End()
-
 	if len(peers) == 0 {
 		return 0
 	}
@@ -894,7 +896,7 @@ func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer
 }

 func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
-	ctx, span := Tracer.Start(ctx, "ProvideMany")
+	ctx, span := internal.StartSpan(ctx, "FullRT.ProvideMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
 	defer span.End()

 	if !dht.enableProviders {
@@ -930,6 +932,9 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
 }

 func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
+	ctx, span := internal.StartSpan(ctx, "FullRT.PutMany", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
+	defer span.End()
+
 	if !dht.enableValues {
 		return routing.ErrNotSupported
 	}
@@ -958,7 +963,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
 }

 func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
-	ctx, span := Tracer.Start(ctx, "bulkMessageSend", trace.WithAttributes(attribute.Int("numKeys", len(keys))))
+	ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend")
 	defer span.End()

 	if len(keys) == 0 {
@@ -1160,6 +1165,9 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {

 // FindProviders searches until the context expires.
 func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
+	ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c)))
+	defer span.End()
+
 	if !dht.enableProviders {
 		return nil, routing.ErrNotSupported
 	} else if !c.Defined() {
@@ -1179,6 +1187,9 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf
 // completes. Note: not reading from the returned channel may block the query
 // from progressing.
 func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
+	ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count)))
+	defer span.End()
+
 	if !dht.enableProviders || !key.Defined() {
 		peerOut := make(chan peer.AddrInfo)
 		close(peerOut)
@@ -1200,6 +1211,8 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in

 func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
 	defer close(peerOut)
+	ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting")
+	defer span.End()

 	findAll := count == 0
 	var ps *peer.Set
@@ -1288,6 +1301,9 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.

 // FindPeer searches for a peer with given ID.
 func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
+	ctx, span := internal.StartSpan(ctx, "FullRT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id)))
+	defer span.End()
+
 	if err := id.Validate(); err != nil {
 		return peer.AddrInfo{}, err
 	}
diff --git a/go.mod b/go.mod
index 6f5f43b24..84a665670 100644
--- a/go.mod
+++ b/go.mod
@@ -32,10 +32,10 @@ require (
 	github.com/multiformats/go-multibase v0.0.3
 	github.com/multiformats/go-multihash v0.0.15
 	github.com/multiformats/go-multistream v0.2.2
-	github.com/stretchr/testify v1.7.0
+	github.com/stretchr/testify v1.7.1
 	github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
 	go.opencensus.io v0.23.0
-	go.opentelemetry.io/otel v0.20.0
-	go.opentelemetry.io/otel/trace v0.20.0
+	go.opentelemetry.io/otel v1.6.3
+	go.opentelemetry.io/otel/trace v1.6.3
 	go.uber.org/zap v1.19.0
 )
diff --git a/go.sum b/go.sum
index 8a6be7432..f43b282da 100644
--- a/go.sum
+++ b/go.sum
@@ -177,6 +177,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
+github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
@@ -234,8 +239,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
+github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
 github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -848,8 +854,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -888,14 +895,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
 go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
-go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
-go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
-go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
-go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
-go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
-go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw=
-go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw=
-go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
+go.opentelemetry.io/otel v1.6.3 h1:FLOfo8f9JzFVFVyU+MSRJc2HdEAXQgm7pIv2uFKRSZE=
+go.opentelemetry.io/otel v1.6.3/go.mod h1:7BgNga5fNlF/iZjG06hM3yofffp0ofKCDwSXx1GC4dI=
+go.opentelemetry.io/otel/trace v1.6.3 h1:IqN4L+5b0mPNjdXIiZ90Ni4Bl5BRkDQywePLWemd9bc=
+go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
diff --git a/internal/tracing.go b/internal/tracing.go
new file mode 100644
index 000000000..2a2f18647
--- /dev/null
+++ b/internal/tracing.go
@@ -0,0 +1,13 @@
+package internal
+
+import (
+	"context"
+	"fmt"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/trace"
+)
+
+func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
+	return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("KademliaDHT.%s", name), opts...)
+}
diff --git a/lookup.go b/lookup.go
index 88695dc4a..e6b01ff31 100644
--- a/lookup.go
+++ b/lookup.go
@@ -7,6 +7,9 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"github.com/libp2p/go-libp2p-kad-dht/internal"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"

 	kb "github.com/libp2p/go-libp2p-kbucket"
 )

@@ -17,6 +20,9 @@ import (
 // If the context is canceled, this function will return the context error along
 // with the closest K peers it has found so far.
 func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
+	defer span.End()
+
 	if key == "" {
 		return nil, fmt.Errorf("can't lookup empty key")
 	}
diff --git a/providers/providers_manager.go b/providers/providers_manager.go
index ad5929203..239b5a881 100644
--- a/providers/providers_manager.go
+++ b/providers/providers_manager.go
@@ -9,6 +9,7 @@ import (

 	"github.com/libp2p/go-libp2p-core/peer"
 	peerstore "github.com/libp2p/go-libp2p-core/peerstore"
+	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	peerstoreImpl "github.com/libp2p/go-libp2p-peerstore"

 	lru "github.com/hashicorp/golang-lru/simplelru"
@@ -231,6 +232,9 @@ func (pm *ProviderManager) run(ctx context.Context, proc goprocess.Process) {

 // AddProvider adds a provider
 func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
+	ctx, span := internal.StartSpan(ctx, "ProviderManager.AddProvider")
+	defer span.End()
+
 	if provInfo.ID != pm.self { // don't add own addrs.
 		pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
 	}
@@ -278,6 +282,9 @@ func mkProvKey(k []byte) string {
 // GetProviders returns the set of providers for the given key.
 // This method _does not_ copy the set. Do not modify it.
 func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
+	ctx, span := internal.StartSpan(ctx, "ProviderManager.GetProviders")
+	defer span.End()
+
 	gp := &getProv{
 		ctx: ctx,
 		key: k,
diff --git a/query.go b/query.go
index a6a73935e..0e275030f 100644
--- a/query.go
+++ b/query.go
@@ -12,8 +12,11 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	pstore "github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"

 	"github.com/google/uuid"
+	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
 	kb "github.com/libp2p/go-libp2p-kbucket"
 )
@@ -77,6 +80,9 @@ type lookupWithFollowupResult struct {
 // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
 // lookup that have not already been successfully queried.
 func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target)))
+	defer span.End()
+
 	// run the query
 	lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
 	if err != nil {
@@ -146,6 +152,9 @@ processFollowUp:
 }

 func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunQuery")
+	defer span.End()
+
 	// pick the K closest peers to the key in our Routing table.
 	targetKadID := kb.ConvertKey(target)
 	seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
@@ -259,7 +268,10 @@ type queryUpdate struct {
 }

 func (q *query) run() {
-	pathCtx, cancelPath := context.WithCancel(q.ctx)
+	ctx, span := internal.StartSpan(q.ctx, "IpfsDHT.Query.Run")
+	defer span.End()
+
+	pathCtx, cancelPath := context.WithCancel(ctx)
 	defer cancelPath()

 	alpha := q.dht.alpha
@@ -303,6 +315,12 @@ func (q *query) run() {

 // spawnQuery starts one query, if an available heard peer is found
 func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID, ch chan<- *queryUpdate) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.SpawnQuery", trace.WithAttributes(
+		attribute.String("Cause", cause.String()),
+		attribute.String("QueryPeer", queryPeer.String()),
+	))
+	defer span.End()
+
 	PublishLookupEvent(ctx,
 		NewLookupEvent(
 			q.dht.self,
@@ -369,6 +387,9 @@ func (q *query) isStarvationTermination() bool {
 }

 func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason LookupTerminationReason) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.Query.Terminate", trace.WithAttributes(attribute.Stringer("Reason", reason)))
+	defer span.End()
+
 	if q.terminated {
 		return
 	}
@@ -391,6 +412,10 @@ func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason
 // queryPeer does not access the query state in queryPeers!
 func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID) {
 	defer q.waitGroup.Done()
+
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.QueryPeer")
+	defer span.End()
+
 	dialCtx, queryCtx := ctx, ctx

 	// dial the peer
@@ -497,6 +522,9 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) {
 }

 func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.DialPeer", trace.WithAttributes(attribute.String("PeerID", p.String())))
+	defer span.End()
+
 	// short-circuit if we're already connected.
 	if dht.host.Network().Connectedness(p) == network.Connected {
 		return nil
diff --git a/records.go b/records.go
index 23d6ee7da..3e6ab3240 100644
--- a/records.go
+++ b/records.go
@@ -6,6 +6,9 @@ import (

 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"github.com/libp2p/go-libp2p-kad-dht/internal"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"

 	ci "github.com/libp2p/go-libp2p-core/crypto"
 )
@@ -18,6 +21,9 @@ type pubkrs struct {
 // GetPublicKey gets the public key when given a Peer ID. It will extract from
 // the Peer ID if inlined or ask the node it belongs to or ask the DHT.
 func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetPublicKey", trace.WithAttributes(attribute.Stringer("PeerID", p)))
+	defer span.End()
+
 	if !dht.enableValues {
 		return nil, routing.ErrNotSupported
 	}
diff --git a/routing.go b/routing.go
index c6df5e224..36e32e138 100644
--- a/routing.go
+++ b/routing.go
@@ -11,6 +11,8 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/routing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"

 	"github.com/ipfs/go-cid"
 	u "github.com/ipfs/go-ipfs-util"
@@ -29,6 +31,9 @@ import (
 // PutValue adds value corresponding to given Key.
 // This is the top level "Store" operation of the DHT
 func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(attribute.String("Key", key)))
+	defer span.End()
+
 	if !dht.enableValues {
 		return routing.ErrNotSupported
 	}
@@ -101,6 +106,9 @@ type recvdVal struct {

 // GetValue searches for the value corresponding to given Key.
 func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(attribute.String("Key", key)))
+	defer span.End()
+
 	if !dht.enableValues {
 		return nil, routing.ErrNotSupported
 	}
@@ -135,6 +143,9 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op

 // SearchValue searches for the value corresponding to given Key and streams the results.
 func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue")
+	defer span.End()
+
 	if !dht.enableValues {
 		return nil, routing.ErrNotSupported
 	}
@@ -154,6 +165,9 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing

 	out := make(chan []byte)
 	go func() {
+		ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue.Worker")
+		defer span.End()
+
 		defer close(out)
 		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
 		if best == nil || aborted {
@@ -371,6 +385,9 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow

 // Provide makes this node announce that it can provide a value for the given key
 func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.Provide", trace.WithAttributes(attribute.String("Key", key.String()), attribute.Bool("Broadcast", brdcst)))
+	defer span.End()
+
 	if !dht.enableProviders {
 		return routing.ErrNotSupported
 	} else if !key.Defined() {
@@ -443,6 +460,9 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err

 // FindProviders searches until the context expires.
 func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProviders")
+	defer span.End()
+
 	if !dht.enableProviders {
 		return nil, routing.ErrNotSupported
 	} else if !c.Defined() {
@@ -462,6 +482,9 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn
 // completes. Note: not reading from the returned channel may block the query
 // from progressing.
 func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsync")
+	defer span.End()
+
 	if !dht.enableProviders || !key.Defined() {
 		peerOut := make(chan peer.AddrInfo)
 		close(peerOut)
@@ -484,6 +507,9 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
 func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
 	defer close(peerOut)

+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine")
+	defer span.End()
+
 	findAll := count == 0
 	var ps *peer.Set
 	if findAll {
@@ -570,6 +596,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash

 // FindPeer searches for a peer with given ID.
 func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
+	ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer")
+	defer span.End()
+
 	if err := id.Validate(); err != nil {
 		return peer.AddrInfo{}, err
 	}
@@ -577,7 +606,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
 	logger.Debugw("finding peer", "peer", id)

 	// Check if were already connected to them
-	if pi := dht.FindLocal(id); pi.ID != "" {
+	if pi := dht.FindLocal(ctx, id); pi.ID != "" {
 		return pi, nil
 	}

diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go
index 1dc9b7e7e..cd3501e92 100644
--- a/rtrefresh/rt_refresh_manager.go
+++ b/rtrefresh/rt_refresh_manager.go
@@ -8,6 +8,8 @@ import (

 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-kad-dht/internal"
+	"go.opentelemetry.io/otel/attribute"

 	kbucket "github.com/libp2p/go-libp2p-kbucket"

@@ -126,12 +128,44 @@ func (r *RtRefreshManager) RefreshNoWait() {
 	}
 }

+// pingAndEvictPeers pings Routing Table peers that haven't been heard of/from in the interval they should have been,
+// and evicts them if they don't reply.
+func (r *RtRefreshManager) pingAndEvictPeers(ctx context.Context) {
+	ctx, span := internal.StartSpan(ctx, "RefreshManager.PingAndEvictPeers")
+	defer span.End()
+
+	peersChecked := 0
+	peersSkipped := 0
+	var wg sync.WaitGroup
+	for _, ps := range r.rt.GetPeerInfos() {
+		if time.Since(ps.LastSuccessfulOutboundQueryAt) <= r.successfulOutboundQueryGracePeriod {
+			peersSkipped++
+			continue
+		}
+		peersChecked++
+		wg.Add(1)
+		go func(ps kbucket.PeerInfo) {
+			defer wg.Done()
+			livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
+			if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
+				logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
+				r.rt.RemovePeer(ps.Id)
+			}
+			cancel()
+		}(ps)
+	}
+	span.SetAttributes(attribute.Int("NumPeersChecked", peersChecked), attribute.Int("NumPeersSkipped", peersSkipped))
+	wg.Wait()
+}
+
 func (r *RtRefreshManager) loop() {
 	defer r.refcount.Done()

 	var refreshTickrCh <-chan time.Time
 	if r.enableAutoRefresh {
-		err := r.doRefresh(true)
+		ctx, span := internal.StartSpan(r.ctx, "RefreshManager.Refresh")
+		err := r.doRefresh(ctx, true)
+		span.End()
 		if err != nil {
 			logger.Warn("failed when refreshing routing table", err)
 		}
@@ -168,29 +202,12 @@ func (r *RtRefreshManager) loop() {
 			}
 		}

-		// EXECUTE the refresh
-
-		// ping Routing Table peers that haven't been heard of/from in the interval they should have been.
-		// and evict them if they don't reply.
-		var wg sync.WaitGroup
-		for _, ps := range r.rt.GetPeerInfos() {
-			if time.Since(ps.LastSuccessfulOutboundQueryAt) > r.successfulOutboundQueryGracePeriod {
-				wg.Add(1)
-				go func(ps kbucket.PeerInfo) {
-					defer wg.Done()
-					livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
-					if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
-						logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
-						r.rt.RemovePeer(ps.Id)
-					}
-					cancel()
-				}(ps)
-			}
-		}
-		wg.Wait()
+		ctx, span := internal.StartSpan(r.ctx, "RefreshManager.Refresh")
+
+		r.pingAndEvictPeers(ctx)

 		// Query for self and refresh the required buckets
-		err := r.doRefresh(forced)
+		err := r.doRefresh(ctx, forced)
 		for _, w := range waiting {
 			w <- err
 			close(w)
@@ -198,13 +215,15 @@ func (r *RtRefreshManager) loop() {
 		if err != nil {
 			logger.Warnw("failed when refreshing routing table", "error", err)
 		}
+
+		span.End()
 	}
 }

-func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {
+func (r *RtRefreshManager) doRefresh(ctx context.Context, forceRefresh bool) error {
 	var merr error

-	if err := r.queryForSelf(); err != nil {
+	if err := r.queryForSelf(ctx); err != nil {
 		merr = multierror.Append(merr, err)
 	}

@@ -212,9 +231,9 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {
 	rfnc := func(cpl uint) (err error) {
 		if forceRefresh {
-			err = r.refreshCpl(cpl)
+			err = r.refreshCpl(ctx, cpl)
 		} else {
-			err = r.refreshCplIfEligible(cpl, refreshCpls[cpl])
+			err = r.refreshCplIfEligible(ctx, cpl, refreshCpls[cpl])
 		}
 		return
 	}

@@ -245,8 +264,8 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {

 	select {
 	case r.refreshDoneCh <- struct{}{}:
-	case <-r.ctx.Done():
-		return r.ctx.Err()
+	case <-ctx.Done():
+		return ctx.Err()
 	}

 	return merr
@@ -260,16 +279,16 @@ func min(a int, b int) int {
 	return b
 }

-func (r *RtRefreshManager) refreshCplIfEligible(cpl uint, lastRefreshedAt time.Time) error {
+func (r *RtRefreshManager) refreshCplIfEligible(ctx context.Context, cpl uint, lastRefreshedAt time.Time) error {
 	if time.Since(lastRefreshedAt) <= r.refreshInterval {
 		logger.Debugf("not running refresh for cpl %d as time since last refresh not above interval", cpl)
 		return nil
 	}

-	return r.refreshCpl(cpl)
+	return r.refreshCpl(ctx, cpl)
 }

-func (r *RtRefreshManager) refreshCpl(cpl uint) error {
+func (r *RtRefreshManager) refreshCpl(ctx context.Context, cpl uint) error {
 	// gen a key for the query to refresh the cpl
 	key, err := r.refreshKeyGenFnc(cpl)
 	if err != nil {
@@ -279,7 +298,7 @@ func (r *RtRefreshManager) refreshCpl(cpl uint) error {

 	logger.Infof("starting refreshing cpl %d with key %s (routing table size was %d)", cpl, loggableRawKeyString(key), r.rt.Size())

-	if err := r.runRefreshDHTQuery(key); err != nil {
+	if err := r.runRefreshDHTQuery(ctx, key); err != nil {
 		return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err)
 	}

@@ -287,15 +306,15 @@ func (r *RtRefreshManager) refreshCpl(cpl uint) error {
 	return nil
 }

-func (r *RtRefreshManager) queryForSelf() error {
-	if err := r.runRefreshDHTQuery(string(r.dhtPeerId)); err != nil {
+func (r *RtRefreshManager) queryForSelf(ctx context.Context) error {
+	if err := r.runRefreshDHTQuery(ctx, string(r.dhtPeerId)); err != nil {
 		return fmt.Errorf("failed to query for self, err=%s", err)
 	}
 	return nil
 }

-func (r *RtRefreshManager) runRefreshDHTQuery(key string) error {
-	queryCtx, cancel := context.WithTimeout(r.ctx, r.refreshQueryTimeout)
+func (r *RtRefreshManager) runRefreshDHTQuery(ctx context.Context, key string) error {
+	queryCtx, cancel := context.WithTimeout(ctx, r.refreshQueryTimeout)
 	defer cancel()

 	err := r.refreshQueryFnc(queryCtx, key)
diff --git a/rtrefresh/rt_refresh_manager_test.go b/rtrefresh/rt_refresh_manager_test.go
index 7bfc14351..00f63ee55 100644
--- a/rtrefresh/rt_refresh_manager_test.go
+++ b/rtrefresh/rt_refresh_manager_test.go
@@ -65,7 +65,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) {
 	b, _ := rt.TryAddPeer(p, true, false)
 	require.True(t, b)
 	r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
-	require.NoError(t, r.doRefresh(true))
+	require.NoError(t, r.doRefresh(ctx, true))

 	for i := uint(0); i < lastCpl+1; i++ {
 		if i == icpl {
@@ -88,7 +88,7 @@ func TestSkipRefreshOnGapCpls(t *testing.T) {
 	b, _ = rt.TryAddPeer(p, true, false)
 	require.True(t, b)
 	r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
-	require.NoError(t, r.doRefresh(true))
+	require.NoError(t, r.doRefresh(ctx, true))

 	for i := uint(0); i < 10; i++ {
 		if i == icpl {
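Note on consuming these spans: internal.StartSpan records through the process-global OpenTelemetry tracer provider, which is a no-op unless the embedding application registers one. The sketch below is one minimal way to wire that up, not part of this patch; it assumes the application itself adds the OTel SDK and stdouttrace exporter modules (the go.mod change above only pulls in the otel and otel/trace API modules), and any other span exporter could be substituted.

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Export finished spans to stdout; swap in any other SpanExporter (e.g. OTLP) as needed.
	exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		log.Fatal(err)
	}

	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
	defer tp.Shutdown(context.Background())

	// internal.StartSpan resolves its tracer ("go-libp2p-kad-dht") through the global
	// provider, so registering one here makes spans such as
	// "KademliaDHT.IpfsDHT.GetClosestPeers" visible to the exporter.
	otel.SetTracerProvider(tp)

	// ... construct the DHT / FullRT client and issue queries as usual ...
}

With a provider registered, a DHT query should then appear as a KademliaDHT.* span tree, e.g. IpfsDHT.RunLookupWithFollowup with IpfsDHT.QueryPeer and IpfsDHT.DialPeer children, since the patched functions propagate the span context downward.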