Skip to content

Commit

Permalink
feat: rework tracing a bit
Browse files Browse the repository at this point in the history
Removed noisy traces and added tracing to RtRefreshManager
  • Loading branch information
Jorropo committed Mar 29, 2023
1 parent b07488e commit 9605ac9
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 88 deletions.
17 changes: 6 additions & 11 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
dht.peerFound(p, false)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -482,7 +482,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p, false)
dht.peerFound(p, false)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -648,9 +648,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PeerFound", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
Expand All @@ -661,17 +659,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-ctx.Done():
case <-dht.ctx.Done():
return
}
}
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
_, span := internal.StartSpan(ctx, "IpfsDHT.PeerStoppedDHT", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()

func (dht *IpfsDHT) peerStoppedDHT(p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
Expand Down Expand Up @@ -838,7 +833,7 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping")
ctx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(attribute.Stringer("PeerID", p)))
defer span.End()
return dht.protoMessenger.Ping(ctx, p)
}
Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self, true)
d.peerFound(ctx, d1.self, true)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
dht.peerFound(dht.ctx, mPeer, true)
dht.peerFound(mPeer, true)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
6 changes: 3 additions & 3 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestNotFound(t *testing.T) {
}

for _, p := range hosts {
d.peerFound(ctx, p.ID(), true)
d.peerFound(p.ID(), true)
}

// long timeout to ensure timing is not at play.
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestLessThanKResponses(t *testing.T) {
}

for i := 1; i < 5; i++ {
d.peerFound(ctx, hosts[i].ID(), true)
d.peerFound(hosts[i].ID(), true)
}

// Reply with random peers to every message
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}

d.peerFound(ctx, hosts[1].ID(), true)
d.peerFound(hosts[1].ID(), true)

for _, proto := range d.serverProtocols {
// It would be nice to be able to just get a value and succeed but then
Expand Down
24 changes: 13 additions & 11 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,12 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt
// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
defer span.End()
var good bool
defer func() {
if !good {
span.End()
}
}()

if !dht.enableValues {
return nil, routing.ErrNotSupported
Expand All @@ -570,10 +575,10 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
valCh, lookupRes := dht.getValues(ctx, key, stopCh)

out := make(chan []byte)
good = true
go func() {
defer close(out)
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue.Worker")
defer span.End()
defer close(out)

best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
Expand Down Expand Up @@ -1190,9 +1195,6 @@ func divideByChunkSize(keys []peer.ID, chunkSize int) [][]peer.ID {

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProviders", trace.WithAttributes(attribute.Stringer("Key", c)))
defer span.End()

if !dht.enableProviders {
return nil, routing.ErrNotSupported
} else if !c.Defined() {
Expand All @@ -1212,9 +1214,6 @@ func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInf
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("Key", key), attribute.Int("Count", count)))
defer span.End()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
Expand All @@ -1235,10 +1234,11 @@ func (dht *FullRT) FindProvidersAsync(ctx context.Context, key cid.Cid, count in
}

func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRouting")
ctx, span := internal.StartSpan(ctx, "FullRT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key)))
defer span.End()

defer close(peerOut)

findAll := count == 0
ps := make(map[peer.ID]struct{})
psLock := &sync.Mutex{}
Expand Down Expand Up @@ -1267,6 +1267,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
if psTryAdd(p.ID) {
select {
case peerOut <- p:
span.AddEvent("found provider", attribute.Stringer("peer", p))
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -1309,6 +1310,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
span.AddEvent("found provider", attribute.Stringer("peer", prov.ID))
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return ctx.Err()
Expand Down
2 changes: 1 addition & 1 deletion handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func BenchmarkHandleFindPeer(b *testing.B) {
panic(err)
}

d.peerFound(ctx, id, true)
d.peerFound(id, true)

peers = append(peers, id)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i))
Expand Down
6 changes: 3 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
if err := q.dht.dialPeer(dialCtx, p); err != nil {
// remove the peer if there was a dial failure, but not because of a context cancellation
if dialCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
Expand All @@ -433,7 +433,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
if queryCtx.Err() == nil {
q.dht.peerStoppedDHT(q.dht.ctx, p)
q.dht.peerStoppedDHT(p)
}
ch <- &queryUpdate{cause: p, unreachable: []peer.ID{p}}
return
Expand All @@ -442,7 +442,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
queryDuration := time.Since(startQuery)

// query successful, try to add to RT
q.dht.peerFound(q.dht.ctx, p, true)
q.dht.peerFound(p, true)

// process new peers
saw := []peer.ID{}
Expand Down
28 changes: 14 additions & 14 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue")
defer span.End()
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
var good bool
defer func() {
if !good {
span.End()
}
}()

if !dht.enableValues {
return nil, routing.ErrNotSupported
Expand All @@ -164,10 +169,9 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
valCh, lookupRes := dht.getValues(ctx, key, stopCh)

out := make(chan []byte)
good = true
go func() {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue.Worker")
defer span.End()

defer close(out)
best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
Expand Down Expand Up @@ -460,9 +464,6 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err

// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProviders")
defer span.End()

if !dht.enableProviders {
return nil, routing.ErrNotSupported
} else if !c.Defined() {
Expand All @@ -482,9 +483,6 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsync")
defer span.End()

if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
Expand All @@ -505,11 +503,11 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer close(peerOut)

ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine")
ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindProvidersAsyncRoutine", trace.WithAttributes(attribute.Stringer("Key", key)))
defer span.End()

defer close(peerOut)

findAll := count == 0

ps := make(map[peer.ID]peer.AddrInfo)
Expand Down Expand Up @@ -539,6 +537,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
if psTryAdd(p) {
select {
case peerOut <- p:
span.AddEvent("found provider", attribute.Stringer("peer", p.ID))
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -574,6 +573,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
span.AddEvent("found provider", attribute.Stringer("peer", prov.ID))
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return nil, ctx.Err()
Expand Down Expand Up @@ -608,7 +608,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash

// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer")
ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

if err := id.Validate(); err != nil {
Expand Down
Loading

0 comments on commit 9605ac9

Please sign in to comment.