feat: rework tracing a bit
Removed noisy traces and added tracing to RtRefreshManager
Jorropo committed Mar 29, 2023
1 parent b07488e commit c6dea86
Showing 8 changed files with 75 additions and 68 deletions.
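
Before the per-file diffs, here is a minimal, self-contained sketch of the calling-convention change this commit makes to peerFound (and similarly peerStoppedDHT): the context parameter and the per-call span are dropped, and cancellation comes from the DHT's own long-lived context instead. The types and names below are toy stand-ins, not the library's own.

package main

import (
	"context"
	"fmt"
)

type peerID string

// toyDHT stands in for *IpfsDHT; only the fields needed for the sketch.
type toyDHT struct {
	ctx             context.Context // the DHT's long-lived context
	addPeerToRTChan chan peerID
}

// After this commit, peerFound takes no context and starts no span;
// it selects on the DHT's own context rather than a caller-supplied one.
func (d *toyDHT) peerFound(p peerID, queryPeer bool) {
	select {
	case d.addPeerToRTChan <- p:
	case <-d.ctx.Done():
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	d := &toyDHT{ctx: ctx, addPeerToRTChan: make(chan peerID, 1)}
	d.peerFound("QmExamplePeer", true) // callers no longer thread a ctx through
	fmt.Println(<-d.addPeerToRTChan)
}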
15 changes: 5 additions & 10 deletions dht.go
@@ -235,7 +235,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(p, false)
}
dht.plk.Unlock()

@@ -482,7 +482,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p, false)
dht.peerFound(p, false)
}

// TODO Active Bootstrapping
@@ -648,9 +648,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
@@ -661,17 +659,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-ctx.Done():
case <-dht.ctx.Done():
return
}
}
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

func (dht *IpfsDHT) peerStoppedDHT(p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore till it starts supporting the DHT protocol again.
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
@@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
2 changes: 1 addition & 1 deletion dht_net.go
@@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
dht.peerFound(mPeer, true)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
6 changes: 3 additions & 3 deletions ext_test.go
@@ -293,7 +293,7 @@ func TestNotFound(t *testing.T) {
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
d.peerFound(p.ID(), true)
}

// long timeout to ensure timing is not at play.
@@ -343,7 +343,7 @@ func TestLessThanKResponses(t *testing.T) {
}

for i := 1; i < 5; i++ {
d.peerFound(ctx, hosts[i].ID(), true)
d.peerFound(hosts[i].ID(), true)
}

// Reply with random peers to every message
@@ -415,7 +415,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}

d.peerFound(ctx, hosts[1].ID(), true)
d.peerFound(hosts[1].ID(), true)

for _, proto := range d.serverProtocols {
// It would be nice to be able to just get a value and succeed but then
2 changes: 1 addition & 1 deletion handlers_test.go
@@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) {
panic(err)
}

d.peerFound(ctx, id, true)
d.peerFound(id, true)

peers = append(peers, id)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i))
6 changes: 3 additions & 3 deletions query.go
@@ -422,7 +422,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
if err := q.dht.dialPeer(dialCtx, p); err != nil {
// remove the peer if there was a dial failure..but not because of a context cancellation
if dialCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
@@ -433,7 +433,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
if queryCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
@@ -442,7 +442,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
queryDuration := time.Since(startQuery)

// query successful, try to add to RT
q.dht.peerFound(q.dht.ctx, p, true)
q.dht.peerFound(p, true)

// process new peers
saw := []peer.ID{}
104 changes: 58 additions & 46 deletions rtrefresh/rt_refresh_manager.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-multierror"
@@ -14,6 +15,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

var logger = logging.Logger("dht/RtRefreshManager")
@@ -135,28 +138,49 @@ func (r *RtRefreshManager) pingAndEvictPeers(ctx context.Context) {
ctx, span := internal.StartSpan(ctx, "RefreshManager.PingAndEvictPeers")
defer span.End()

peersChecked := 0
peersSkipped := 0
// ping Routing Table peers that haven't been heard of/from in the interval they should have been.
// and evict them if they don't reply.
var peersChecked int
var alive int64
var wg sync.WaitGroup
for _, ps := range r.rt.GetPeerInfos() {
if time.Since(ps.LastSuccessfulOutboundQueryAt) <= r.successfulOutboundQueryGracePeriod {
peersSkipped++
continue
}
peersChecked++
wg.Add(1)
go func(ps kbucket.PeerInfo) {
defer wg.Done()
livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
peers := r.rt.GetPeerInfos()
for _, ps := range peers {
if time.Since(ps.LastSuccessfulOutboundQueryAt) > r.successfulOutboundQueryGracePeriod {
if time.Since(ps.LastSuccessfulOutboundQueryAt) <= r.successfulOutboundQueryGracePeriod {
continue
}
cancel()
}(ps)
peersChecked++
wg.Add(1)
go func(ps kbucket.PeerInfo) {
defer wg.Done()

livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
defer cancel()
peerIdStr := ps.Id.String()
livelinessCtx, span := internal.StartSpan(livelinessCtx, "RefreshManager.PingAndEvictPeers.worker", trace.WithAttributes(attribute.String("peer", peerIdStr)))
defer span.End()

if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed connection", "peer", peerIdStr, "error", err)
span.RecordError(err)
r.rt.RemovePeer(ps.Id)
return
}

if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", peerIdStr, "error", err)
span.RecordError(err)
r.rt.RemovePeer(ps.Id)
return
}

atomic.AddInt64(&alive, 1)
}(ps)
}
}
span.SetAttributes(attribute.Int("NumPeersChecked", peersChecked), attribute.Int("NumPeersSkipped", peersSkipped))
wg.Wait()

span.SetAttributes(attribute.Int("NumPeersChecked", peersChecked), attribute.Int("NumPeersSkipped", len(peers)-peersChecked), attribute.Int64("NumPeersAlive", alive))
}

func (r *RtRefreshManager) loop() {
@@ -205,33 +229,7 @@ func (r *RtRefreshManager) loop() {

ctx, span := internal.StartSpan(r.ctx, "RefreshManager.Refresh")

// ping Routing Table peers that haven't been heard of/from in the interval they should have been.
// and evict them if they don't reply.
var wg sync.WaitGroup
for _, ps := range r.rt.GetPeerInfos() {
if time.Since(ps.LastSuccessfulOutboundQueryAt) > r.successfulOutboundQueryGracePeriod {
wg.Add(1)
go func(ps kbucket.PeerInfo) {
defer wg.Done()

livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
defer cancel()

if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}

if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}
}(ps)
}
}
wg.Wait()
r.pingAndEvictPeers(ctx)

// Query for self and refresh the required buckets
err := r.doRefresh(ctx, forced)
@@ -248,6 +246,9 @@
}

func (r *RtRefreshManager) doRefresh(ctx context.Context, forceRefresh bool) error {
ctx, span := internal.StartSpan(ctx, "RefreshManager.doRefresh")
defer span.End()

var merr error

if err := r.queryForSelf(ctx); err != nil {
@@ -316,25 +317,36 @@ func (r *RtRefreshManager) refreshCplIfEligible(ctx context.Context, cpl uint, l
}

func (r *RtRefreshManager) refreshCpl(ctx context.Context, cpl uint) error {
ctx, span := internal.StartSpan(ctx, "RefreshManager.refreshCpl", trace.WithAttributes(attribute.Int("cpl", int(cpl))))
defer span.End()

// gen a key for the query to refresh the cpl
key, err := r.refreshKeyGenFnc(cpl)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to generated query key for cpl=%d, err=%s", cpl, err)
}

logger.Infof("starting refreshing cpl %d with key %s (routing table size was %d)",
cpl, loggableRawKeyString(key), r.rt.Size())

if err := r.runRefreshDHTQuery(ctx, key); err != nil {
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err)
}

logger.Infof("finished refreshing cpl %d, routing table size is now %d", cpl, r.rt.Size())
sz := r.rt.Size()
logger.Infof("finished refreshing cpl %d, routing table size is now %d", cpl, sz)
span.SetAttributes(attribute.Int("NewSize", sz))
return nil
}

func (r *RtRefreshManager) queryForSelf(ctx context.Context) error {
ctx, span := internal.StartSpan(ctx, "RefreshManager.queryForSelf")
defer span.End()

if err := r.runRefreshDHTQuery(ctx, string(r.dhtPeerId)); err != nil {
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to query for self, err=%s", err)
}
return nil
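
To make the new RtRefreshManager instrumentation easier to read outside the diff, here is a rough, self-contained sketch of the pattern pingAndEvictPeers ends up with: a parent span, one child span per liveliness-check goroutine that records failures with RecordError, and summary counters attached to the parent span once the workers finish. It uses the public OpenTelemetry tracer (otel.Tracer) where the repository uses its internal.StartSpan helper, and checkPeer is a stand-in for the Connect plus refreshPingFnc check, not the library's code.

package main

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("dht-sketch")

// checkPeer stands in for the Connect + refreshPingFnc liveliness check.
func checkPeer(ctx context.Context, id string) error {
	if id == "bad-peer" {
		return errors.New("ping failed")
	}
	return nil
}

func pingAndEvictSketch(ctx context.Context, peers []string) {
	ctx, span := tracer.Start(ctx, "RefreshManager.PingAndEvictPeers")
	defer span.End()

	var alive int64
	var wg sync.WaitGroup
	for _, id := range peers {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()

			// One child span per checked peer, tagged with the peer ID.
			workerCtx, span := tracer.Start(ctx, "RefreshManager.PingAndEvictPeers.worker",
				trace.WithAttributes(attribute.String("peer", id)))
			defer span.End()

			if err := checkPeer(workerCtx, id); err != nil {
				span.RecordError(err) // the real code also evicts the peer here
				return
			}
			atomic.AddInt64(&alive, 1)
		}(id)
	}
	wg.Wait()

	// Summary counters end up on the parent span, as in the diff above.
	span.SetAttributes(
		attribute.Int("NumPeersChecked", len(peers)),
		attribute.Int64("NumPeersAlive", atomic.LoadInt64(&alive)),
	)
}

func main() {
	pingAndEvictSketch(context.Background(), []string{"good-peer", "bad-peer"})
}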
4 changes: 2 additions & 2 deletions subscriber_notifee.go
@@ -108,10 +108,10 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
} else if valid {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(p, false)
dht.fixRTIfNeeded()
} else {
dht.peerStoppedDHT(dht.ctx, p)
dht.peerStoppedDHT(p)
}
}

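
The other half of the RtRefreshManager changes is that doRefresh, refreshCpl, and queryForSelf now carry spans and report failures on them via span.SetStatus(codes.Error, ...) instead of only returning an error. A minimal sketch of that pattern, again using the public OpenTelemetry tracer in place of the repo's internal.StartSpan and a placeholder query function:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("dht-sketch")

// runRefreshQuery is a placeholder for the DHT query that refreshes one cpl.
func runRefreshQuery(ctx context.Context, key string) error { return nil }

func refreshCplSketch(ctx context.Context, cpl uint, key string) error {
	ctx, span := tracer.Start(ctx, "RefreshManager.refreshCpl",
		trace.WithAttributes(attribute.Int("cpl", int(cpl))))
	defer span.End()

	if err := runRefreshQuery(ctx, key); err != nil {
		// Mark the span as failed so the trace shows which cpl refresh broke.
		span.SetStatus(codes.Error, err.Error())
		return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err)
	}

	// On success, record the outcome as an attribute (the diff stores the new
	// routing table size under "NewSize"; 0 here is just a placeholder).
	span.SetAttributes(attribute.Int("NewSize", 0))
	return nil
}

func main() {
	_ = refreshCplSketch(context.Background(), 3, "example-key")
}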
