Skip to content

Commit

Permalink
fix(bitswap/client/providerquerymanager): don't end trace span until … (
Browse files Browse the repository at this point in the history
#725)

fix(bitswap/client/providerquerymanager): don't end trace span until all providers are returned
  • Loading branch information
gammazero authored Nov 21, 2024
1 parent 138b596 commit c91cc1d
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,

var span trace.Span
sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k)))
defer span.End()

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
Expand All @@ -137,6 +136,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()
return ch
case <-sessionCtx.Done():
ch := make(chan peer.ID)
Expand All @@ -152,14 +152,15 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()
return ch
case receivedInProgressRequest = <-inProgressRequestChan:
}

return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest)
return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest, func() { span.End() })
}

func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest) <-chan peer.ID {
func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID {
// maintains an unbuffered queue for incoming providers for given request for a given session
// essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all
// sessions that queried that CID, without worrying about whether the client code is actually
Expand All @@ -171,6 +172,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k

go func() {
defer close(returnedProviders)
defer onCloseFn()
outgoingProviders := func() chan<- peer.ID {
if len(receivedProviders) == 0 {
return nil
Expand Down

0 comments on commit c91cc1d

Please sign in to comment.