feat: rework tracing a bit
Removed noisy traces and added tracing to RtRefreshManager
Jorropo committed Mar 29, 2023
1 parent b07488e commit a75a333
Showing 11 changed files with 150 additions and 94 deletions.
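The pattern that recurs in the hunks below: hot paths such as peerFound and peerStoppedDHT lose their per-call spans, while each outgoing RPC gets a short-lived child span tagged with the remote peer and marked as errored when the call fails. A minimal sketch of that per-RPC pattern against the public OpenTelemetry API; the tracer name, the callWithSpan helper, and the rpc callback are illustrative stand-ins, not the repository's internal.StartSpan helper or protoMessenger methods:

```go
package example

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// Illustrative tracer; the repository routes this through its internal.StartSpan helper.
var tracer = otel.Tracer("go-libp2p-kad-dht/example")

// callWithSpan wraps one outgoing DHT RPC in a child span, tags it with the
// remote peer, and records an error status on failure.
func callWithSpan(ctx context.Context, name string, p peer.ID, rpc func(context.Context) error) error {
	ctx, span := tracer.Start(ctx, name, trace.WithAttributes(attribute.Stringer("peer", p)))
	defer span.End()

	if err := rpc(ctx); err != nil {
		if span.IsRecording() { // skip building the error description for unsampled spans
			span.SetStatus(codes.Error, err.Error())
		}
		return err
	}
	return nil
}
```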
17 changes: 6 additions & 11 deletions dht.go
@@ -235,7 +235,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(p, false)
}
dht.plk.Unlock()

@@ -482,7 +482,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p, false)
dht.peerFound(p, false)
}

// TODO Active Bootstrapping
@@ -648,9 +648,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
@@ -661,17 +659,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-ctx.Done():
case <-dht.ctx.Done():
return
}
}
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

func (dht *IpfsDHT) peerStoppedDHT(p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
@@ -838,7 +833,7 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping")
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()
return dht.protoMessenger.Ping(ctx, p)
}
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
@@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
2 changes: 1 addition & 1 deletion dht_net.go
@@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
dht.peerFound(mPeer, true)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
6 changes: 3 additions & 3 deletions ext_test.go
@@ -293,7 +293,7 @@ func TestNotFound(t *testing.T) {
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
d.peerFound(p.ID(), true)
}

// long timeout to ensure timing is not at play.
@@ -343,7 +343,7 @@ func TestLessThanKResponses(t *testing.T) {
}

for i := 1; i < 5; i++ {
d.peerFound(ctx, hosts[i].ID(), true)
d.peerFound(hosts[i].ID(), true)
}

// Reply with random peers to every message
@@ -415,7 +415,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}

d.peerFound(ctx, hosts[1].ID(), true)
d.peerFound(hosts[1].ID(), true)

for _, proto := range d.serverProtocols {
// It would be nice to be able to just get a value and succeed but then
46 changes: 33 additions & 13 deletions fullrt/dht.go
@@ -45,6 +45,7 @@ import (
kadkey "github.com/libp2p/go-libp2p-xor/key"
"github.com/libp2p/go-libp2p-xor/trie"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

@@ -92,6 +93,8 @@ type FullRT struct {
timeoutPerOp time.Duration

bulkSendParallelism int

self peer.ID
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
@@ -147,7 +150,8 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

ctx, cancel := context.WithCancel(context.Background())

pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
self := h.ID()
pm, err := providers.NewProviderManager(ctx, self, h.Peerstore(), dhtcfg.Datastore, fullrtcfg.pmOpts...)
if err != nil {
cancel()
return nil, err
@@ -189,6 +193,8 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
}

rt.wg.Add(1)
@@ -550,7 +556,12 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt
// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()
var good bool
defer func() {
if !good {
span.End()
}
}()

if !dht.enableValues {
return nil, routing.ErrNotSupported
@@ -570,10 +581,10 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
valCh, lookupRes := dht.getValues(ctx, key, stopCh)

out := make(chan []byte)
good = true
go func() {
defer close(out)
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker")
defer span.End()
defer close(out)

best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
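The SearchValue hunk above also changes which code ends the span: instead of the worker opening its own FullRT.SearchValue.Worker span, the outer span is handed to the goroutine that streams results, so it covers the whole lookup while the early error returns still close it via the good flag. A stripped-down sketch of that hand-off, reusing the tracer and imports from the first sketch (plus the standard-library errors package); the lookup callback and the empty-key check are placeholders for the real option parsing and query logic:

```go
// searchValueSketch hands ownership of the span to the goroutine that streams results.
// Requires "errors" in addition to the imports from the previous sketch.
func searchValueSketch(ctx context.Context, key string, lookup func(context.Context, chan<- []byte)) (<-chan []byte, error) {
	ctx, span := tracer.Start(ctx, "FullRT.SearchValue",
		trace.WithAttributes(attribute.String("Key", key)))

	var good bool
	defer func() {
		if !good {
			span.End() // early-return paths still close the span
		}
	}()

	if key == "" { // placeholder for the real option parsing / validation
		return nil, errors.New("search value: empty key")
	}

	out := make(chan []byte)
	good = true // from here on, the worker goroutine owns the span
	go func() {
		defer span.End() // span now covers the entire streaming lookup
		defer close(out)
		lookup(ctx, out)
	}()
	return out, nil
}
```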
@@ -1190,9 +1201,6 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c)))
defer span.End()

if !dht.enableProviders {
return nil, routing.ErrNotSupported
} else if !c.Defined() {
@@ -1212,9 +1220,6 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count)))
defer span.End()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
@@ -1235,10 +1240,11 @@ }
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting")
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key)))
defer span.End()

defer close(peerOut)

findAll := count == 0
ps := make(map[peer.ID]struct{})
psLock := &sync.Mutex{}
@@ -1267,6 +1273,10 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
if psTryAdd(p.ID) {
select {
case peerOut <- p:
span.AddEvent("found provider", trace.WithAttributes(
attribute.Stringer("peer", p.ID),
attribute.Stringer("from", dht.self),
))
case <-ctx.Done():
return
}
@@ -1294,10 +1304,16 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ID: p,
})

provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetProviders", trace.WithAttributes(attribute.Stringer("peer", p)))
provs, closest, err := dht.protoMessenger.GetProviders(mctx, p, key)
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
return err
}
mspan.End()

logger.Debugf("%d provider entries", len(provs))

Expand All @@ -1309,6 +1325,10 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
span.AddEvent("found provider", trace.WithAttributes(
attribute.Stringer("peer", prov.ID),
attribute.Stringer("from", p),
))
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return ctx.Err()
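In findProvidersAsyncRoutine, every provider that is streamed to the caller is now recorded as an event on the enclosing span, tagged with the provider and the peer that reported it, rather than as extra per-provider spans. A small sketch of that event pattern, reusing the imports from the first sketch; the emitProvider helper name is illustrative:

```go
// emitProvider forwards one provider to the consumer and, if it was accepted,
// notes the emission as an event on the caller's span.
func emitProvider(ctx context.Context, span trace.Span, out chan<- peer.AddrInfo, prov peer.AddrInfo, from peer.ID) bool {
	select {
	case out <- prov:
		span.AddEvent("found provider", trace.WithAttributes(
			attribute.Stringer("peer", prov.ID),
			attribute.Stringer("from", from),
		))
		return true
	case <-ctx.Done():
		return false
	}
}
```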
2 changes: 1 addition & 1 deletion handlers_test.go
@@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) {
panic(err)
}

d.peerFound(ctx, id, true)
d.peerFound(id, true)

peers = append(peers, id)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i))
9 changes: 8 additions & 1 deletion lookup.go
@@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

@@ -34,11 +35,17 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
ID: p,
})

peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, peer.ID(key))
mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
6 changes: 3 additions & 3 deletions query.go
@@ -422,7 +422,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
if err := q.dht.dialPeer(dialCtx, p); err != nil {
// remove the peer if there was a dial failure..but not because of a context cancellation
if dialCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
@@ -433,7 +433,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
if queryCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
@@ -442,7 +442,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
queryDuration := time.Since(startQuery)

// query successful, try to add to RT
q.dht.peerFound(q.dht.ctx, p, true)
q.dht.peerFound(p, true)

// process new peers
saw := []peer.ID{}