Skip to content

Commit

Permalink
bitswap/network,bitswap/client: move content routing responsabilities…
Browse files Browse the repository at this point in the history
… to an option of the client

Given that the previous commit remove the content advertising from the server, it did not made sense to share these paths on the network.

The code has been reworked:
- addresses aren't magically added to the peerstore as a side-effect of calling `Network.FindProvidersAsync`. Instead they are passed as hints to ConnectTo which copies libp2p `host.ConnectTo` API.
- the providerquerymanager is completely shutdown when not using `WithContentSearch` option, this helps usecase where `routinghelpers.Null` is used for content routing and the consumer exclusively rely on broadcast, like networks where most peoples have all the content (Filecoin, Celestia, ...).
  • Loading branch information
Jorropo committed Jan 16, 2024
1 parent 5e3072d commit 301d5b5
Show file tree
Hide file tree
Showing 23 changed files with 220 additions and 237 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ The following emojis are used to highlight certain changes:
- `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
- `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID.
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.
- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`.
It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient.

### Changed

- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed.
- `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values.

### Removed

- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
Expand Down
9 changes: 4 additions & 5 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
)
Expand Down Expand Up @@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
net := tn.VirtualNetwork(fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
Expand Down Expand Up @@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
numblks := 1000

for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand All @@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
net := tn.VirtualNetwork(d)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)

Expand All @@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand Down
30 changes: 15 additions & 15 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
const kNetworkDelay = 0 * time.Millisecond

func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand All @@ -66,7 +66,7 @@ func TestClose(t *testing.T) {

func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

Expand All @@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
}

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsMessage := bsmsg.New(true)
bsMessage.AddBlock(block)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4

Expand Down Expand Up @@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
bitswap.TaskWorkerCount(5),
bitswap.EngineTaskWorkerCount(5),
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.SkipNow()
}

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

func TestEmptyKey(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bs := ig.Instances(1)[0].Exchange
Expand Down Expand Up @@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6
}

func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) {
}

func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) {
}

func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt {
}

func TestBitswapLedgerOneWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
}

func TestBitswapLedgerTwoWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() {
// Tests start and stop of a custom decision logic
func TestWithScoreLedger(t *testing.T) {
tsl := newTestingScoreLedger()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()
Expand Down
40 changes: 29 additions & 11 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client/internal/session"
"github.com/ipfs/boxo/bitswap/client/traceability"
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
Expand All @@ -18,13 +19,15 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
tu "github.com/libp2p/go-libp2p-testing/etc"
tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func getVirtualNetwork() tn.Network {
// FIXME: the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
return tn.VirtualNetwork(delay.Fixed(0))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
Expand All @@ -37,10 +40,6 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -219,16 +218,23 @@ func TestFetchNotConnected(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

other := ig.Next()
var otherClient mockrouting.Client
other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
otherClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
addBlock(t, ctx, other, block)
err := otherClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

var cids []cid.Cid
Expand All @@ -239,7 +245,9 @@ func TestFetchNotConnected(t *testing.T) {
// Request blocks with Peer B
// Note: Peer A and Peer B are not initially connected, so this tests
// that Peer B will search for and find Peer A
thisNode := ig.Next()
thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

Expand All @@ -262,16 +270,19 @@ func TestFetchAfterDisconnect(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{
bitswap.ProviderSearchDelay(10 * time.Millisecond),
bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)),
})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

inst := ig.Instances(2)
peerA := inst[0]
peerB := inst[1]
var aClient mockrouting.Client
peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
aClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 5 blocks on Peer A
blks := bgen.Blocks(10)
Expand All @@ -283,9 +294,14 @@ func TestFetchAfterDisconnect(t *testing.T) {
firstBlks := blks[:5]
for _, block := range firstBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Request all blocks with Peer B
peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := peerB.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

Expand Down Expand Up @@ -317,6 +333,8 @@ func TestFetchAfterDisconnect(t *testing.T) {
lastBlks := blks[5:]
for _, block := range lastBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Peer B should call FindProviders() and find Peer A
Expand Down
Loading

0 comments on commit 301d5b5

Please sign in to comment.