Skip to content

Commit

Permalink
feat(bitswap/client): add additional tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Jun 20, 2024
1 parent 625ba76 commit 13f0d1b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package providerquerymanager

import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/boxo/bitswap/client/internal"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap")
Expand Down Expand Up @@ -39,7 +41,7 @@ type ProviderQueryNetwork interface {
}

type providerQueryMessage interface {
debugMessage() string
debugMessage()
handle(pqm *ProviderQueryManager)
}

Expand All @@ -61,6 +63,7 @@ type newProvideQueryMessage struct {
}

type cancelRequestMessage struct {
ctx context.Context
incomingProviders chan peer.ID
k cid.Cid
}
Expand Down Expand Up @@ -121,6 +124,10 @@ func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time
func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid) <-chan peer.ID {
inProgressRequestChan := make(chan inProgressRequest)

var span trace.Span
sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k)))
defer span.End()

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
ctx: sessionCtx,
Expand Down Expand Up @@ -182,7 +189,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
return
case <-sessionCtx.Done():
if incomingProviders != nil {
pqm.cancelProviderRequest(k, incomingProviders)
pqm.cancelProviderRequest(sessionCtx, k, incomingProviders)
}
return
case provider, ok := <-incomingProviders:
Expand All @@ -199,11 +206,12 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
return returnedProviders
}

func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) {
func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k cid.Cid, incomingProviders chan peer.ID) {
cancelMessageChannel := pqm.providerQueryMessages
for {
select {
case cancelMessageChannel <- &cancelRequestMessage{
ctx: ctx,
incomingProviders: incomingProviders,
k: k,
}:
Expand Down Expand Up @@ -235,17 +243,22 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
span := trace.SpanFromContext(findProviderCtx)
span.AddEvent("StartFindProvidersAsync")
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
span.AddEvent("FoundProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
err := pqm.network.ConnectTo(findProviderCtx, p)
if err != nil {
span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p)))
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}
span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p)))
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
ctx: findProviderCtx,
Expand Down Expand Up @@ -326,16 +339,17 @@ func (pqm *ProviderQueryManager) run() {
for {
select {
case nextMessage := <-pqm.providerQueryMessages:
log.Debug(nextMessage.debugMessage())
nextMessage.debugMessage()
nextMessage.handle(pqm)
case <-pqm.ctx.Done():
return
}
}
}

func (rpm *receivedProviderMessage) debugMessage() string {
return fmt.Sprintf("Received provider (%s) for cid (%s)", rpm.p.String(), rpm.k.String())
func (rpm *receivedProviderMessage) debugMessage() {
log.Debugf("Received provider (%s) (%s)", rpm.p, rpm.k)
trace.SpanFromContext(rpm.ctx).AddEvent("ReceivedProvider", trace.WithAttributes(attribute.Stringer("provider", rpm.p), attribute.Stringer("cid", rpm.k)))
}

func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
Expand All @@ -354,8 +368,9 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
}
}

func (fpqm *finishedProviderQueryMessage) debugMessage() string {
return "Finished Provider Query on cid: " + fpqm.k.String()
func (fpqm *finishedProviderQueryMessage) debugMessage() {
log.Debugf("Finished Provider Query on cid: %s", fpqm.k)
trace.SpanFromContext(fpqm.ctx).AddEvent("FinishedProviderQuery", trace.WithAttributes(attribute.Stringer("cid", fpqm.k)))
}

func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) {
Expand All @@ -371,21 +386,28 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus.cancelFn()
}

func (npqm *newProvideQueryMessage) debugMessage() string {
return "New Provider Query on cid: " + npqm.k.String()
func (npqm *newProvideQueryMessage) debugMessage() {
log.Debugf("New Provider Query on cid: %s", npqm.k)
trace.SpanFromContext(npqm.ctx).AddEvent("NewProvideQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
}

func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k]
if !ok {

ctx, cancelFn := context.WithCancel(pqm.ctx)
span := trace.SpanFromContext(npqm.ctx)
span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
ctx = trace.ContextWithSpan(ctx, span)

requestStatus = &inProgressRequestStatus{
listeners: make(map[chan peer.ID]struct{}),
ctx: ctx,
cancelFn: cancelFn,
}

pqm.inProgressRequestStatuses[npqm.k] = requestStatus

select {
case pqm.incomingFindProviderRequests <- &findProviderRequest{
k: npqm.k,
Expand All @@ -394,6 +416,8 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
case <-pqm.ctx.Done():
return
}
} else {
trace.SpanFromContext(npqm.ctx).AddEvent("JoinQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
}
inProgressChan := make(chan peer.ID)
requestStatus.listeners[inProgressChan] = struct{}{}
Expand All @@ -406,8 +430,9 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
}
}

func (crm *cancelRequestMessage) debugMessage() string {
return "Cancel provider query on cid: " + crm.k.String()
func (crm *cancelRequestMessage) debugMessage() {
log.Debugf("Cancel provider query on cid: %s", crm.k)
trace.SpanFromContext(crm.ctx).AddEvent("CancelRequest", trace.WithAttributes(attribute.Stringer("cid", crm.k)))
}

func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) {
Expand Down
21 changes: 18 additions & 3 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -301,35 +302,46 @@ func (s *Session) run(ctx context.Context) {

s.idleTick = time.NewTimer(s.initialSearchDelay)
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
sessionSpan := trace.SpanFromContext(ctx)
for {
select {
case oper := <-s.incoming:
switch oper.op {
case opReceive:
// Received blocks
sessionSpan.AddEvent("Session.ReceiveOp")
s.handleReceive(oper.keys)
case opWant:
// Client wants blocks
sessionSpan.AddEvent("Session.WantOp")
s.wantBlocks(ctx, oper.keys)
case opCancel:
// Wants were cancelled
sessionSpan.AddEvent("Session.WantCancelOp")
s.sw.CancelPending(oper.keys)
s.sws.Cancel(oper.keys)
case opWantsSent:
// Wants were sent to a peer
sessionSpan.AddEvent("Session.WantsSentOp")
s.sw.WantsSent(oper.keys)
case opBroadcast:
// Broadcast want-haves to all peers
s.broadcast(ctx, oper.keys)
opCtx, span := internal.StartSpan(ctx, "Session.BroadcastOp")
s.broadcast(opCtx, oper.keys)
span.End()
default:
panic("unhandled operation")
}
case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast
s.broadcast(ctx, nil)
opCtx, span := internal.StartSpan(ctx, "Session.IdleBroadcast")
s.broadcast(opCtx, nil)
span.End()
case <-s.periodicSearchTimer.C:
// Periodically search for a random live want
s.handlePeriodicSearch(ctx)
opCtx, span := internal.StartSpan(ctx, "Session.PeriodicSearch")
s.handlePeriodicSearch(opCtx)
span.End()
case baseTickDelay := <-s.tickDelayReqs:
// Set the base tick delay
s.baseTickDelay = baseTickDelay
Expand Down Expand Up @@ -392,9 +404,12 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
go func(k cid.Cid) {
ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers")
defer span.End()
for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
// When a provider indicates that it has a cid, it's equivalent to
// the providing peer sending a HAVE
span.AddEvent("FoundPeer")
s.sws.Update(p, nil, []cid.Cid{c}, nil)
}
}(c)
Expand Down

0 comments on commit 13f0d1b

Please sign in to comment.