Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap/client: add basic traceable blocks #308

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The following emojis are used to highlight certain changes:
* The only change to the default behavior on CAR responses is that we follow
IPIP-412 and make `order=dfs;dups=n` explicit in the returned
`Content-Type` HTTP header.
* ✨ While the call signature remains the same, the blocks that Bitswap returns can now be cast to [traceability.Block](./bitswap/client/traceability/block.go), which will additionally tell you where the Block came from and how long it took to fetch. This helps consumers of Bitswap collect better metrics on Bitswap behavior.

### Changed

Expand Down
30 changes: 24 additions & 6 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client/internal/session"
"github.com/ipfs/boxo/bitswap/client/traceability"
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
tn "github.com/ipfs/boxo/bitswap/testnet"
"github.com/ipfs/boxo/internal/test"
Expand All @@ -17,6 +18,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
tu "github.com/libp2p/go-libp2p-testing/etc"
"github.com/libp2p/go-libp2p/core/peer"
)

func getVirtualNetwork() tn.Network {
Expand Down Expand Up @@ -71,16 +73,32 @@ func TestBasicSessions(t *testing.T) {
if !blkout.Cid().Equals(block.Cid()) {
t.Fatal("got wrong block")
}

traceBlock, ok := blkout.(traceability.Block)
if !ok {
t.Fatal("did not get tracable block")
}

if traceBlock.From != b.Peer {
t.Fatal("should have received block from peer B, did not")
}
}

func assertBlockLists(got, exp []blocks.Block) error {
func assertBlockListsFrom(from peer.ID, got, exp []blocks.Block) error {
if len(got) != len(exp) {
return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp))
}

h := cid.NewSet()
for _, b := range got {
h.Add(b.Cid())
traceableBlock, ok := b.(traceability.Block)
if !ok {
return fmt.Errorf("not a traceable block: %s", b.Cid())
}
if traceableBlock.From != from {
return fmt.Errorf("incorrect peer sent block, expect %s, got %s", from, traceableBlock.From)
}
}
for _, b := range exp {
if !h.Has(b.Cid()) {
Expand Down Expand Up @@ -133,7 +151,7 @@ func TestSessionBetweenPeers(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
if err := assertBlockListsFrom(inst[0].Peer, got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -192,7 +210,7 @@ func TestSessionSplitFetch(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
if err := assertBlockListsFrom(inst[i].Peer, got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -238,7 +256,7 @@ func TestFetchNotConnected(t *testing.T) {
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks); err != nil {
if err := assertBlockListsFrom(other.Peer, got, blks); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -289,7 +307,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
got = append(got, b)
}

if err := assertBlockLists(got, blks[:5]); err != nil {
if err := assertBlockListsFrom(peerA.Peer, got, blks[:5]); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -318,7 +336,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
}
}

if err := assertBlockLists(got, blks); err != nil {
if err := assertBlockListsFrom(peerA.Peer, got, blks); err != nil {
t.Fatal(err)
}
}
Expand Down
15 changes: 8 additions & 7 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ package client
import (
"context"
"errors"

"sync"
"time"

delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
bsgetter "github.com/ipfs/boxo/bitswap/client/internal/getter"
bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
Expand All @@ -33,11 +28,14 @@ import (
exchange "github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap-client")
Expand Down Expand Up @@ -239,6 +237,7 @@ type counters struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String())))
defer span.End()
Expand All @@ -248,6 +247,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error)
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
Expand Down Expand Up @@ -284,7 +284,8 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)
var zero peer.ID
bs.notif.Publish(zero, blks...)

return nil
}
Expand Down Expand Up @@ -325,7 +326,7 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
bs.notif.Publish(from, b)
}

