From a75a3331748a9cb2e65d4603f0a96cde0f66f2e3 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 29 Mar 2023 05:31:39 +0200 Subject: [PATCH] feat: rework tracing a bit Removed noisy traces and added tracing to RtRefreshManager --- dht.go | 17 +++---- dht_bootstrap_test.go | 4 +- dht_net.go | 2 +- ext_test.go | 6 +-- fullrt/dht.go | 46 ++++++++++++----- handlers_test.go | 2 +- lookup.go | 9 +++- query.go | 6 +-- routing.go | 61 ++++++++++++++++------- rtrefresh/rt_refresh_manager.go | 87 ++++++++++++++++++--------------- subscriber_notifee.go | 4 +- 11 files changed, 150 insertions(+), 94 deletions(-) diff --git a/dht.go b/dht.go index bf8230722..f3d4227ae 100644 --- a/dht.go +++ b/dht.go @@ -235,7 +235,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) // Fill routing table with currently connected peers that are DHT servers dht.plk.Lock() for _, p := range dht.host.Network().Peers() { - dht.peerFound(dht.ctx, p, false) + dht.peerFound(p, false) } dht.plk.Unlock() @@ -482,7 +482,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { // we try to add all peers we are connected to to the Routing Table // in case they aren't already there. for _, p := range dht.host.Network().Peers() { - dht.peerFound(ctx, p, false) + dht.peerFound(p, false) } // TODO Active Bootstrapping @@ -648,9 +648,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { // If we connect to a peer we already have in the RT but do not exchange a query (rare) // // Do Nothing. -func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p))) - defer span.End() +func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) { if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil { c.Write(zap.String("peer", p.String())) @@ -661,17 +659,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { } else if b { select { case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}: - case <-ctx.Done(): + case <-dht.ctx.Done(): return } } } // peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore. -func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) { - _, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p))) - defer span.End() - +func (dht *IpfsDHT) peerStoppedDHT(p peer.ID) { logger.Debugw("peer stopped dht", "peer", p) // A peer that does not support the DHT protocol is dead for us. // There's no point in talking to anymore till it starts supporting the DHT protocol again. @@ -838,7 +833,7 @@ func (dht *IpfsDHT) Host() host.Host { // Ping sends a ping message to the passed peer and waits for a response. 
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping") + ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p))) defer span.End() return dht.protoMessenger.Ping(ctx, p) } diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index d97e3b7e7..9b1deb8b9 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) { require.NoError(t, d.host.Network().ClosePeer(d5.self)) connectNoSync(t, ctx, d, d1) connectNoSync(t, ctx, d, d5) - d.peerFound(ctx, d5.self, true) - d.peerFound(ctx, d1.self, true) + d.peerFound(d5.self, true) + d.peerFound(d1.self, true) time.Sleep(1 * time.Second) require.Len(t, d.routingTable.ListPeers(), 2) diff --git a/dht_net.go b/dht_net.go index 2a31a0b7c..7ebdd3773 100644 --- a/dht_net.go +++ b/dht_net.go @@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { } // a peer has queried us, let's add it to RT - dht.peerFound(dht.ctx, mPeer, true) + dht.peerFound(mPeer, true) if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil { c.Write(zap.String("from", mPeer.String()), diff --git a/ext_test.go b/ext_test.go index d550d8d13..a42cbe0b7 100644 --- a/ext_test.go +++ b/ext_test.go @@ -293,7 +293,7 @@ func TestNotFound(t *testing.T) { } for _, p := range hosts { - d.peerFound(ctx, p.ID(), true) + d.peerFound(p.ID(), true) } // long timeout to ensure timing is not at play. @@ -343,7 +343,7 @@ func TestLessThanKResponses(t *testing.T) { } for i := 1; i < 5; i++ { - d.peerFound(ctx, hosts[i].ID(), true) + d.peerFound(hosts[i].ID(), true) } // Reply with random peers to every message @@ -415,7 +415,7 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } - d.peerFound(ctx, hosts[1].ID(), true) + d.peerFound(hosts[1].ID(), true) for _, proto := range d.serverProtocols { // It would be nice to be able to just get a value and succeed but then diff --git a/fullrt/dht.go b/fullrt/dht.go index 1a7569a9f..f1a26e70a 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -45,6 +45,7 @@ import ( kadkey "github.com/libp2p/go-libp2p-xor/key" "github.com/libp2p/go-libp2p-xor/trie" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -92,6 +93,8 @@ type FullRT struct { timeoutPerOp time.Duration bulkSendParallelism int + + self peer.ID } // NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network, @@ -147,7 +150,8 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful ctx, cancel := context.WithCancel(context.Background()) - pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...) + self := h.ID() + pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...) if err != nil { cancel() return nil, err @@ -189,6 +193,8 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful crawlerInterval: fullrtcfg.crawlInterval, bulkSendParallelism: fullrtcfg.bulkSendParallelism, + + self: self, } rt.wg.Add(1) @@ -550,7 +556,12 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt // SearchValue searches for the value corresponding to given Key and streams the results. 
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) - defer span.End() + var good bool + defer func() { + if !good { + span.End() + } + }() if !dht.enableValues { return nil, routing.ErrNotSupported @@ -570,10 +581,10 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing. valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) + good = true go func() { - defer close(out) - ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker") defer span.End() + defer close(out) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) if best == nil || aborted { @@ -1190,9 +1201,6 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID { // FindProviders searches until the context expires. func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) { - ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c))) - defer span.End() - if !dht.enableProviders { return nil, routing.ErrNotSupported } else if !c.Defined() { @@ -1212,9 +1220,6 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf // completes. Note: not reading from the returned channel may block the query // from progressing. func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { - ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count))) - defer span.End() - if !dht.enableProviders || !key.Defined() { peerOut := make(chan peer.AddrInfo) close(peerOut) @@ -1235,10 +1240,11 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in } func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - defer close(peerOut) - ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting") + ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key))) defer span.End() + defer close(peerOut) + findAll := count == 0 ps := make(map[peer.ID]struct{}) psLock := &sync.Mutex{} @@ -1267,6 +1273,10 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. if psTryAdd(p.ID) { select { case peerOut <- p: + span.AddEvent("found provider", trace.WithAttributes( + attribute.Stringer("peer", p.ID), + attribute.Stringer("from", dht.self), + )) case <-ctx.Done(): return } @@ -1294,10 +1304,16 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. ID: p, }) - provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) + mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetProviders", trace.WithAttributes(attribute.Stringer("peer", p))) + provs, closest, err := dht.protoMessenger.GetProviders(mctx, p, key) if err != nil { + if mspan.IsRecording() { + mspan.SetStatus(codes.Error, err.Error()) + } + mspan.End() return err } + mspan.End() logger.Debugf("%d provider entries", len(provs)) @@ -1309,6 +1325,10 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash. 
logger.Debugf("using provider: %s", prov) select { case peerOut <- *prov: + span.AddEvent("found provider", trace.WithAttributes( + attribute.Stringer("peer", prov.ID), + attribute.Stringer("from", p), + )) case <-ctx.Done(): logger.Debug("context timed out sending more providers") return ctx.Err() diff --git a/handlers_test.go b/handlers_test.go index c4c1e4ca1..327cc93d5 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) { panic(err) } - d.peerFound(ctx, id, true) + d.peerFound(id, true) peers = append(peers, id) a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i)) diff --git a/lookup.go b/lookup.go index f18be5eb6..a59474911 100644 --- a/lookup.go +++ b/lookup.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -34,11 +35,17 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, ID: p, }) - peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, peer.ID(key)) + mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p))) + peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key)) if err != nil { + if mspan.IsRecording() { + mspan.SetStatus(codes.Error, err.Error()) + } + mspan.End() logger.Debugf("error getting closer peers: %s", err) return nil, err } + mspan.End() // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ diff --git a/query.go b/query.go index 5fa534b84..05632856e 100644 --- a/query.go +++ b/query.go @@ -422,7 +422,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID if err := q.dht.dialPeer(dialCtx, p); err != nil { // remove the peer if there was a dial failure..but not because of a context cancellation if dialCtx.Err() == nil { - q.dht.peerStoppedDHT(q.dht.ctx, p) + q.dht.peerStoppedDHT(p) } ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}} return @@ -433,7 +433,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID newPeers, err := q.queryFn(queryCtx, p) if err != nil { if queryCtx.Err() == nil { - q.dht.peerStoppedDHT(q.dht.ctx, p) + q.dht.peerStoppedDHT(p) } ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}} return @@ -442,7 +442,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID queryDuration := time.Since(startQuery) // query successful, try to add to RT - q.dht.peerFound(q.dht.ctx, p, true) + q.dht.peerFound(p, true) // process new peers saw := []peer.ID{} diff --git a/routing.go b/routing.go index c9743d846..2df0a61b4 100644 --- a/routing.go +++ b/routing.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" u "github.com/ipfs/boxo/util" @@ -143,8 +144,13 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op // SearchValue searches for the value corresponding to given Key and streams the results. 
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue") - defer span.End() + ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) + var good bool + defer func() { + if !good { + span.End() + } + }() if !dht.enableValues { return nil, routing.ErrNotSupported @@ -164,10 +170,9 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing valCh, lookupRes := dht.getValues(ctx, key, stopCh) out := make(chan []byte) + good = true go func() { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue.Worker") defer span.End() - defer close(out) best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded) if best == nil || aborted { @@ -310,10 +315,17 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st ID: p, }) - rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key) + mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetValue", trace.WithAttributes(attribute.Stringer("peer", p))) + rec, peers, err := dht.protoMessenger.GetValue(mctx, p, key) if err != nil { + if mspan.IsRecording() { + mspan.SetStatus(codes.Error, err.Error()) + } + mspan.End() + logger.Debugf("error getting closer peers: %s", err) return nil, err } + mspan.End() // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ @@ -460,9 +472,6 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err // FindProviders searches until the context expires. func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProviders") - defer span.End() - if !dht.enableProviders { return nil, routing.ErrNotSupported } else if !c.Defined() { @@ -482,9 +491,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn // completes. Note: not reading from the returned channel may block the query // from progressing. 
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsync") - defer span.End() - if !dht.enableProviders || !key.Defined() { peerOut := make(chan peer.AddrInfo) close(peerOut) @@ -505,11 +511,11 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { - defer close(peerOut) - - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine") + ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key))) defer span.End() + defer close(peerOut) + findAll := count == 0 ps := make(map[peer.ID]peer.AddrInfo) @@ -539,6 +545,10 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash if psTryAdd(p) { select { case peerOut <- p: + span.AddEvent("found provider", trace.WithAttributes( + attribute.Stringer("peer", p.ID), + attribute.Stringer("from", dht.self), + )) case <-ctx.Done(): return } @@ -553,16 +563,23 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { + // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.SendingQuery, ID: p, }) - provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key) + mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetProviders", trace.WithAttributes(attribute.Stringer("peer", p))) + provs, closest, err := dht.protoMessenger.GetProviders(mctx, p, key) if err != nil { + if mspan.IsRecording() { + mspan.SetStatus(codes.Error, err.Error()) + } + mspan.End() return nil, err } + mspan.End() logger.Debugf("%d provider entries", len(provs)) @@ -574,6 +591,10 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash logger.Debugf("using provider: %s", prov) select { case peerOut <- *prov: + span.AddEvent("found provider", trace.WithAttributes( + attribute.Stringer("peer", prov.ID), + attribute.Stringer("from", p), + )) case <-ctx.Done(): logger.Debug("context timed out sending more providers") return nil, ctx.Err() @@ -608,7 +629,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash // FindPeer searches for a peer with given ID. 
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer") + ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id))) defer span.End() if err := id.Validate(); err != nil { @@ -630,11 +651,17 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, ID: p, }) - peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id) + mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p))) + peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, id) if err != nil { + if mspan.IsRecording() { + mspan.SetStatus(codes.Error, err.Error()) + } + mspan.End() logger.Debugf("error getting closer peers: %s", err) return nil, err } + mspan.End() // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index b4659f4b5..d08983702 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/hashicorp/go-multierror" @@ -14,6 +15,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-base32" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var logger = logging.Logger("dht/RtRefreshManager") @@ -129,34 +132,52 @@ func (r *RtRefreshManager) RefreshNoWait() { } } -// pingAndEvictPeers pings Routing Table peers that haven't been heard of/from in the interval they should have been. -// and evict them if they don't reply. +// pingAndEvictPeers pings Routing Table peers that haven't been heard of/from +// in the interval they should have been and evict them if they don't reply. 
func (r *RtRefreshManager) pingAndEvictPeers(ctx context.Context) { ctx, span := internal.StartSpan(ctx, "RefreshManager.PingAndEvictPeers") defer span.End() - peersChecked := 0 - peersSkipped := 0 + var peersChecked int + var alive int64 var wg sync.WaitGroup - for _, ps := range r.rt.GetPeerInfos() { + peers := r.rt.GetPeerInfos() + for _, ps := range peers { if time.Since(ps.LastSuccessfulOutboundQueryAt) <= r.successfulOutboundQueryGracePeriod { - peersSkipped++ continue } + peersChecked++ wg.Add(1) go func(ps kbucket.PeerInfo) { defer wg.Done() + livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout) + defer cancel() + peerIdStr := ps.Id.String() + livelinessCtx, span := internal.StartSpan(livelinessCtx, "RefreshManager.PingAndEvictPeers.worker", trace.WithAttributes(attribute.String("peer", peerIdStr))) + defer span.End() + if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { - logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) + logger.Debugw("evicting peer after failed connection", "peer", peerIdStr, "error", err) + span.RecordError(err) + r.rt.RemovePeer(ps.Id) + return + } + + if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil { + logger.Debugw("evicting peer after failed ping", "peer", peerIdStr, "error", err) + span.RecordError(err) r.rt.RemovePeer(ps.Id) + return } - cancel() + + atomic.AddInt64(&alive, 1) }(ps) } - span.SetAttributes(attribute.Int("NumPeersChecked", peersChecked), attribute.Int("NumPeersSkipped", peersSkipped)) wg.Wait() + + span.SetAttributes(attribute.Int("NumPeersChecked", peersChecked), attribute.Int("NumPeersSkipped", len(peers)-peersChecked), attribute.Int64("NumPeersAlive", alive)) } func (r *RtRefreshManager) loop() { @@ -164,9 +185,7 @@ func (r *RtRefreshManager) loop() { var refreshTickrCh <-chan time.Time if r.enableAutoRefresh { - ctx, span := internal.StartSpan(r.ctx, "RefreshManager.Refresh") - err := r.doRefresh(ctx, true) - span.End() + err := r.doRefresh(r.ctx, true) if err != nil { logger.Warn("failed when refreshing routing table", err) } @@ -205,33 +224,7 @@ func (r *RtRefreshManager) loop() { ctx, span := internal.StartSpan(r.ctx, "RefreshManager.Refresh") - // ping Routing Table peers that haven't been heard of/from in the interval they should have been. - // and evict them if they don't reply. 
- var wg sync.WaitGroup - for _, ps := range r.rt.GetPeerInfos() { - if time.Since(ps.LastSuccessfulOutboundQueryAt) > r.successfulOutboundQueryGracePeriod { - wg.Add(1) - go func(ps kbucket.PeerInfo) { - defer wg.Done() - - livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout) - defer cancel() - - if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { - logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err) - r.rt.RemovePeer(ps.Id) - return - } - - if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil { - logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) - r.rt.RemovePeer(ps.Id) - return - } - }(ps) - } - } - wg.Wait() + r.pingAndEvictPeers(ctx) // Query for self and refresh the required buckets err := r.doRefresh(ctx, forced) @@ -248,6 +241,9 @@ func (r *RtRefreshManager) loop() { } func (r *RtRefreshManager) doRefresh(ctx context.Context, forceRefresh bool) error { + ctx, span := internal.StartSpan(ctx, "RefreshManager.doRefresh") + defer span.End() + var merr error if err := r.queryForSelf(ctx); err != nil { @@ -316,9 +312,13 @@ func (r *RtRefreshManager) refreshCplIfEligible(ctx context.Context, cpl uint, l } func (r *RtRefreshManager) refreshCpl(ctx context.Context, cpl uint) error { + ctx, span := internal.StartSpan(ctx, "RefreshManager.refreshCpl", trace.WithAttributes(attribute.Int("cpl", int(cpl)))) + defer span.End() + // gen a key for the query to refresh the cpl key, err := r.refreshKeyGenFnc(cpl) if err != nil { + span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to generated query key for cpl=%d, err=%s", cpl, err) } @@ -326,15 +326,22 @@ func (r *RtRefreshManager) refreshCpl(ctx context.Context, cpl uint) error { cpl, loggableRawKeyString(key), r.rt.Size()) if err := r.runRefreshDHTQuery(ctx, key); err != nil { + span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err) } - logger.Infof("finished refreshing cpl %d, routing table size is now %d", cpl, r.rt.Size()) + sz := r.rt.Size() + logger.Infof("finished refreshing cpl %d, routing table size is now %d", cpl, sz) + span.SetAttributes(attribute.Int("NewSize", sz)) return nil } func (r *RtRefreshManager) queryForSelf(ctx context.Context) error { + ctx, span := internal.StartSpan(ctx, "RefreshManager.queryForSelf") + defer span.End() + if err := r.runRefreshDHTQuery(ctx, string(r.dhtPeerId)); err != nil { + span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to query for self, err=%s", err) } return nil diff --git a/subscriber_notifee.go b/subscriber_notifee.go index ec9eca146..eb20e7fcb 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -108,10 +108,10 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { logger.Errorf("could not check peerstore for protocol support: err: %s", err) return } else if valid { - dht.peerFound(dht.ctx, p, false) + dht.peerFound(p, false) dht.fixRTIfNeeded() } else { - dht.peerStoppedDHT(dht.ctx, p) + dht.peerStoppedDHT(p) } }
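The diff above applies two tracing conventions repeatedly: a short-lived child span around each protoMessenger RPC, ended explicitly on both the error and success paths with the error recorded as span status, and a parent span whose End is handed off to the worker goroutine via a "good" guard so the early-return paths of SearchValue still end it. Below is a minimal, self-contained sketch of both patterns using only the public OpenTelemetry API; the tracer, getProviders, findProviders, and searchValue names are hypothetical stand-ins for illustration, not go-libp2p-kad-dht functions.

// Illustrative sketch only: mirrors the tracing conventions used in this
// patch. getProviders stands in for a protoMessenger RPC.
package main

import (
	"context"
	"errors"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("example/dht-tracing")

// getProviders is a hypothetical stand-in for a protoMessenger call.
func getProviders(ctx context.Context, peerID string) ([]string, error) {
	if peerID == "flaky" {
		return nil, errors.New("connection refused")
	}
	return []string{"provider-1"}, nil
}

// findProviders shows the per-RPC child span pattern: the span is ended on
// both branches, and a failure sets the span status so the trace shows which
// remote call went wrong.
func findProviders(ctx context.Context, peerID string) ([]string, error) {
	ctx, span := tracer.Start(ctx, "protoMessenger.GetProviders",
		trace.WithAttributes(attribute.String("peer", peerID)))

	provs, err := getProviders(ctx, peerID)
	if err != nil {
		if span.IsRecording() {
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
		return nil, err
	}
	span.End()
	return provs, nil
}

// searchValue shows the "good" guard from SearchValue: the deferred closure
// ends the span only if the worker goroutine did not take ownership of it.
func searchValue(ctx context.Context, key string) (<-chan string, error) {
	ctx, span := tracer.Start(ctx, "SearchValue",
		trace.WithAttributes(attribute.String("Key", key)))
	var good bool
	defer func() {
		if !good {
			span.End() // error path: nobody else will end the span
		}
	}()

	if key == "" {
		return nil, errors.New("empty key")
	}

	out := make(chan string)
	good = true // the worker goroutine now owns the span
	go func() {
		defer span.End()
		defer close(out)
		provs, err := findProviders(ctx, key)
		if err != nil {
			return
		}
		for _, p := range provs {
			out <- p
		}
	}()
	return out, nil
}

func main() {
	out, err := searchValue(context.Background(), "some-key")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for v := range out {
		fmt.Println("got:", v)
	}
}

Ending the RPC span explicitly on each branch, rather than deferring it, keeps the span scoped to the single network call inside the lookup callback, which is why the patch ends mspan before returning on error and again right after the successful call.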