Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap move providing responsabilities from the server to blockservice & reprovider #528

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`.
It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient.
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.

### Changed

- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed.
- `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values.

### Removed

- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
- 🛠 `tracing` `jaeger` exporter has been removed due to it's deprecation and removal from upstream, you should use `otlp` exporter instead. See the [docs](./docs/tracing.md) for an example.

### Security

## [v0.16.0]
Expand Down
9 changes: 4 additions & 5 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
)
Expand Down Expand Up @@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
oldSeedCount := bch.oldSeedCount
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)

net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
net := tn.VirtualNetwork(fixedDelay)

// Simulate an older Bitswap node (old protocol ID) that doesn't
// send DONT_HAVE responses
Expand Down Expand Up @@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
numblks := 1000

for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand All @@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
net := tn.VirtualNetwork(d)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)

Expand All @@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b

func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
for i := 0; i < b.N; i++ {
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)

ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand Down
12 changes: 2 additions & 10 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/internal/defaults"
"github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/server"
Expand Down Expand Up @@ -45,9 +44,8 @@ type bitswap interface {
}

var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
)

type Bitswap struct {
Expand Down Expand Up @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
Expand All @@ -115,7 +109,6 @@ type Stat struct {
MessagesReceived uint64
BlocksSent uint64
DataSent uint64
ProvideBufLen int
}

func (bs *Bitswap) Stat() (*Stat, error) {
Expand All @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
Peers: ss.Peers,
BlocksSent: ss.BlocksSent,
DataSent: ss.DataSent,
ProvideBufLen: ss.ProvideBufLen,
}, nil
}

Expand Down
32 changes: 16 additions & 16 deletions bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
const kNetworkDelay = 0 * time.Millisecond

func TestClose(t *testing.T) {
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand All @@ -66,7 +66,7 @@ func TestClose(t *testing.T) {

func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()

Expand All @@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
}

func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
Expand Down Expand Up @@ -118,9 +118,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()

Expand Down Expand Up @@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
bsMessage := bsmsg.New(true)
bsMessage.AddBlock(block)
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4

Expand Down Expand Up @@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
bitswap.TaskWorkerCount(5),
bitswap.EngineTaskWorkerCount(5),
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.SkipNow()
}

net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

func TestEmptyKey(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bs := ig.Instances(1)[0].Exchange
Expand Down Expand Up @@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6
}

func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) {
}

func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) {
}

func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt {
}

func TestBitswapLedgerOneWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
}

func TestBitswapLedgerTwoWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() {
// Tests start and stop of a custom decision logic
func TestWithScoreLedger(t *testing.T) {
tsl := newTestingScoreLedger()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
defer ig.Close()
Expand Down
36 changes: 29 additions & 7 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client/internal/session"
"github.com/ipfs/boxo/bitswap/client/traceability"
bsnet "github.com/ipfs/boxo/bitswap/network"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
Expand All @@ -18,13 +19,15 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
tu "github.com/libp2p/go-libp2p-testing/etc"
tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func getVirtualNetwork() tn.Network {
// FIXME: the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
return tn.VirtualNetwork(delay.Fixed(0))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
Expand Down Expand Up @@ -110,7 +113,7 @@ func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -215,16 +218,23 @@ func TestFetchNotConnected(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

other := ig.Next()
var otherClient mockrouting.Client
other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
otherClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
addBlock(t, ctx, other, block)
err := otherClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

var cids []cid.Cid
Expand All @@ -235,7 +245,9 @@ func TestFetchNotConnected(t *testing.T) {
// Request blocks with Peer B
// Note: Peer A and Peer B are not initially connected, so this tests
// that Peer B will search for and find Peer A
thisNode := ig.Next()
thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

Expand All @@ -258,16 +270,19 @@ func TestFetchAfterDisconnect(t *testing.T) {
defer cancel()

vnet := getVirtualNetwork()
rs := mockrouting.NewServer()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{
bitswap.ProviderSearchDelay(10 * time.Millisecond),
bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)),
})
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

inst := ig.Instances(2)
peerA := inst[0]
peerB := inst[1]
var aClient mockrouting.Client
peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
aClient = rs.Client(id)
return nil, nil // don't add content search, only the client needs it
})

// Provide 5 blocks on Peer A
blks := bgen.Blocks(10)
Expand All @@ -279,9 +294,14 @@ func TestFetchAfterDisconnect(t *testing.T) {
firstBlks := blks[:5]
for _, block := range firstBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Request all blocks with Peer B
peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
})
ses := peerB.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

Expand Down Expand Up @@ -313,6 +333,8 @@ func TestFetchAfterDisconnect(t *testing.T) {
lastBlks := blks[5:]
for _, block := range lastBlks {
addBlock(t, ctx, peerA, block)
err := aClient.Provide(ctx, block.Cid(), true)
require.NoError(t, err)
}

// Peer B should call FindProviders() and find Peer A
Expand Down
Loading