Skip to content

Commit

Permalink
Don't add blocks to the datastore
Browse files Browse the repository at this point in the history
This leave the responsibility and choice to do so to the caller, typically go-blockservice.

This has several benefit:
- untangle the code
- allow to use an exchange as pure block retrieval
- avoid double add

Close ipfs/kubo#7956
  • Loading branch information
MichaelMure committed Jul 14, 2022
1 parent 8497368 commit 9f2c3e0
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 164 deletions.
111 changes: 53 additions & 58 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"errors"
"fmt"

"sync"
"time"

Expand Down Expand Up @@ -464,72 +463,82 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return session.GetBlocks(ctx, keys)
}

// HasBlock announces the existence of a block to this bitswap service. The
// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "NotifyNewBlocks")
defer span.End()
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

wanted := blks
blkCids := make([]cid.Cid, len(blks))
for i, blk := range blks {
blkCids[i] = blk.Cid()
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, "", blkCids, nil, nil)

// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// If blocks came from the network
if from != "" {
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(ctx, wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
return nil
}

// receiveBlocksFrom process blocks received from the network
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

// NOTE: There exists the possiblity for a race condition here. If a user
// creates a node, then adds it to the dagservice while another goroutine
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isnt causing immediate problems.
wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}

allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
}

// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
// Send all block keys (including duplicates) to any sessions that want them for accounting purpose.
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted)
bs.engine.ReceivedBlocks(from, wanted)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
Expand All @@ -538,22 +547,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
bs.notif.Publish(b)
}

// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, blk := range wanted {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

if from != "" {
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}

return nil
Expand Down
94 changes: 27 additions & 67 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
err = inst.Exchange.NotifyNewBlocks(ctx, blk)
if err != nil {
t.Fatal(err)
}
}

func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
Expand Down Expand Up @@ -95,9 +107,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()
Expand Down Expand Up @@ -128,9 +138,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -163,9 +171,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

doesNotWantBlock := peers[1]
defer doesNotWantBlock.Exchange.Close()
Expand Down Expand Up @@ -232,15 +238,6 @@ func TestPendingBlockAdded(t *testing.T) {
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}

// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}

func TestLargeSwarm(t *testing.T) {
Expand Down Expand Up @@ -307,10 +304,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Cid())
err := first.Exchange.HasBlock(ctx, b)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, first, b)
}

t.Log("Distribute!")
Expand Down Expand Up @@ -341,16 +335,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Fatal(err)
}
}

t.Log("Verify!")

for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil {
t.Fatal(err)
}
}
}
}

// TODO simplify this test. get to the _essence_!
Expand Down Expand Up @@ -383,10 +367,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

// peerB announces to the network that he has block alpha
err = peerB.Exchange.HasBlock(ctx, alpha)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, peerB, alpha)

// At some point, peerA should get alpha (or timeout)
blkrecvd, ok := <-alphaPromise
Expand Down Expand Up @@ -445,10 +426,7 @@ func TestBasicBitswap(t *testing.T) {
blocks := bg.Blocks(1)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -545,10 +523,7 @@ func TestDoubleGet(t *testing.T) {
t.Fatal("expected channel to be closed")
}

err = instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

select {
case blk, ok := <-blkch2:
Expand Down Expand Up @@ -708,10 +683,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -760,19 +732,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(2)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}

err = instances[1].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])
addBlock(t, context.Background(), instances[1], blocks[1])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -911,17 +876,14 @@ func TestTracer(t *testing.T) {
bitswap.WithTracer(wiretap)(instances[0].Exchange)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Second peer broadcasts want for block CID
// (Received by first and third peers)
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -995,10 +957,8 @@ func TestTracer(t *testing.T) {
// After disabling WireTap, no new messages are logged
bitswap.WithTracer(nil)(instances[0].Exchange)

err = instances[0].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[1])

_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 9f2c3e0

Please sign in to comment.