diff --git a/bitswap/client/client.go b/bitswap/client/client.go
index f01851bee..301ca7558 100644
--- a/bitswap/client/client.go
+++ b/bitswap/client/client.go
@@ -13,7 +13,6 @@ import (
 	bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
 	"github.com/ipfs/boxo/bitswap/client/internal/notifications"
 	bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
-	bspqm "github.com/ipfs/boxo/bitswap/client/internal/providerquerymanager"
 	bssession "github.com/ipfs/boxo/bitswap/client/internal/session"
 	bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
 	bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager"
@@ -99,7 +99,7 @@ func WithoutDuplicatedBlockStats() Option {
 	}
 }
 
-type ContentSearcher = bspqm.ContentRouter
+type ContentSearcher = bssession.ProviderFinder
 
 // WithContentSearch allows the client to search for providers when it is not
 // able to find the content itself.
@@ -155,11 +155,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
 		option(bs)
 	}
 
-	if bs.router != nil {
-		bs.pqm = bspqm.New(ctx, network, bs.router)
-		bs.pqm.Startup()
-	}
-
 	// bind the context and process.
 	// do it over here to avoid closing before all setup is done.
 	go func() {
@@ -179,10 +174,6 @@ type Client struct {
 
 	pm *bspm.PeerManager
 
-	// the provider query manager manages requests to find providers
-	// is nil if content routing is disabled
-	pqm *bspqm.ProviderQueryManager
-
 	// network delivers messages on behalf of the session
 	network bsnet.BitSwapNetwork
 
@@ -244,13 +235,7 @@ func (bs *Client) sessionFactory(
 	rebroadcastDelay delay.D,
 	self peer.ID,
 ) bssm.Session {
-	// avoid typed nils
-	var pqm bssession.ProviderFinder
-	if bs.pqm != nil {
-		pqm = bs.pqm
-	}
-
-	return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
+	return bssession.New(sessctx, sessmgr, id, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, bs.router, bs.network)
 }
 
 // onDontHaveTimeout is called when a want-block is sent to a peer that
diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go
deleted file mode 100644
index 7c9b6bb22..000000000
--- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go
+++ /dev/null
@@ -1,440 +0,0 @@
-package providerquerymanager
-
-import (
-	"context"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/ipfs/go-cid"
-	logging "github.com/ipfs/go-log/v2"
-	peer "github.com/libp2p/go-libp2p/core/peer"
-)
-
-var log = logging.Logger("bitswap")
-
-const (
-	maxProviders         = 10
-	maxInProcessRequests = 6
-	defaultTimeout       = 10 * time.Second
-)
-
-type inProgressRequestStatus struct {
-	ctx            context.Context
-	cancelFn       func()
-	providersSoFar []peer.ID
-	listeners      map[chan peer.ID]struct{}
-}
-
-type findProviderRequest struct {
-	k   cid.Cid
-	ctx context.Context
-}
-
-// ProviderQueryNetwork is an interface for finding providers and connecting to
-// peers.
-type ProviderQueryNetwork interface { - Self() peer.ID - ConnectTo(context.Context, peer.AddrInfo) error -} - -type ContentRouter interface { - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo -} - -type providerQueryMessage interface { - debugMessage() string - handle(pqm *ProviderQueryManager) -} - -type receivedProviderMessage struct { - ctx context.Context - k cid.Cid - p peer.ID -} - -type finishedProviderQueryMessage struct { - ctx context.Context - k cid.Cid -} - -type newProvideQueryMessage struct { - ctx context.Context - k cid.Cid - inProgressRequestChan chan<- inProgressRequest -} - -type cancelRequestMessage struct { - incomingProviders chan peer.ID - k cid.Cid -} - -// ProviderQueryManager manages requests to find more providers for blocks -// for bitswap sessions. It's main goals are to: -// - rate limit requests -- don't have too many find provider calls running -// simultaneously -// - connect to found peers and filter them if it can't connect -// - ensure two findprovider calls for the same block don't run concurrently -// - manage timeouts -type ProviderQueryManager struct { - ctx context.Context - network ProviderQueryNetwork - router ContentRouter - providerQueryMessages chan providerQueryMessage - providerRequestsProcessing chan *findProviderRequest - incomingFindProviderRequests chan *findProviderRequest - - findProviderTimeout time.Duration - timeoutMutex sync.RWMutex - - // do not touch outside the run loop - inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus -} - -// New initializes a new ProviderQueryManager for a given context and a given -// network provider. -func New(ctx context.Context, network ProviderQueryNetwork, router ContentRouter) *ProviderQueryManager { - return &ProviderQueryManager{ - ctx: ctx, - network: network, - router: router, - providerQueryMessages: make(chan providerQueryMessage, 16), - providerRequestsProcessing: make(chan *findProviderRequest), - incomingFindProviderRequests: make(chan *findProviderRequest), - inProgressRequestStatuses: make(map[cid.Cid]*inProgressRequestStatus), - findProviderTimeout: defaultTimeout, - } -} - -// Startup starts processing for the ProviderQueryManager. -func (pqm *ProviderQueryManager) Startup() { - go pqm.run() -} - -type inProgressRequest struct { - providersSoFar []peer.ID - incoming chan peer.ID -} - -// SetFindProviderTimeout changes the timeout for finding providers -func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time.Duration) { - pqm.timeoutMutex.Lock() - pqm.findProviderTimeout = findProviderTimeout - pqm.timeoutMutex.Unlock() -} - -// FindProvidersAsync finds providers for the given block. -func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, k cid.Cid) <-chan peer.ID { - inProgressRequestChan := make(chan inProgressRequest) - - select { - case pqm.providerQueryMessages <- &newProvideQueryMessage{ - ctx: sessionCtx, - k: k, - inProgressRequestChan: inProgressRequestChan, - }: - case <-pqm.ctx.Done(): - ch := make(chan peer.ID) - close(ch) - return ch - case <-sessionCtx.Done(): - ch := make(chan peer.ID) - close(ch) - return ch - } - - // DO NOT select on sessionCtx. We only want to abort here if we're - // shutting down because we can't actually _cancel_ the request till we - // get to receiveProviders. 
- var receivedInProgressRequest inProgressRequest - select { - case <-pqm.ctx.Done(): - ch := make(chan peer.ID) - close(ch) - return ch - case receivedInProgressRequest = <-inProgressRequestChan: - } - - return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest) -} - -func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest) <-chan peer.ID { - // maintains an unbuffered queue for incoming providers for given request for a given session - // essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all - // sessions that queried that CID, without worrying about whether the client code is actually - // reading from the returned channel -- so that the broadcast never blocks - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - returnedProviders := make(chan peer.ID) - receivedProviders := append([]peer.ID(nil), receivedInProgressRequest.providersSoFar[0:]...) - incomingProviders := receivedInProgressRequest.incoming - - go func() { - defer close(returnedProviders) - outgoingProviders := func() chan<- peer.ID { - if len(receivedProviders) == 0 { - return nil - } - return returnedProviders - } - nextProvider := func() peer.ID { - if len(receivedProviders) == 0 { - return "" - } - return receivedProviders[0] - } - for len(receivedProviders) > 0 || incomingProviders != nil { - select { - case <-pqm.ctx.Done(): - return - case <-sessionCtx.Done(): - if incomingProviders != nil { - pqm.cancelProviderRequest(k, incomingProviders) - } - return - case provider, ok := <-incomingProviders: - if !ok { - incomingProviders = nil - } else { - receivedProviders = append(receivedProviders, provider) - } - case outgoingProviders() <- nextProvider(): - receivedProviders = receivedProviders[1:] - } - } - }() - return returnedProviders -} - -func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) { - cancelMessageChannel := pqm.providerQueryMessages - for { - select { - case cancelMessageChannel <- &cancelRequestMessage{ - incomingProviders: incomingProviders, - k: k, - }: - cancelMessageChannel = nil - // clear out any remaining providers, in case and "incoming provider" - // messages get processed before our cancel message - case _, ok := <-incomingProviders: - if !ok { - return - } - case <-pqm.ctx.Done(): - return - } - } -} - -func (pqm *ProviderQueryManager) findProviderWorker() { - // findProviderWorker just cycles through incoming provider queries one - // at a time. 
We have six of these workers running at once - // to let requests go in parallel but keep them rate limited - for { - select { - case fpr, ok := <-pqm.providerRequestsProcessing: - if !ok { - return - } - k := fpr.k - log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) - pqm.timeoutMutex.RLock() - findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) - pqm.timeoutMutex.RUnlock() - providers := pqm.router.FindProvidersAsync(findProviderCtx, k, maxProviders) - wg := &sync.WaitGroup{} - for p := range providers { - if p.ID == pqm.network.Self() { - continue // ignore self as provider - } - - wg.Add(1) - go func(p peer.AddrInfo) { - defer wg.Done() - err := pqm.network.ConnectTo(findProviderCtx, p) - if err != nil { - log.Debugf("failed to connect to provider %s: %s", p, err) - return - } - select { - case pqm.providerQueryMessages <- &receivedProviderMessage{ - ctx: findProviderCtx, - k: k, - p: p.ID, - }: - case <-pqm.ctx.Done(): - return - } - }(p) - } - wg.Wait() - cancel() - select { - case pqm.providerQueryMessages <- &finishedProviderQueryMessage{ - ctx: findProviderCtx, - k: k, - }: - case <-pqm.ctx.Done(): - } - case <-pqm.ctx.Done(): - return - } - } -} - -func (pqm *ProviderQueryManager) providerRequestBufferWorker() { - // the provider request buffer worker just maintains an unbounded - // buffer for incoming provider queries and dispatches to the find - // provider workers as they become available - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - var providerQueryRequestBuffer []*findProviderRequest - nextProviderQuery := func() *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { - return nil - } - return providerQueryRequestBuffer[0] - } - outgoingRequests := func() chan<- *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { - return nil - } - return pqm.providerRequestsProcessing - } - - for { - select { - case incomingRequest, ok := <-pqm.incomingFindProviderRequests: - if !ok { - return - } - providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest) - case outgoingRequests() <- nextProviderQuery(): - providerQueryRequestBuffer = providerQueryRequestBuffer[1:] - case <-pqm.ctx.Done(): - return - } - } -} - -func (pqm *ProviderQueryManager) cleanupInProcessRequests() { - for _, requestStatus := range pqm.inProgressRequestStatuses { - for listener := range requestStatus.listeners { - close(listener) - } - requestStatus.cancelFn() - } -} - -func (pqm *ProviderQueryManager) run() { - defer pqm.cleanupInProcessRequests() - - go pqm.providerRequestBufferWorker() - for i := 0; i < maxInProcessRequests; i++ { - go pqm.findProviderWorker() - } - - for { - select { - case nextMessage := <-pqm.providerQueryMessages: - log.Debug(nextMessage.debugMessage()) - nextMessage.handle(pqm) - case <-pqm.ctx.Done(): - return - } - } -} - -func (rpm *receivedProviderMessage) debugMessage() string { - return fmt.Sprintf("Received provider (%s) for cid (%s)", rpm.p.String(), rpm.k.String()) -} - -func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { - requestStatus, ok := pqm.inProgressRequestStatuses[rpm.k] - if !ok { - log.Errorf("Received provider (%s) for cid (%s) not requested", rpm.p.String(), rpm.k.String()) - return - } - requestStatus.providersSoFar = append(requestStatus.providersSoFar, rpm.p) - for listener := range requestStatus.listeners { - select { - case listener <- rpm.p: - case <-pqm.ctx.Done(): - return - } - } -} - 
-func (fpqm *finishedProviderQueryMessage) debugMessage() string { - return "Finished Provider Query on cid: " + fpqm.k.String() -} - -func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { - requestStatus, ok := pqm.inProgressRequestStatuses[fpqm.k] - if !ok { - // we canceled the request as it finished. - return - } - for listener := range requestStatus.listeners { - close(listener) - } - delete(pqm.inProgressRequestStatuses, fpqm.k) - requestStatus.cancelFn() -} - -func (npqm *newProvideQueryMessage) debugMessage() string { - return "New Provider Query on cid: " + npqm.k.String() -} - -func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { - requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k] - if !ok { - - ctx, cancelFn := context.WithCancel(pqm.ctx) - requestStatus = &inProgressRequestStatus{ - listeners: make(map[chan peer.ID]struct{}), - ctx: ctx, - cancelFn: cancelFn, - } - pqm.inProgressRequestStatuses[npqm.k] = requestStatus - select { - case pqm.incomingFindProviderRequests <- &findProviderRequest{ - k: npqm.k, - ctx: ctx, - }: - case <-pqm.ctx.Done(): - return - } - } - inProgressChan := make(chan peer.ID) - requestStatus.listeners[inProgressChan] = struct{}{} - select { - case npqm.inProgressRequestChan <- inProgressRequest{ - providersSoFar: requestStatus.providersSoFar, - incoming: inProgressChan, - }: - case <-pqm.ctx.Done(): - } -} - -func (crm *cancelRequestMessage) debugMessage() string { - return "Cancel provider query on cid: " + crm.k.String() -} - -func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) { - requestStatus, ok := pqm.inProgressRequestStatuses[crm.k] - if !ok { - // Request finished while queued. - return - } - _, ok = requestStatus.listeners[crm.incomingProviders] - if !ok { - // Request finished and _restarted_ while queued. 
- return - } - delete(requestStatus.listeners, crm.incomingProviders) - close(crm.incomingProviders) - if len(requestStatus.listeners) == 0 { - delete(pqm.inProgressRequestStatuses, crm.k) - requestStatus.cancelFn() - } -} diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go deleted file mode 100644 index 518a904ed..000000000 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ /dev/null @@ -1,372 +0,0 @@ -package providerquerymanager - -import ( - "context" - "errors" - "reflect" - "sync" - "testing" - "time" - - "github.com/ipfs/boxo/bitswap/internal/testutil" - cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" -) - -type fakeProviderNetwork struct { - self peer.ID - peersFound []peer.ID - connectError error - delay time.Duration - connectDelay time.Duration - queriesMadeMutex sync.RWMutex - queriesMade int - liveQueries int -} - -func (fpn *fakeProviderNetwork) Self() peer.ID { - return fpn.self -} - -func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.AddrInfo) error { - time.Sleep(fpn.connectDelay) - return fpn.connectError -} - -func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { - fpn.queriesMadeMutex.Lock() - fpn.queriesMade++ - fpn.liveQueries++ - fpn.queriesMadeMutex.Unlock() - incomingPeers := make(chan peer.AddrInfo) - go func() { - defer close(incomingPeers) - for _, p := range fpn.peersFound { - time.Sleep(fpn.delay) - select { - case <-ctx.Done(): - return - default: - } - select { - case incomingPeers <- peer.AddrInfo{ID: p}: - case <-ctx.Done(): - return - } - } - fpn.queriesMadeMutex.Lock() - fpn.liveQueries-- - fpn.queriesMadeMutex.Unlock() - }() - - return incomingPeers -} - -func TestNormalSimultaneousFetch(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 1 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - keys := testutil.GenerateCids(2) - - sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0]) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[1]) - - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - - var secondPeersReceived []peer.ID - for p := range secondRequestChan { - secondPeersReceived = append(secondPeersReceived, p) - } - - if len(firstPeersReceived) != len(peers) || len(secondPeersReceived) != len(peers) { - t.Fatal("Did not collect all peers for request that was completed") - } - - fpn.queriesMadeMutex.Lock() - defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != 2 { - t.Fatal("Did not dedup provider requests running simultaneously") - } -} - -func TestDedupingProviderRequests(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 1 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - key := testutil.GenerateCids(1)[0] - - sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := 
providerQueryManager.FindProvidersAsync(sessionCtx, key) - - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - - var secondPeersReceived []peer.ID - for p := range secondRequestChan { - secondPeersReceived = append(secondPeersReceived, p) - } - - if len(firstPeersReceived) != len(peers) || len(secondPeersReceived) != len(peers) { - t.Fatal("Did not collect all peers for request that was completed") - } - - if !reflect.DeepEqual(firstPeersReceived, secondPeersReceived) { - t.Fatal("Did not receive the same response to both find provider requests") - } - fpn.queriesMadeMutex.Lock() - defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != 1 { - t.Fatal("Did not dedup provider requests running simultaneously") - } -} - -func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 1 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - - key := testutil.GenerateCids(1)[0] - - // first session will cancel before done - firstSessionCtx, firstCancel := context.WithTimeout(ctx, 3*time.Millisecond) - defer firstCancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(firstSessionCtx, key) - secondSessionCtx, secondCancel := context.WithTimeout(ctx, 5*time.Second) - defer secondCancel() - secondRequestChan := providerQueryManager.FindProvidersAsync(secondSessionCtx, key) - - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - - var secondPeersReceived []peer.ID - for p := range secondRequestChan { - secondPeersReceived = append(secondPeersReceived, p) - } - - if len(secondPeersReceived) != len(peers) { - t.Fatal("Did not collect all peers for request that was completed") - } - - if len(firstPeersReceived) >= len(peers) { - t.Fatal("Collected all peers on cancelled peer, should have been cancelled immediately") - } - fpn.queriesMadeMutex.Lock() - defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != 1 { - t.Fatal("Did not dedup provider requests running simultaneously") - } -} - -func TestCancelManagerExitsGracefully(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 1 * time.Millisecond, - } - ctx := context.Background() - managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond) - defer managerCancel() - providerQueryManager := New(managerCtx, fpn, fpn) - providerQueryManager.Startup() - - key := testutil.GenerateCids(1)[0] - - sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) - defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - - var secondPeersReceived []peer.ID - for p := range secondRequestChan { - secondPeersReceived = append(secondPeersReceived, p) - } - - if len(firstPeersReceived) >= len(peers) || - len(secondPeersReceived) >= len(peers) { - t.Fatal("Did not cancel requests in progress correctly") - } -} - -func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - connectError: 
errors.New("not able to connect"), - delay: 1 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - - key := testutil.GenerateCids(1)[0] - - sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) - defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key) - - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - - var secondPeersReceived []peer.ID - for p := range secondRequestChan { - secondPeersReceived = append(secondPeersReceived, p) - } - - if len(firstPeersReceived) != 0 || len(secondPeersReceived) != 0 { - t.Fatal("Did not filter out peers with connection issues") - } -} - -func TestRateLimitingRequests(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 5 * time.Millisecond, - } - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - - keys := testutil.GenerateCids(maxInProcessRequests + 1) - sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - var requestChannels []<-chan peer.ID - for i := 0; i < maxInProcessRequests+1; i++ { - requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i])) - } - time.Sleep(20 * time.Millisecond) - fpn.queriesMadeMutex.Lock() - if fpn.liveQueries != maxInProcessRequests { - t.Logf("Queries made: %d\n", fpn.liveQueries) - t.Fatal("Did not limit parallel requests to rate limit") - } - fpn.queriesMadeMutex.Unlock() - for i := 0; i < maxInProcessRequests+1; i++ { - for range requestChannels[i] { - } - } - - fpn.queriesMadeMutex.Lock() - defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != maxInProcessRequests+1 { - t.Logf("Queries made: %d\n", fpn.queriesMade) - t.Fatal("Did not make all separate requests") - } -} - -func TestFindProviderTimeout(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 10 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - providerQueryManager.SetFindProviderTimeout(2 * time.Millisecond) - keys := testutil.GenerateCids(1) - - sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0]) - var firstPeersReceived []peer.ID - for p := range firstRequestChan { - firstPeersReceived = append(firstPeersReceived, p) - } - if len(firstPeersReceived) >= len(peers) { - t.Fatal("Find provider request should have timed out, did not") - } -} - -func TestFindProviderPreCanceled(t *testing.T) { - peers := testutil.GeneratePeers(10) - fpn := &fakeProviderNetwork{ - peersFound: peers, - delay: 1 * time.Millisecond, - } - ctx := context.Background() - providerQueryManager := New(ctx, fpn, fpn) - providerQueryManager.Startup() - providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) - keys := testutil.GenerateCids(1) - - sessionCtx, cancel := context.WithCancel(ctx) - cancel() - firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0]) - if firstRequestChan == nil { - t.Fatal("expected non-nil channel") - } - select { - case <-firstRequestChan: 
-	case <-time.After(10 * time.Millisecond):
-		t.Fatal("shouldn't have blocked waiting on a closed context")
-	}
-}
-
-func TestCancelFindProvidersAfterCompletion(t *testing.T) {
-	peers := testutil.GeneratePeers(2)
-	fpn := &fakeProviderNetwork{
-		peersFound: peers,
-		delay:      1 * time.Millisecond,
-	}
-	ctx := context.Background()
-	providerQueryManager := New(ctx, fpn, fpn)
-	providerQueryManager.Startup()
-	providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond)
-	keys := testutil.GenerateCids(1)
-
-	sessionCtx, cancel := context.WithCancel(ctx)
-	firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0])
-	<-firstRequestChan                // wait for everything to start.
-	time.Sleep(10 * time.Millisecond) // wait for the incoming providres to stop.
-	cancel()                          // cancel the context.
-
-	timer := time.NewTimer(10 * time.Millisecond)
-	defer timer.Stop()
-	for {
-		select {
-		case _, ok := <-firstRequestChan:
-			if !ok {
-				return
-			}
-		case <-timer.C:
-			t.Fatal("should have finished receiving responses within timeout")
-		}
-	}
-}
diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go
index 462803bfa..b9f91e940 100644
--- a/bitswap/client/internal/session/session.go
+++ b/bitswap/client/internal/session/session.go
@@ -2,6 +2,7 @@ package session
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/ipfs/boxo/bitswap/client/internal"
@@ -15,6 +16,8 @@ import (
 	delay "github.com/ipfs/go-ipfs-delay"
 	logging "github.com/ipfs/go-log/v2"
 	peer "github.com/libp2p/go-libp2p/core/peer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
 	"go.uber.org/zap"
 )
 
@@ -25,6 +28,9 @@ var (
 
 const (
 	broadcastLiveWantsLimit = 64
+	// MAGIC: why ten? We should keep searching until we find a couple of online peers.
+	maxProviders         = 10
+	findProvidersTimeout = 10 * time.Second
 )
 
 // PeerManager keeps track of which sessions are interested in which peers
@@ -74,7 +80,7 @@ type SessionPeerManager interface {
 // ProviderFinder is used to find providers for a given key
 type ProviderFinder interface {
 	// FindProvidersAsync searches for peers that provide the given CID
-	FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
+	FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo
 }
 
 // opType is the kind of operation that is being processed by the event loop
@@ -103,13 +109,12 @@ type op struct {
 // info to, and who to request blocks from.
 type Session struct {
 	// dependencies
-	ctx            context.Context
-	shutdown       func()
-	sm             SessionManager
-	pm             PeerManager
-	sprm           SessionPeerManager
-	providerFinder ProviderFinder // optional, nil when missing
-	sim            *bssim.SessionInterestManager
+	ctx      context.Context
+	shutdown func()
+	sm       SessionManager
+	pm       PeerManager
+	sprm     SessionPeerManager
+	sim      *bssim.SessionInterestManager
 
 	sw  sessionWants
 	sws sessionWantSender
@@ -131,7 +136,15 @@ type Session struct {
 	notif notifications.PubSub
 	id    uint64
 
-	self peer.ID
+	providerFinder ProviderFinder // optional, nil when missing
+	network        Connector
+}
+
+type Connector interface {
+	Self() peer.ID
+
+	// ConnectTo attempts to connect to the peer, using the passed addresses as a hint; they can be empty.
+ ConnectTo(context.Context, peer.AddrInfo) error } // New creates a new bitswap session whose lifetime is bounded by the @@ -141,15 +154,15 @@ func New( sm SessionManager, id uint64, sprm SessionPeerManager, - // providerFinder might be nil - providerFinder ProviderFinder, sim *bssim.SessionInterestManager, pm PeerManager, bpm *bsbpm.BlockPresenceManager, notif notifications.PubSub, initialSearchDelay time.Duration, periodicSearchDelay delay.D, - self peer.ID, + // providerFinder might be nil + providerFinder ProviderFinder, + network Connector, ) *Session { ctx, cancel := context.WithCancel(ctx) s := &Session{ @@ -160,7 +173,6 @@ func New( sm: sm, pm: pm, sprm: sprm, - providerFinder: providerFinder, sim: sim, incoming: make(chan op, 128), latencyTrkr: latencyTracker{}, @@ -169,7 +181,8 @@ func New( id: id, initialSearchDelay: initialSearchDelay, periodicSearchDelay: periodicSearchDelay, - self: self, + providerFinder: providerFinder, + network: network, } s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted) @@ -218,13 +231,13 @@ func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []c } for _, c := range interestedKs { - log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id) + log.Debugw("Bitswap <- block", "local", s.network.Self(), "from", from, "cid", c, "session", s.id) } for _, c := range haves { - log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id) + log.Debugw("Bitswap <- HAVE", "local", s.network.Self(), "from", from, "cid", c, "session", s.id) } for _, c := range dontHaves { - log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id) + log.Debugw("Bitswap <- DONT_HAVE", "local", s.network.Self(), "from", from, "cid", c, "session", s.id) } } @@ -396,12 +409,68 @@ func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) { // ¯\_(ツ)_/¯ return } + go func(k cid.Cid) { - for p := range s.providerFinder.FindProvidersAsync(ctx, k) { - // When a provider indicates that it has a cid, it's equivalent to - // the providing peer sending a HAVE - s.sws.Update(p, nil, []cid.Cid{c}, nil) + ctx, span := internal.StartSpan(ctx, "Session.findMorePeers") + defer span.End() + if span.IsRecording() { + span.SetAttributes(attribute.Stringer("cid", c)) + } + + ctx, cancel := context.WithTimeout(ctx, findProvidersTimeout) + defer cancel() + + providers := s.providerFinder.FindProvidersAsync(ctx, k, maxProviders) + var wg sync.WaitGroup + providerLoop: + for { + select { + case p, ok := <-providers: + if !ok { + break providerLoop + } + + if p.ID == s.network.Self() { + continue // ignore self as provider + } + + wg.Add(1) + go func(p peer.AddrInfo) { + defer wg.Done() + + ctx, span := internal.StartSpan(ctx, "Session.findMorePeers.ConnectTo") + defer span.End() + recording := span.IsRecording() + if recording { + maddrs := make([]string, len(p.Addrs)) + for i, a := range p.Addrs { + maddrs[i] = a.String() + } + + span.SetAttributes( + attribute.Stringer("peer", p.ID), + attribute.StringSlice("addrs", maddrs), + ) + } + + err := s.network.ConnectTo(ctx, p) + if err != nil { + if recording { + span.SetStatus(codes.Error, err.Error()) + } + log.Debugf("failed to connect to provider %s: %s", p, err) + return + } + + // When a provider indicates that it has a cid, it's equivalent to + // the providing peer sending a HAVE + s.sws.Update(p.ID, nil, []cid.Cid{c}, nil) + }(p) + case <-ctx.Done(): + return + } } + wg.Wait() }(c) } diff --git 
a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index 2eb166f90..920d76f93 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -112,7 +112,7 @@ func newFakeProviderFinder() *fakeProviderFinder { } } -func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID { +func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { go func() { select { case fpf.findMorePeersRequested <- k: @@ -120,9 +120,14 @@ func (fpf *fakeProviderFinder) FindProvidersAsync(ctx context.Context, k cid.Cid } }() - return make(chan peer.ID) + return make(chan peer.AddrInfo) } +type fakeNetwork struct{} + +func (fakeNetwork) Self() peer.ID { return "" } +func (fakeNetwork) ConnectTo(context.Context, peer.AddrInfo) error { return nil } + type wantReq struct { cids []cid.Cid } @@ -154,14 +159,13 @@ func TestSessionGetBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) fpm := newFakePeerManager() fspm := newFakeSessionPeerManager() - fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() defer notif.Shutdown() id := testutil.GenerateSessionID() sm := newMockSessionMgr() - session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{}) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) var cids []cid.Cid @@ -256,7 +260,7 @@ func TestSessionFindMorePeers(t *testing.T) { defer notif.Shutdown() id := testutil.GenerateSessionID() sm := newMockSessionMgr() - session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), fpf, fakeNetwork{}) session.SetBaseTickDelay(200 * time.Microsecond) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) @@ -323,7 +327,6 @@ func TestSessionOnPeersExhausted(t *testing.T) { defer cancel() fpm := newFakePeerManager() fspm := newFakeSessionPeerManager() - fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() @@ -331,7 +334,7 @@ func TestSessionOnPeersExhausted(t *testing.T) { defer notif.Shutdown() id := testutil.GenerateSessionID() sm := newMockSessionMgr() - session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{}) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit + 5) var cids []cid.Cid @@ -377,7 +380,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { defer notif.Shutdown() id := testutil.GenerateSessionID() sm := newMockSessionMgr() - session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "") + session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), fpf, fakeNetwork{}) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(4) var cids []cid.Cid @@ -483,7 +486,6 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) { func 
TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { fpm := newFakePeerManager() fspm := newFakeSessionPeerManager() - fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() @@ -493,7 +495,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { // Create a new session with its own context sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(sessctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{}) timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer timerCancel() @@ -533,7 +535,6 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) { func TestSessionOnShutdownCalled(t *testing.T) { fpm := newFakePeerManager() fspm := newFakeSessionPeerManager() - fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() notif := notifications.New() @@ -544,7 +545,7 @@ func TestSessionOnShutdownCalled(t *testing.T) { // Create a new session with its own context sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer sesscancel() - session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(sessctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{}) // Shutdown the session session.Shutdown() @@ -561,7 +562,6 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) { ctx, cancelCtx := context.WithTimeout(context.Background(), 20*time.Millisecond) fpm := newFakePeerManager() fspm := newFakeSessionPeerManager() - fpf := newFakeProviderFinder() sim := bssim.New() bpm := bsbpm.New() @@ -569,7 +569,7 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) { defer notif.Shutdown() id := testutil.GenerateSessionID() sm := newMockSessionMgr() - session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "") + session := New(ctx, sm, id, fspm, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), nil, fakeNetwork{}) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(2) cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
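A note for downstream users of the API change above. With the provider query manager gone, provider search is wired directly into each session: the client hands its content router and network to `bssession.New`, and the session dials discovered providers itself in `findMorePeers`. A caller-supplied `ContentSearcher` therefore has to satisfy the session package's `ProviderFinder`, whose `FindProvidersAsync` now takes a max-results hint and yields `peer.AddrInfo` (addresses included, since the session does the dialing) instead of bare peer IDs. A minimal sketch of a conforming searcher (the `stubSearcher` type and its fixed provider list are hypothetical, for illustration only):

```go
package example

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
)

// stubSearcher is a hypothetical ContentSearcher serving providers from a
// fixed list. It honors the max-results hint and context cancellation, and
// always closes its channel so the session's provider loop can terminate.
type stubSearcher struct {
	known []peer.AddrInfo
}

func (s *stubSearcher) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo {
	out := make(chan peer.AddrInfo)
	go func() {
		defer close(out)
		for i, p := range s.known {
			if i == max {
				return // respect the max-results hint
			}
			select {
			case out <- p:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
```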
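On the constructor side nothing changes for callers; the option wiring is as before. A hedged sketch, assuming `ctx`, `net` (a `bsnet.BitSwapNetwork`), and `bstore` (a `blockstore.Blockstore`) already exist, with the client package imported as `bsclient`; `bootstrapProviders` is likewise hypothetical:

```go
// Hypothetical wiring: searcher is the stubSearcher sketched above. Without
// WithContentSearch the client never searches for additional providers.
searcher := &stubSearcher{known: bootstrapProviders}
bs := bsclient.New(ctx, net, bstore, bsclient.WithContentSearch(searcher))
defer bs.Close()
```

Sessions created through `bs.NewSession` then fall back to the searcher whenever they run out of peers to ask.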