for _, b := range wanted {
Expand Down
16 changes: 12 additions & 4 deletions bitswap/client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package notifications
import (
"context"
"sync"
"time"

pubsub "github.com/cskr/pubsub"
"github.com/ipfs/boxo/bitswap/client/traceability"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
)

const bufferSize = 16
Expand All @@ -15,7 +18,7 @@ const bufferSize = 16
// for cids. It's used internally by bitswap to decouple receiving blocks
// and actually providing them back to the GetBlocks caller.
type PubSub interface {
Publish(blocks ...blocks.Block)
Publish(from peer.ID, blocks ...blocks.Block)
Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
Shutdown()
}
Expand All @@ -35,7 +38,7 @@ type impl struct {
closed chan struct{}
}

func (ps *impl) Publish(blocks ...blocks.Block) {
func (ps *impl) Publish(from peer.ID, blocks ...blocks.Block) {
ps.lk.RLock()
defer ps.lk.RUnlock()
select {
Expand All @@ -45,7 +48,7 @@ func (ps *impl) Publish(blocks ...blocks.Block) {
}

for _, block := range blocks {
ps.wrapped.Pub(block, block.Cid().KeyString())
ps.wrapped.Pub(traceability.Block{Block: block, From: from}, block.Cid().KeyString())
}
}

Expand Down Expand Up @@ -84,6 +87,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
default:
}

subscribe := time.Now()

// AddSubOnceEach listens for each key in the list, and closes the channel
// once all keys have been received
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
Expand Down Expand Up @@ -113,10 +118,13 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
if !ok {
return
}
block, ok := val.(blocks.Block)
block, ok := val.(traceability.Block)
if !ok {
// FIXME: silently dropping errors wtf ?
return
}
block.Delay = time.Since(subscribe)

select {
case <-ctx.Done():
return
Expand Down
22 changes: 14 additions & 8 deletions bitswap/client/internal/notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p/core/peer"
)

func TestDuplicates(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

b1 := blocks.NewBlock([]byte("1"))
b2 := blocks.NewBlock([]byte("2"))
Expand All @@ -22,16 +24,16 @@ func TestDuplicates(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())

n.Publish(b1)
n.Publish(zero, b1)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, b1, blockRecvd)

n.Publish(b1) // ignored duplicate
n.Publish(zero, b1) // ignored duplicate

n.Publish(b2)
n.Publish(zero, b2)
blockRecvd, ok = <-ch
if !ok {
t.Fail()
Expand All @@ -41,14 +43,15 @@ func TestDuplicates(t *testing.T) {

func TestPublishSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

blockSent := blocks.NewBlock([]byte("Greetings from The Interval"))

n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Cid())

n.Publish(blockSent)
n.Publish(zero, blockSent)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
Expand All @@ -60,6 +63,7 @@ func TestPublishSubscribe(t *testing.T) {

func TestSubscribeMany(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))
e2 := blocks.NewBlock([]byte("2"))
Expand All @@ -68,14 +72,14 @@ func TestSubscribeMany(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch
if !ok {
t.Fatal("didn't receive first expected block")
}
assertBlocksEqual(t, e1, r1)

n.Publish(e2)
n.Publish(zero, e2)
r2, ok := <-ch
if !ok {
t.Fatal("didn't receive second expected block")
Expand All @@ -87,6 +91,7 @@ func TestSubscribeMany(t *testing.T) {
// would be requested twice at the same time.
func TestDuplicateSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))

Expand All @@ -95,7 +100,7 @@ func TestDuplicateSubscribe(t *testing.T) {
ch1 := n.Subscribe(context.Background(), e1.Cid())
ch2 := n.Subscribe(context.Background(), e1.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch1
if !ok {
t.Fatal("didn't receive first expected block")
Expand Down Expand Up @@ -158,6 +163,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {

func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -179,7 +185,7 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
n.Publish(zero, b)
}

t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
Expand Down
21 changes: 21 additions & 0 deletions bitswap/client/traceability/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package traceability

import (
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/libp2p/go-libp2p/core/peer"
)

// Block is a block whose provenance has been tracked.
type Block struct {
blocks.Block

// From contains the peer id of the node who sent us the block.
// It will be the zero value if we did not downloaded this block from the
// network. (such as by getting the block from NotifyNewBlocks).
From peer.ID
// Delay contains how long did we had to wait between when we started being
// intrested and when we actually got the block.
Delay time.Duration
}