diff --git a/bitswap.go b/bitswap.go index cfb138cf..8c549ede 100644 --- a/bitswap.go +++ b/bitswap.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "sync" "time" @@ -464,72 +463,82 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks return session.GetBlocks(ctx, keys) } -// HasBlock announces the existence of a block to this bitswap service. The +// NotifyNewBlocks announces the existence of blocks to this bitswap service. The // service will potentially notify its peers. -func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { - ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String()))) +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error { + ctx, span := internal.StartSpan(ctx, "NotifyNewBlocks") defer span.End() - return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil) -} -// TODO: Some of this stuff really only needs to be done when adding a block -// from the user, not when receiving it from the network. -// In case you run `git blame` on this comment, I'll save you some time: ask -// @whyrusleeping, I don't know the answers you seek. -func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") default: } - wanted := blks + blkCids := make([]cid.Cid, len(blks)) + for i, blk := range blks { + blkCids[i] = blk.Cid() + } + + // Send all block keys (including duplicates) to any sessions that want them. + // (The duplicates are needed by sessions for accounting purposes) + bs.sm.ReceiveFrom(ctx, "", blkCids, nil, nil) + + // Send wanted blocks to decision engine + bs.engine.NotifyNewBlocks(blks) - // If blocks came from the network - if from != "" { - var notWanted []blocks.Block - wanted, notWanted = bs.sim.SplitWantedUnwanted(blks) - for _, b := range notWanted { - log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from) + // Publish the block to any Bitswap clients that had requested blocks. + // (the sessions use this pubsub mechanism to inform clients of incoming + // blocks) + bs.notif.Publish(blks...) + + // If the reprovider is enabled, send block to reprovider + if bs.provideEnabled { + for _, blk := range blks { + select { + case bs.newBlocks <- blk.Cid(): + // send block off to be reprovided + case <-bs.process.Closing(): + return bs.process.Close() + } } } - // Put wanted blocks into blockstore - if len(wanted) > 0 { - err := bs.blockstore.PutMany(ctx, wanted) - if err != nil { - log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err) - return err - } + return nil +} + +// receiveBlocksFrom process blocks received from the network +func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { + select { + case <-bs.process.Closing(): + return errors.New("bitswap is closed") + default: } - // NOTE: There exists the possiblity for a race condition here. If a user - // creates a node, then adds it to the dagservice while another goroutine - // is waiting on a GetBlock for that object, they will receive a reference - // to the same node. We should address this soon, but i'm not going to do - // it now as it requires more thought and isnt causing immediate problems. + wanted, notWanted := bs.sim.SplitWantedUnwanted(blks) + for _, b := range notWanted { + log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from) + } allKs := make([]cid.Cid, 0, len(blks)) for _, b := range blks { allKs = append(allKs, b.Cid()) } - // If the message came from the network - if from != "" { - // Inform the PeerManager so that we can calculate per-peer latency - combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves)) - combined = append(combined, allKs...) - combined = append(combined, haves...) - combined = append(combined, dontHaves...) - bs.pm.ResponseReceived(from, combined) - } + // Inform the PeerManager so that we can calculate per-peer latency + combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves)) + combined = append(combined, allKs...) + combined = append(combined, haves...) + combined = append(combined, dontHaves...) + bs.pm.ResponseReceived(from, combined) - // Send all block keys (including duplicates) to any sessions that want them. - // (The duplicates are needed by sessions for accounting purposes) + // Send all block keys (including duplicates) to any sessions that want them for accounting purpose. bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) // Send wanted blocks to decision engine - bs.engine.ReceiveFrom(from, wanted) + bs.engine.ReceivedBlocks(from, wanted) // Publish the block to any Bitswap clients that had requested blocks. // (the sessions use this pubsub mechanism to inform clients of incoming @@ -538,22 +547,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b bs.notif.Publish(b) } - // If the reprovider is enabled, send wanted blocks to reprovider - if bs.provideEnabled { - for _, blk := range wanted { - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } - } - } - - if from != "" { - for _, b := range wanted { - log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid()) - } + for _, b := range wanted { + log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid()) } return nil diff --git a/bitswap_test.go b/bitswap_test.go index 048d7e6a..eae7fa75 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -42,6 +42,18 @@ func getVirtualNetwork() tn.Network { return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) } +func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) { + t.Helper() + err := inst.Blockstore().Put(ctx, blk) + if err != nil { + t.Fatal(err) + } + err = inst.Exchange.NotifyNewBlocks(ctx, blk) + if err != nil { + t.Fatal(err) + } +} + func TestClose(t *testing.T) { vnet := getVirtualNetwork() ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) @@ -95,9 +107,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { hasBlock := peers[0] defer hasBlock.Exchange.Close() - if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), hasBlock, block) wantsBlock := peers[1] defer wantsBlock.Exchange.Close() @@ -128,9 +138,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { wantsBlock := ig.Next() defer wantsBlock.Exchange.Close() - if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), hasBlock, block) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond) defer cancel() @@ -163,9 +171,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) { hasBlock := peers[0] defer hasBlock.Exchange.Close() - if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), hasBlock, block) doesNotWantBlock := peers[1] defer doesNotWantBlock.Exchange.Close() @@ -232,15 +238,6 @@ func TestPendingBlockAdded(t *testing.T) { if !blkrecvd.Cid().Equals(lastBlock.Cid()) { t.Fatal("received wrong block") } - - // Make sure Bitswap adds the block to the blockstore - blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid()) - if err != nil { - t.Fatal(err) - } - if !blockInStore { - t.Fatal("Block was not added to block store") - } } func TestLargeSwarm(t *testing.T) { @@ -307,10 +304,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { first := instances[0] for _, b := range blocks { blkeys = append(blkeys, b.Cid()) - err := first.Exchange.HasBlock(ctx, b) - if err != nil { - t.Fatal(err) - } + addBlock(t, ctx, first, b) } t.Log("Distribute!") @@ -341,16 +335,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.Fatal(err) } } - - t.Log("Verify!") - - for _, inst := range instances { - for _, b := range blocks { - if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil { - t.Fatal(err) - } - } - } } // TODO simplify this test. get to the _essence_! @@ -383,10 +367,7 @@ func TestSendToWantingPeer(t *testing.T) { } // peerB announces to the network that he has block alpha - err = peerB.Exchange.HasBlock(ctx, alpha) - if err != nil { - t.Fatal(err) - } + addBlock(t, ctx, peerB, alpha) // At some point, peerA should get alpha (or timeout) blkrecvd, ok := <-alphaPromise @@ -445,10 +426,7 @@ func TestBasicBitswap(t *testing.T) { blocks := bg.Blocks(1) // First peer has block - err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[0]) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -545,10 +523,7 @@ func TestDoubleGet(t *testing.T) { t.Fatal("expected channel to be closed") } - err = instances[0].Exchange.HasBlock(context.Background(), blocks[0]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[0]) select { case blk, ok := <-blkch2: @@ -708,10 +683,7 @@ func TestBitswapLedgerOneWay(t *testing.T) { instances := ig.Instances(2) blocks := bg.Blocks(1) - err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[0]) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -760,19 +732,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) { instances := ig.Instances(2) blocks := bg.Blocks(2) - err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) - if err != nil { - t.Fatal(err) - } - - err = instances[1].Exchange.HasBlock(context.Background(), blocks[1]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[0]) + addBlock(t, context.Background(), instances[1], blocks[1]) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + _, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) if err != nil { t.Fatal(err) } @@ -911,17 +876,14 @@ func TestTracer(t *testing.T) { bitswap.WithTracer(wiretap)(instances[0].Exchange) // First peer has block - err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[0]) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() // Second peer broadcasts want for block CID // (Received by first and third peers) - _, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + _, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) if err != nil { t.Fatal(err) } @@ -995,10 +957,8 @@ func TestTracer(t *testing.T) { // After disabling WireTap, no new messages are logged bitswap.WithTracer(nil)(instances[0].Exchange) - err = instances[0].Exchange.HasBlock(context.Background(), blocks[1]) - if err != nil { - t.Fatal(err) - } + addBlock(t, context.Background(), instances[0], blocks[1]) + _, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid()) if err != nil { t.Fatal(err) diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index 40eed0ff..7532a908 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -187,9 +187,7 @@ func TestFetchNotConnected(t *testing.T) { // Provide 10 blocks on Peer A blks := bgen.Blocks(10) for _, block := range blks { - if err := other.Exchange.HasBlock(ctx, block); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, other, block) } var cids []cid.Cid @@ -243,9 +241,7 @@ func TestFetchAfterDisconnect(t *testing.T) { firstBlks := blks[:5] for _, block := range firstBlks { - if err := peerA.Exchange.HasBlock(ctx, block); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, peerA, block) } // Request all blocks with Peer B @@ -279,9 +275,7 @@ func TestFetchAfterDisconnect(t *testing.T) { // Provide remaining blocks lastBlks := blks[5:] for _, block := range lastBlks { - if err := peerA.Exchange.HasBlock(ctx, block); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, peerA, block) } // Peer B should call FindProviders() and find Peer A @@ -334,9 +328,7 @@ func TestInterestCacheOverflow(t *testing.T) { // wait to ensure that all the above cids were added to the sessions cache time.Sleep(time.Millisecond * 50) - if err := b.Exchange.HasBlock(ctx, blks[0]); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, b, blks[0]) select { case blk, ok := <-zeroch: @@ -381,9 +373,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) { // wait to ensure that all the above cids were added to the sessions cache time.Sleep(time.Millisecond * 50) - if err := a.Exchange.HasBlock(ctx, blks[17]); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, a, blks[17]) select { case <-blkch: @@ -423,9 +413,7 @@ func TestMultipleSessions(t *testing.T) { } time.Sleep(time.Millisecond * 10) - if err := b.Exchange.HasBlock(ctx, blk); err != nil { - t.Fatal(err) - } + addBlock(t, ctx, b, blk) select { case <-blkch2: diff --git a/go.mod b/go.mod index 46cd891c..862807c7 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,8 @@ require ( go.uber.org/zap v1.16.0 ) +replace github.com/ipfs/go-ipfs-exchange-interface => github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 + require ( github.com/btcsuite/btcd v0.21.0-beta // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 2bb54ecc..950b229c 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 h1:J6IkkSKshHms3yQEOrNK/7B2YcCJ6ZbyDDmaXHwOj4Y= +github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -287,8 +289,6 @@ github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= -github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo= -github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-routing v0.2.1 h1:E+whHWhJkdN9YeoHZNj5itzc+OR292AJ2uE9FFiW0BY= diff --git a/internal/decision/engine.go b/internal/decision/engine.go index c8c33097..b3877757 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -769,27 +769,29 @@ func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.E return wants, denied } -// ReceiveFrom is called when new blocks are received and added to the block -// store, meaning there may be peers who want those blocks, so we should send -// the blocks to them. -// +// ReceivedBlocks is called when new blocks are received from the network. // This function also updates the receive side of the ledger. -func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block) { +func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) { if len(blks) == 0 { return } - if from != "" { - l := e.findOrCreate(from) - l.lk.Lock() + l := e.findOrCreate(from) - // Record how many bytes were received in the ledger - for _, blk := range blks { - log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData())) - e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData())) - } + // Record how many bytes were received in the ledger + l.lk.Lock() + for _, blk := range blks { + log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData())) + e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData())) + } + l.lk.Unlock() +} - l.lk.Unlock() +// NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap +// decide to store those blocks and make them available on the network. +func (e *Engine) NotifyNewBlocks(blks []blocks.Block) { + if len(blks) == 0 { + return } // Get the size of each block diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index c4dc5348..ca3c7abd 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -104,7 +104,7 @@ func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInte e.StartWorkers(ctx, process.WithTeardown(func() error { return nil })) return engineSet{ Peer: peer.ID(idStr), - //Strategy: New(true), + // Strategy: New(true), PeerTagger: fpt, Blockstore: bs, Engine: e, @@ -126,7 +126,7 @@ func TestConsistentAccounting(t *testing.T) { sender.Engine.MessageSent(receiver.Peer, m) receiver.Engine.MessageReceived(ctx, sender.Peer, m) - receiver.Engine.ReceiveFrom(sender.Peer, m.Blocks()) + receiver.Engine.ReceivedBlocks(sender.Peer, m.Blocks()) } // Ensure sender records the change @@ -936,10 +936,11 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) { t.Fatal("expected no envelope yet") } + e.ReceivedBlocks(otherPeer, []blocks.Block{blks[0], blks[2]}) if err := bs.PutMany(context.Background(), []blocks.Block{blks[0], blks[2]}); err != nil { t.Fatal(err) } - e.ReceiveFrom(otherPeer, []blocks.Block{blks[0], blks[2]}) + e.NotifyNewBlocks([]blocks.Block{blks[0], blks[2]}) _, env = getNextEnvelope(e, next, 5*time.Millisecond) if env == nil { t.Fatal("expected envelope") @@ -1000,10 +1001,11 @@ func TestSendDontHave(t *testing.T) { } // Receive all the blocks + e.ReceivedBlocks(otherPeer, []blocks.Block{blks[0], blks[2]}) if err := bs.PutMany(context.Background(), blks); err != nil { t.Fatal(err) } - e.ReceiveFrom(otherPeer, blks) + e.NotifyNewBlocks(blks) // Envelope should contain 2 HAVEs / 2 blocks _, env = getNextEnvelope(e, next, 10*time.Millisecond) diff --git a/internal/notifications/notifications.go b/internal/notifications/notifications.go index 7defea73..ed4b79f5 100644 --- a/internal/notifications/notifications.go +++ b/internal/notifications/notifications.go @@ -15,7 +15,7 @@ const bufferSize = 16 // for cids. It's used internally by bitswap to decouple receiving blocks // and actually providing them back to the GetBlocks caller. type PubSub interface { - Publish(block blocks.Block) + Publish(blocks ...blocks.Block) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block Shutdown() } @@ -35,7 +35,7 @@ type impl struct { closed chan struct{} } -func (ps *impl) Publish(block blocks.Block) { +func (ps *impl) Publish(blocks ...blocks.Block) { ps.lk.RLock() defer ps.lk.RUnlock() select { @@ -44,7 +44,9 @@ func (ps *impl) Publish(block blocks.Block) { default: } - ps.wrapped.Pub(block, block.Cid().KeyString()) + for _, block := range blocks { + ps.wrapped.Pub(block, block.Cid().KeyString()) + } } func (ps *impl) Shutdown() {