Skip to content

Commit

Permalink
bitswap/client: add basic traceable blocks
Browse files Browse the repository at this point in the history
Fixes #209
  • Loading branch information
Jorropo authored and hacdias committed Jul 26, 2023
1 parent 1f5df74 commit 05ffe06
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 19 deletions.
15 changes: 8 additions & 7 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ package client
import (
"context"
"errors"

"sync"
"time"

delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
bsgetter "github.com/ipfs/boxo/bitswap/client/internal/getter"
bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
Expand All @@ -33,11 +28,14 @@ import (
exchange "github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("bitswap-client")
Expand Down Expand Up @@ -239,6 +237,7 @@ type counters struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String())))
defer span.End()
Expand All @@ -248,6 +247,7 @@ func (bs *Client) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error)
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
// It returns a [github.com/ipfs/boxo/bitswap/client/traceability.Block] assertable [blocks.Block].
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
Expand Down Expand Up @@ -284,7 +284,8 @@ func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)
var zero peer.ID
bs.notif.Publish(zero, blks...)

return nil
}
Expand Down Expand Up @@ -325,7 +326,7 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
bs.notif.Publish(from, b)
}

for _, b := range wanted {
Expand Down
16 changes: 12 additions & 4 deletions bitswap/client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package notifications
import (
"context"
"sync"
"time"

pubsub "github.com/cskr/pubsub"
"github.com/ipfs/boxo/bitswap/client/traceability"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
)

const bufferSize = 16
Expand All @@ -15,7 +18,7 @@ const bufferSize = 16
// for cids. It's used internally by bitswap to decouple receiving blocks
// and actually providing them back to the GetBlocks caller.
type PubSub interface {
Publish(blocks ...blocks.Block)
Publish(from peer.ID, blocks ...blocks.Block)
Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
Shutdown()
}
Expand All @@ -35,7 +38,7 @@ type impl struct {
closed chan struct{}
}

func (ps *impl) Publish(blocks ...blocks.Block) {
func (ps *impl) Publish(from peer.ID, blocks ...blocks.Block) {
ps.lk.RLock()
defer ps.lk.RUnlock()
select {
Expand All @@ -45,7 +48,7 @@ func (ps *impl) Publish(blocks ...blocks.Block) {
}

for _, block := range blocks {
ps.wrapped.Pub(block, block.Cid().KeyString())
ps.wrapped.Pub(traceability.Block{Block: block, From: from}, block.Cid().KeyString())
}
}

Expand Down Expand Up @@ -84,6 +87,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
default:
}

subscribe := time.Now()

// AddSubOnceEach listens for each key in the list, and closes the channel
// once all keys have been received
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
Expand Down Expand Up @@ -113,10 +118,13 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Bl
if !ok {
return
}
block, ok := val.(blocks.Block)
block, ok := val.(traceability.Block)
if !ok {
// FIXME: silently dropping errors wtf ?
return
}
block.Delay = time.Since(subscribe)

select {
case <-ctx.Done():
return
Expand Down
22 changes: 14 additions & 8 deletions bitswap/client/internal/notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/libp2p/go-libp2p/core/peer"
)

func TestDuplicates(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

b1 := blocks.NewBlock([]byte("1"))
b2 := blocks.NewBlock([]byte("2"))
Expand All @@ -22,16 +24,16 @@ func TestDuplicates(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())

n.Publish(b1)
n.Publish(zero, b1)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, b1, blockRecvd)

n.Publish(b1) // ignored duplicate
n.Publish(zero, b1) // ignored duplicate

n.Publish(b2)
n.Publish(zero, b2)
blockRecvd, ok = <-ch
if !ok {
t.Fail()
Expand All @@ -41,14 +43,15 @@ func TestDuplicates(t *testing.T) {

func TestPublishSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

blockSent := blocks.NewBlock([]byte("Greetings from The Interval"))

n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Cid())

n.Publish(blockSent)
n.Publish(zero, blockSent)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
Expand All @@ -60,6 +63,7 @@ func TestPublishSubscribe(t *testing.T) {

func TestSubscribeMany(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))
e2 := blocks.NewBlock([]byte("2"))
Expand All @@ -68,14 +72,14 @@ func TestSubscribeMany(t *testing.T) {
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch
if !ok {
t.Fatal("didn't receive first expected block")
}
assertBlocksEqual(t, e1, r1)

n.Publish(e2)
n.Publish(zero, e2)
r2, ok := <-ch
if !ok {
t.Fatal("didn't receive second expected block")
Expand All @@ -87,6 +91,7 @@ func TestSubscribeMany(t *testing.T) {
// would be requested twice at the same time.
func TestDuplicateSubscribe(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

e1 := blocks.NewBlock([]byte("1"))

Expand All @@ -95,7 +100,7 @@ func TestDuplicateSubscribe(t *testing.T) {
ch1 := n.Subscribe(context.Background(), e1.Cid())
ch2 := n.Subscribe(context.Background(), e1.Cid())

n.Publish(e1)
n.Publish(zero, e1)
r1, ok := <-ch1
if !ok {
t.Fatal("didn't receive first expected block")
Expand Down Expand Up @@ -158,6 +163,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {

func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
test.Flaky(t)
var zero peer.ID // this test doesn't check the peer id

g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -179,7 +185,7 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
n.Publish(zero, b)
}

t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
Expand Down
21 changes: 21 additions & 0 deletions bitswap/client/traceability/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package traceability

import (
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/libp2p/go-libp2p/core/peer"
)

// Block is a block whos provenance has been tracked.
type Block struct {
blocks.Block

// From contains the peer id of the node who sent us the block.
// It will be the zero value if we did not downloaded this block from the
// network. (such as by getting the block from NotifyNewBlocks).
From peer.ID
// Delay contains how long did we had to wait between when we started being
// intrested and when we actually got the block.
Delay time.Duration
}

0 comments on commit 05ffe06

Please sign in to comment.