diff --git a/CHANGELOG.md b/CHANGELOG.md index ad9097666..081395116 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,9 +20,14 @@ The following emojis are used to highlight certain changes: - `gateway`: a new header configuration middleware has been added to replace the existing header configuration, which can be used more generically. - `namesys` now has a `WithMaxCacheTTL` option, which allows you to define a maximum TTL that will be used for caching IPNS entries. - `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do. +- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`. + It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient. ### Changed +- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed. + - `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values. + ### Removed ### Fixed diff --git a/bitswap/benchmarks_test.go b/bitswap/benchmarks_test.go index 80eb373ab..2a174d570 100644 --- a/bitswap/benchmarks_test.go +++ b/bitswap/benchmarks_test.go @@ -20,7 +20,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" - mockrouting "github.com/ipfs/boxo/routing/mock" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" ) @@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { oldSeedCount := bch.oldSeedCount newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount) - net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay) + net := tn.VirtualNetwork(fixedDelay) // Simulate an older Bitswap node (old protocol ID) that doesn't // send DONT_HAVE responses @@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { numblks := 1000 for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.VirtualNetwork(mockrouting.NewServer(), d) + net := tn.VirtualNetwork(d) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) @@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 7d2b1f924..7ffe09335 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -66,7 +66,7 @@ func TestClose(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this } func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -118,7 +118,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) @@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { // Tests that a received block is not stored in the blockstore if the block was // not requested by the client func TestUnwantedBlockNotAdded(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsMessage := bsmsg.New(true) bsMessage.AddBlock(block) @@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) { // (because the live request queue is full) func TestPendingBlockAdded(t *testing.T) { ctx := context.Background() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) bg := blocksutil.NewBlockGenerator() sessionBroadcastWantCapacity := 4 @@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{ bitswap.TaskWorkerCount(5), bitswap.EngineTaskWorkerCount(5), @@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) { } func TestEmptyKey(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bs := ig.Instances(1)[0].Exchange @@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6 } func TestBasicBitswap(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) { } func TestDoubleGet(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) { } func TestWantlistCleanup(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt { } func TestBitswapLedgerOneWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) { } func TestBitswapLedgerTwoWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() { // Tests start and stop of a custom decision logic func TestWithScoreLedger(t *testing.T) { tsl := newTestingScoreLedger() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index a3174d0a4..2d9d06418 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/boxo/bitswap" "github.com/ipfs/boxo/bitswap/client/internal/session" "github.com/ipfs/boxo/bitswap/client/traceability" + bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" mockrouting "github.com/ipfs/boxo/routing/mock" @@ -18,13 +19,15 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" tu "github.com/libp2p/go-libp2p-testing/etc" + tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" ) func getVirtualNetwork() tn.Network { // FIXME: the tests are really sensitive to the network delay. fix them to work // well under varying conditions - return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + return tn.VirtualNetwork(delay.Fixed(0)) } func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) { @@ -114,7 +117,7 @@ func TestSessionBetweenPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond)) + vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -219,16 +222,23 @@ func TestFetchNotConnected(t *testing.T) { defer cancel() vnet := getVirtualNetwork() + rs := mockrouting.NewServer() ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() - other := ig.Next() + var otherClient mockrouting.Client + other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + otherClient = rs.Client(id) + return nil, nil // don't add content search, only the client needs it + }) // Provide 10 blocks on Peer A blks := bgen.Blocks(10) for _, block := range blks { addBlock(t, ctx, other, block) + err := otherClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } var cids []cid.Cid @@ -239,7 +249,9 @@ func TestFetchNotConnected(t *testing.T) { // Request blocks with Peer B // Note: Peer A and Peer B are not initially connected, so this tests // that Peer B will search for and find Peer A - thisNode := ig.Next() + thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))} + }) ses := thisNode.Exchange.NewSession(ctx).(*session.Session) ses.SetBaseTickDelay(time.Millisecond * 10) @@ -262,6 +274,7 @@ func TestFetchAfterDisconnect(t *testing.T) { defer cancel() vnet := getVirtualNetwork() + rs := mockrouting.NewServer() ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{ bitswap.ProviderSearchDelay(10 * time.Millisecond), bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)), @@ -269,9 +282,11 @@ func TestFetchAfterDisconnect(t *testing.T) { defer ig.Close() bgen := blocksutil.NewBlockGenerator() - inst := ig.Instances(2) - peerA := inst[0] - peerB := inst[1] + var aClient mockrouting.Client + peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + aClient = rs.Client(id) + return nil, nil // don't add content search, only the client needs it + }) // Provide 5 blocks on Peer A blks := bgen.Blocks(10) @@ -283,9 +298,14 @@ func TestFetchAfterDisconnect(t *testing.T) { firstBlks := blks[:5] for _, block := range firstBlks { addBlock(t, ctx, peerA, block) + err := aClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } // Request all blocks with Peer B + peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))} + }) ses := peerB.Exchange.NewSession(ctx).(*session.Session) ses.SetBaseTickDelay(time.Millisecond * 10) @@ -317,6 +337,8 @@ func TestFetchAfterDisconnect(t *testing.T) { lastBlks := blks[5:] for _, block := range lastBlks { addBlock(t, ctx, peerA, block) + err := aClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } // Peer B should call FindProviders() and find Peer A diff --git a/bitswap/client/client.go b/bitswap/client/client.go index aa9ab78fa..f01851bee 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -99,6 +99,18 @@ func WithoutDuplicatedBlockStats() Option { } } +type ContentSearcher = bspqm.ContentRouter + +// WithContentSearch allows the client to search for providers when it is not +// able to find the content itself. +// Helps seeding sessions in networks where a significant amount of the peers +// connected do not have the content you want to download. +func WithContentSearch(router ContentSearcher) Option { + return func(bs *Client) { + bs.router = router + } +} + type BlockReceivedNotifier interface { // ReceivedBlocks notifies the decision engine that a peer is well-behaving // and gave us useful data, potentially increasing its score and making us @@ -121,56 +133,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore return nil }) - // onDontHaveTimeout is called when a want-block is sent to a peer that - // has an old version of Bitswap that doesn't support DONT_HAVE messages, - // or when no response is received within a timeout. - var sm *bssm.SessionManager - var bs *Client - onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { - // Simulate a message arriving with DONT_HAVEs - if bs.simulateDontHavesOnTimeout { - sm.ReceiveFrom(ctx, p, nil, nil, dontHaves) - } - } - peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, p, network, onDontHaveTimeout) - } - - sim := bssim.New() - bpm := bsbpm.New() - pm := bspm.New(ctx, peerQueueFactory, network.Self()) - pqm := bspqm.New(ctx, network) - - sessionFactory := func( - sessctx context.Context, - sessmgr bssession.SessionManager, - id uint64, - spm bssession.SessionPeerManager, - sim *bssim.SessionInterestManager, - pm bssession.PeerManager, - bpm *bsbpm.BlockPresenceManager, - notif notifications.PubSub, - provSearchDelay time.Duration, - rebroadcastDelay delay.D, - self peer.ID, - ) bssm.Session { - return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) - } - sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { - return bsspm.New(id, network.ConnectionManager()) - } - notif := notifications.New() - sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) - - bs = &Client{ + bs := &Client{ + ctx: ctx, blockstore: bstore, network: network, process: px, - pm: pm, - pqm: pqm, - sm: sm, - sim: sim, - notif: notif, + sim: bssim.New(), + notif: notifications.New(), counters: new(counters), dupMetric: bmetrics.DupHist(ctx), allMetric: bmetrics.AllHist(ctx), @@ -178,21 +147,26 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), simulateDontHavesOnTimeout: true, } + bs.pm = bspm.New(ctx, bs.peerQueueFactory, network.Self()) + bs.sm = bssm.New(ctx, bs.sessionFactory, bs.sim, bs.sessionPeerManagerFactory, bsbpm.New(), bs.pm, bs.notif, network.Self()) // apply functional options before starting and running bitswap for _, option := range options { option(bs) } - bs.pqm.Startup() + if bs.router != nil { + bs.pqm = bspqm.New(ctx, network, bs.router) + bs.pqm.Startup() + } // bind the context and process. // do it over here to avoid closing before all setup is done. go func() { <-px.Closing() // process closes first - sm.Shutdown() + bs.sm.Shutdown() cancelFunc() - notif.Shutdown() + bs.notif.Shutdown() }() procctx.CloseAfterContext(px, ctx) // parent cancelled first @@ -201,9 +175,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore // Client instances implement the bitswap protocol. type Client struct { + ctx context.Context + pm *bspm.PeerManager // the provider query manager manages requests to find providers + // is nil if content routing is disabled pqm *bspqm.ProviderQueryManager // network delivers messages on behalf of the session @@ -244,6 +221,9 @@ type Client struct { blockReceivedNotifier BlockReceivedNotifier + // optional content router + router ContentSearcher + // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool @@ -251,6 +231,46 @@ type Client struct { skipDuplicatedBlocksStats bool } +func (bs *Client) sessionFactory( + sessctx context.Context, + sessmgr bssession.SessionManager, + id uint64, + spm bssession.SessionPeerManager, + sim *bssim.SessionInterestManager, + pm bssession.PeerManager, + bpm *bsbpm.BlockPresenceManager, + notif notifications.PubSub, + provSearchDelay time.Duration, + rebroadcastDelay delay.D, + self peer.ID, +) bssm.Session { + // avoid typed nils + var pqm bssession.ProviderFinder + if bs.pqm != nil { + pqm = bs.pqm + } + + return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) +} + +// onDontHaveTimeout is called when a want-block is sent to a peer that +// has an old version of Bitswap that doesn't support DONT_HAVE messages, +// or when no response is received within a timeout. +func (bs *Client) onDontHaveTimeout(p peer.ID, dontHaves []cid.Cid) { + // Simulate a message arriving with DONT_HAVEs + if bs.simulateDontHavesOnTimeout { + bs.sm.ReceiveFrom(bs.ctx, p, nil, nil, dontHaves) + } +} + +func (bs *Client) peerQueueFactory(ctx context.Context, p peer.ID) bspm.PeerQueue { + return bsmq.New(ctx, p, bs.network, bs.onDontHaveTimeout) +} + +func (bs *Client) sessionPeerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager { + return bsspm.New(id, bs.network.ConnectionManager()) +} + type counters struct { blocksRecvd uint64 dupBlocksRecvd uint64 diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 4f90f239b..97b4f40d2 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -51,7 +51,6 @@ const ( // MessageNetwork is any network that can connect peers and generate a message // sender. type MessageNetwork interface { - ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) Latency(peer.ID) time.Duration Ping(context.Context, peer.ID) ping.Result diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 4d361c5d5..31d66a04a 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -26,10 +26,6 @@ type fakeMessageNetwork struct { messageSender bsnet.MessageSender } -func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { - return fmn.connectError -} - func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { if fmn.messageSenderError == nil { return fmn.messageSender, nil diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index f918c409a..7c9b6bb22 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -34,8 +34,12 @@ type findProviderRequest struct { // ProviderQueryNetwork is an interface for finding providers and connecting to // peers. type ProviderQueryNetwork interface { - ConnectTo(context.Context, peer.ID) error - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID + Self() peer.ID + ConnectTo(context.Context, peer.AddrInfo) error +} + +type ContentRouter interface { + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo } type providerQueryMessage interface { @@ -75,6 +79,7 @@ type cancelRequestMessage struct { type ProviderQueryManager struct { ctx context.Context network ProviderQueryNetwork + router ContentRouter providerQueryMessages chan providerQueryMessage providerRequestsProcessing chan *findProviderRequest incomingFindProviderRequests chan *findProviderRequest @@ -88,10 +93,11 @@ type ProviderQueryManager struct { // New initializes a new ProviderQueryManager for a given context and a given // network provider. -func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { +func New(ctx context.Context, network ProviderQueryNetwork, router ContentRouter) *ProviderQueryManager { return &ProviderQueryManager{ ctx: ctx, network: network, + router: router, providerQueryMessages: make(chan providerQueryMessage, 16), providerRequestsProcessing: make(chan *findProviderRequest), incomingFindProviderRequests: make(chan *findProviderRequest), @@ -235,11 +241,15 @@ func (pqm *ProviderQueryManager) findProviderWorker() { pqm.timeoutMutex.RLock() findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() - providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) + providers := pqm.router.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} for p := range providers { + if p.ID == pqm.network.Self() { + continue // ignore self as provider + } + wg.Add(1) - go func(p peer.ID) { + go func(p peer.AddrInfo) { defer wg.Done() err := pqm.network.ConnectTo(findProviderCtx, p) if err != nil { @@ -250,7 +260,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { case pqm.providerQueryMessages <- &receivedProviderMessage{ ctx: findProviderCtx, k: k, - p: p, + p: p.ID, }: case <-pqm.ctx.Done(): return diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go index 52447e2c1..518a904ed 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go @@ -14,6 +14,7 @@ import ( ) type fakeProviderNetwork struct { + self peer.ID peersFound []peer.ID connectError error delay time.Duration @@ -23,17 +24,21 @@ type fakeProviderNetwork struct { liveQueries int } -func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.ID) error { +func (fpn *fakeProviderNetwork) Self() peer.ID { + return fpn.self +} + +func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.AddrInfo) error { time.Sleep(fpn.connectDelay) return fpn.connectError } -func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { +func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { fpn.queriesMadeMutex.Lock() fpn.queriesMade++ fpn.liveQueries++ fpn.queriesMadeMutex.Unlock() - incomingPeers := make(chan peer.ID) + incomingPeers := make(chan peer.AddrInfo) go func() { defer close(incomingPeers) for _, p := range fpn.peersFound { @@ -44,7 +49,7 @@ func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Ci default: } select { - case incomingPeers <- p: + case incomingPeers <- peer.AddrInfo{ID: p}: case <-ctx.Done(): return } @@ -64,7 +69,7 @@ func TestNormalSimultaneousFetch(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() keys := testutil.GenerateCids(2) @@ -101,7 +106,7 @@ func TestDedupingProviderRequests(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -141,7 +146,7 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -187,7 +192,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { ctx := context.Background() managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond) defer managerCancel() - providerQueryManager := New(managerCtx, fpn) + providerQueryManager := New(managerCtx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -221,7 +226,7 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -255,7 +260,7 @@ func TestRateLimitingRequests(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() keys := testutil.GenerateCids(maxInProcessRequests + 1) @@ -292,7 +297,7 @@ func TestFindProviderTimeout(t *testing.T) { delay: 10 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(2 * time.Millisecond) keys := testutil.GenerateCids(1) @@ -316,7 +321,7 @@ func TestFindProviderPreCanceled(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) keys := testutil.GenerateCids(1) @@ -341,7 +346,7 @@ func TestCancelFindProvidersAfterCompletion(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) keys := testutil.GenerateCids(1) diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index 39266a5e6..462803bfa 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -108,7 +108,7 @@ type Session struct { sm SessionManager pm PeerManager sprm SessionPeerManager - providerFinder ProviderFinder + providerFinder ProviderFinder // optional, nil when missing sim *bssim.SessionInterestManager sw sessionWants @@ -141,6 +141,7 @@ func New( sm SessionManager, id uint64, sprm SessionPeerManager, + // providerFinder might be nil providerFinder ProviderFinder, sim *bssim.SessionInterestManager, pm PeerManager, @@ -391,6 +392,10 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { // findMorePeers attempts to find more peers for a session by searching for // providers for the given Cid func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) { + if s.providerFinder == nil { + // ¯\_(ツ)_/¯ + return + } go func(k cid.Cid) { for p := range s.providerFinder.FindProvidersAsync(ctx, k) { // When a provider indicates that it has a cid, it's equivalent to diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 962bc2588..dceb5f8c6 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -7,8 +7,6 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network/internal" - cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" @@ -40,7 +38,8 @@ type BitSwapNetwork interface { // Stop stops the network service. Stop() - ConnectTo(context.Context, peer.ID) error + // ConnectTo attempts to connect to the peer, using the passed addresses as a hint, they can be empty. + ConnectTo(context.Context, peer.AddrInfo) error DisconnectFrom(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) @@ -49,8 +48,6 @@ type BitSwapNetwork interface { Stats() Stats - Routing - Pinger } @@ -84,16 +81,6 @@ type Receiver interface { PeerDisconnected(peer.ID) } -// Routing is an interface to providing and finding providers on a bitswap -// network. -type Routing interface { - // FindProvidersAsync returns a channel of providers for the given key. - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID - - // Provide provides the key to the network. - Provide(context.Context, cid.Cid) error -} - // Pinger is an interface to ping a peer and get the average latency of all pings type Pinger interface { // Ping a peer diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index a1446775c..4937f5ab8 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -11,15 +11,12 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network/internal" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - peerstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/protocol/ping" msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" @@ -38,12 +35,11 @@ var ( ) // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { s := processSettings(opts...) bitswapNetwork := impl{ - host: host, - routing: r, + host: host, protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers, protocolBitswapOneZero: s.ProtocolPrefix + ProtocolBitswapOneZero, @@ -75,7 +71,6 @@ type impl struct { stats Stats host host.Host - routing routing.ContentRouting connectEvtMgr *connectEventManager protocolBitswapNoVers protocol.ID @@ -106,7 +101,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) defer cancel() - if err := s.bsnet.ConnectTo(tctx, s.to); err != nil { + if err := s.bsnet.ConnectTo(tctx, peer.AddrInfo{ID: s.to}); err != nil { return nil, err } @@ -365,40 +360,14 @@ func (bsnet *impl) Stop() { bsnet.host.Network().StopNotify((*netNotifiee)(bsnet)) } -func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { - return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p}) +func (bsnet *impl) ConnectTo(ctx context.Context, p peer.AddrInfo) error { + return bsnet.host.Connect(ctx, p) } func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error { return bsnet.host.Network().ClosePeer(p) } -// FindProvidersAsync returns a channel of providers for the given key. -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - out := make(chan peer.ID, max) - go func() { - defer close(out) - providers := bsnet.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - if info.ID == bsnet.host.ID() { - continue // ignore self as provider - } - bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) - select { - case <-ctx.Done(): - return - case out <- info.ID: - } - } - }() - return out -} - -// Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error { - return bsnet.routing.Provide(ctx, k, true) -} - // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s network.Stream) { defer s.Close() diff --git a/bitswap/network/ipfs_impl_test.go b/bitswap/network/ipfs_impl_test.go index af76e20d6..5529564f8 100644 --- a/bitswap/network/ipfs_impl_test.go +++ b/bitswap/network/ipfs_impl_test.go @@ -13,8 +13,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/network/internal" tn "github.com/ipfs/boxo/bitswap/testnet" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" blocksutil "github.com/ipfs/go-ipfs-blocksutil" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/host" @@ -170,8 +168,7 @@ func TestMessageSendAndReceive(t *testing.T) { defer cancel() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatal("Unable to setup network") } @@ -191,7 +188,7 @@ func TestMessageSendAndReceive(t *testing.T) { if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.ConnectTo(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -200,7 +197,7 @@ func TestMessageSendAndReceive(t *testing.T) { t.Fatal("did not connect peer") case <-r1.connectionEvent: } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.ConnectTo(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -274,7 +271,6 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec // create network mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() // Host 1 h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) @@ -282,8 +278,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh1 := &ErrHost{Host: h1} - routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1) + bsnet1 := bsnet.NewFromIpfsHost(eh1) bsnet1.Start(r1) t.Cleanup(bsnet1.Stop) if r1.listener != nil { @@ -296,8 +291,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh2 := &ErrHost{Host: h2} - routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore()) - bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2) + bsnet2 := bsnet.NewFromIpfsHost(eh2) bsnet2.Start(r2) t.Cleanup(bsnet2.Stop) if r2.listener != nil { @@ -309,7 +303,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.ConnectTo(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -318,7 +312,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal("Expected connect event") } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.ConnectTo(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -454,8 +448,7 @@ func TestSupportsHave(t *testing.T) { ctx := context.Background() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatalf("Unable to setup network: %s", err) } diff --git a/bitswap/options.go b/bitswap/options.go index 9bea0b637..32ea3c767 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -75,6 +75,10 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { return Option{client.SetSimulateDontHavesOnTimeout(send)} } +func WithContentSearch(router client.ContentSearcher) Option { + return Option{client.WithContentSearch(router)} +} + func WithTracer(tap tracer.Tracer) Option { // Only trace the server, both receive the same messages anyway return Option{ diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index 5a052b831..037acbad4 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -49,12 +49,28 @@ func (g *InstanceGenerator) Close() error { // Next generates a new instance of bitswap + dependencies func (g *InstanceGenerator) Next() Instance { + return g.NextWithExtraOptions(nil) +} + +// NextWithExtraOptions is like [Next] but it will callback with a fake identity and append extra options. +// If extraOpts is nil, it will ignore it. +func (g *InstanceGenerator) NextWithExtraOptions(extraOpts func(p tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option)) Instance { g.seq++ p, err := p2ptestutil.RandTestBogusIdentity() if err != nil { - panic("FIXME") // TODO change signature + panic(err.Error()) // TODO change signature } - return NewInstance(g.ctx, g.net, p, g.netOptions, g.bsOptions) + + var extraNet []bsnet.NetOpt + var extraBitswap []bitswap.Option + if extraOpts != nil { + extraNet, extraBitswap = extraOpts(p) + } + + return NewInstance(g.ctx, g.net, p, + append(g.netOptions[:len(g.netOptions):len(g.netOptions)], extraNet...), + append(g.bsOptions[:len(g.bsOptions):len(g.bsOptions)], extraBitswap...), + ) } // Instances creates N test instances of bitswap + dependencies and connects @@ -74,7 +90,7 @@ func ConnectInstances(instances []Instance) { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer) + err := inst.Adapter.ConnectTo(context.Background(), peer.AddrInfo{ID: oinst.Peer}) if err != nil { panic(err.Error()) } diff --git a/bitswap/testnet/network_test.go b/bitswap/testnet/network_test.go index 0947eff3e..2d45e09b1 100644 --- a/bitswap/testnet/network_test.go +++ b/bitswap/testnet/network_test.go @@ -8,7 +8,6 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" @@ -17,7 +16,7 @@ import ( ) func TestSendMessageAsyncButWaitForResponse(t *testing.T) { - net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := VirtualNetwork(delay.Fixed(0)) responderPeer := tnet.RandIdentityOrFatal(t) waiter := net.Adapter(tnet.RandIdentityOrFatal(t)) responder := net.Adapter(responderPeer) diff --git a/bitswap/testnet/peernet.go b/bitswap/testnet/peernet.go index e4df19699..9abf189b7 100644 --- a/bitswap/testnet/peernet.go +++ b/bitswap/testnet/peernet.go @@ -5,9 +5,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" - tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -15,12 +12,11 @@ import ( type peernet struct { mockpeernet.Mocknet - routingserver mockrouting.Server } // StreamNet is a testnet that uses libp2p's MockNet -func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { - return &peernet{net, rs}, nil +func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) { + return &peernet{net}, nil } func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { @@ -28,8 +24,7 @@ func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapN if err != nil { panic(err.Error()) } - routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) - return bsnet.NewFromIpfsHost(client, routing, opts...) + return bsnet.NewFromIpfsHost(client, opts...) } func (pn *peernet) HasPeer(p peer.ID) bool { diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 914044aed..252362655 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -11,27 +11,23 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" protocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) // VirtualNetwork generates a new testnet instance - a fake network that // is used to simulate sending messages. -func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { +func VirtualNetwork(d delay.D) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: false, rateLimitGenerator: nil, conns: make(map[string]struct{}), @@ -45,13 +41,12 @@ type RateLimitGenerator interface { // RateLimitedVirtualNetwork generates a testnet instance where nodes are rate // limited in the upload/download speed. -func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network { +func RateLimitedVirtualNetwork(d delay.D, rateLimitGenerator RateLimitGenerator) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: true, rateLimitGenerator: rateLimitGenerator, conns: make(map[string]struct{}), @@ -63,7 +58,6 @@ type network struct { latencies map[peer.ID]map[peer.ID]time.Duration rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter clients map[peer.ID]*receiverQueue - routingserver mockrouting.Server delay delay.D isRateLimited bool rateLimitGenerator RateLimitGenerator @@ -105,7 +99,6 @@ func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNe client := &networkClient{ local: p.ID(), network: n, - routing: n.routingserver.Client(p), supportedProtocols: s.SupportedProtocols, } n.clients[p.ID()] = &receiverQueue{receiver: client} @@ -192,7 +185,6 @@ type networkClient struct { local peer.ID receivers []bsnet.Receiver network *network - routing routing.Routing supportedProtocols []protocol.ID } @@ -253,27 +245,6 @@ func (nc *networkClient) Stats() bsnet.Stats { } } -// FindProvidersAsync returns a channel of providers for the given key. -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - // NB: this function duplicates the AddrInfo -> ID transformation in the - // bitswap network adapter. Not to worry. This network client will be - // deprecated once the ipfsnet.Mock is added. The code below is only - // temporary. - - out := make(chan peer.ID) - go func() { - defer close(out) - providers := nc.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - select { - case <-ctx.Done(): - case out <- info.ID: - } - } - }() - return out -} - func (nc *networkClient) ConnectionManager() connmgr.ConnManager { return &connmgr.NullConnMgr{} } @@ -322,11 +293,6 @@ func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts * }, nil } -// Provide provides the key to the network. -func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error { - return nc.routing.Provide(ctx, k, true) -} - func (nc *networkClient) Start(r ...bsnet.Receiver) { nc.receivers = r } @@ -334,7 +300,8 @@ func (nc *networkClient) Start(r ...bsnet.Receiver) { func (nc *networkClient) Stop() { } -func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { +func (nc *networkClient) ConnectTo(_ context.Context, info peer.AddrInfo) error { + p := info.ID nc.network.mu.Lock() otherClient, ok := nc.network.clients[p] if !ok { diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e32b10b99..0ca47f1d3 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -4,13 +4,12 @@ import ( testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" "github.com/ipfs/boxo/blockservice" - mockrouting "github.com/ipfs/boxo/routing/mock" delay "github.com/ipfs/go-ipfs-delay" ) // Mocks returns |n| connected mock Blockservices func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := tn.VirtualNetwork(delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(n) diff --git a/examples/go.mod b/examples/go.mod index 7b91ade2a..209668ac7 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -10,7 +10,6 @@ require ( github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-ipld-prime v0.21.0 github.com/libp2p/go-libp2p v0.32.2 - github.com/libp2p/go-libp2p-routing-helpers v0.7.3 github.com/multiformats/go-multiaddr v0.12.1 github.com/multiformats/go-multicodec v0.9.0 github.com/prometheus/client_golang v1.18.0 @@ -88,6 +87,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.25.2 // indirect github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect + github.com/libp2p/go-libp2p-routing-helpers v0.7.3 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.2.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect diff --git a/examples/unixfs-file-cid/main.go b/examples/unixfs-file-cid/main.go index e1adad350..e20b64ec1 100644 --- a/examples/unixfs-file-cid/main.go +++ b/examples/unixfs-file-cid/main.go @@ -32,7 +32,6 @@ import ( unixfile "github.com/ipfs/boxo/ipld/unixfs/file" "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" bsclient "github.com/ipfs/boxo/bitswap/client" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -178,14 +177,14 @@ func startDataServer(ctx context.Context, h host.Host) (cid.Cid, *bsserver.Serve // Start listening on the Bitswap protocol // For this example we're not leveraging any content routing (DHT, IPNI, delegated routing requests, etc.) as we know the peer we are fetching from - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) + n := bsnet.NewFromIpfsHost(h) bswap := bsserver.New(ctx, n, bs) n.Start(bswap) return nd.Cid(), bswap, nil } func runClient(ctx context.Context, h host.Host, c cid.Cid, targetPeer string) ([]byte, error) { - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) + n := bsnet.NewFromIpfsHost(h) bswap := bsclient.New(ctx, n, blockstore.NewBlockstore(datastore.NewNullDatastore())) n.Start(bswap) defer bswap.Close() diff --git a/fetcher/helpers/block_visitor_test.go b/fetcher/helpers/block_visitor_test.go index 57d3e11ad..6276af1f6 100644 --- a/fetcher/helpers/block_visitor_test.go +++ b/fetcher/helpers/block_visitor_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/boxo/fetcher/helpers" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/fetcher/testutil" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipld/go-ipld-prime" @@ -44,7 +43,7 @@ func TestFetchGraphToBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -94,7 +93,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/fetcher/impl/blockservice/fetcher_test.go b/fetcher/impl/blockservice/fetcher_test.go index 5a0b071f4..ddbd0863c 100644 --- a/fetcher/impl/blockservice/fetcher_test.go +++ b/fetcher/impl/blockservice/fetcher_test.go @@ -16,7 +16,6 @@ import ( "github.com/ipfs/boxo/fetcher/helpers" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/fetcher/testutil" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipld/go-ipld-prime" @@ -38,7 +37,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -87,7 +86,7 @@ func TestFetchIPLDGraph(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -143,7 +142,7 @@ func TestFetchIPLDPath(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -207,7 +206,7 @@ func TestHelpers(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -321,7 +320,7 @@ func TestNodeReification(t *testing.T) { na.AssembleEntry("link4").AssignLink(link4) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/ipld/merkledag/merkledag_test.go b/ipld/merkledag/merkledag_test.go index ffe4946ca..206afcdfa 100644 --- a/ipld/merkledag/merkledag_test.go +++ b/ipld/merkledag/merkledag_test.go @@ -16,16 +16,17 @@ import ( "testing" "time" + testinstance "github.com/ipfs/boxo/bitswap/testinstance" + tn "github.com/ipfs/boxo/bitswap/testnet" + bserv "github.com/ipfs/boxo/blockservice" + bstest "github.com/ipfs/boxo/blockservice/test" . "github.com/ipfs/boxo/ipld/merkledag" mdpb "github.com/ipfs/boxo/ipld/merkledag/pb" dstest "github.com/ipfs/boxo/ipld/merkledag/test" - - bserv "github.com/ipfs/boxo/blockservice" - bstest "github.com/ipfs/boxo/blockservice/test" - offline "github.com/ipfs/boxo/exchange/offline" u "github.com/ipfs/boxo/util" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" ipld "github.com/ipfs/go-ipld-format" prime "github.com/ipld/go-ipld-prime" mh "github.com/multiformats/go-multihash" @@ -507,10 +508,12 @@ func TestCantGet(t *testing.T) { } func TestFetchGraph(t *testing.T) { - var dservs []ipld.DAGService - bsis := bstest.Mocks(2) - for _, bsi := range bsis { - dservs = append(dservs, NewDAGService(bsi)) + net := tn.VirtualNetwork(delay.Fixed(0)) + sg := testinstance.NewTestInstanceGenerator(net, nil, nil) + instances := sg.Instances(2) + dservs := [2]ipld.DAGService{ + NewDAGService(bserv.New(instances[0].Blockstore(), instances[0].Exchange)), + NewDAGService(bserv.New(instances[1].Blockstore(), instances[1].Exchange)), } read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) @@ -522,7 +525,7 @@ func TestFetchGraph(t *testing.T) { } // create an offline dagstore and ensure all blocks were fetched - bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore())) + bs := bserv.New(instances[1].Blockstore(), nil) offlineDS := NewDAGService(bs) @@ -547,10 +550,12 @@ func TestFetchGraphWithDepthLimit(t *testing.T) { } testF := func(t *testing.T, tc testcase) { - var dservs []ipld.DAGService - bsis := bstest.Mocks(2) - for _, bsi := range bsis { - dservs = append(dservs, NewDAGService(bsi)) + net := tn.VirtualNetwork(delay.Fixed(0)) + sg := testinstance.NewTestInstanceGenerator(net, nil, nil) + instances := sg.Instances(2) + dservs := [2]ipld.DAGService{ + NewDAGService(bserv.New(instances[0].Blockstore(), instances[0].Exchange)), + NewDAGService(bserv.New(instances[1].Blockstore(), instances[1].Exchange)), } root := makeDepthTestingGraph(t, dservs[0]) @@ -561,7 +566,7 @@ func TestFetchGraphWithDepthLimit(t *testing.T) { } // create an offline dagstore and ensure all blocks were fetched - bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore())) + bs := bserv.New(instances[1].Blockstore(), nil) offlineDS := NewDAGService(bs)