Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
it definitely don't work. FIXUP
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Aug 4, 2022
1 parent 31c4da9 commit ef64278
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 189 deletions.
7 changes: 3 additions & 4 deletions client/benchmarks_test.go → benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client_test
package bitswap_test

import (
"context"
Expand All @@ -17,8 +17,7 @@ import (
blocks "github.com/ipfs/go-block-format"
protocol "github.com/libp2p/go-libp2p-core/protocol"

bitswap "github.com/ipfs/go-bitswap/client"
bssession "github.com/ipfs/go-bitswap/client/internal/session"
"github.com/ipfs/go-bitswap"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
bsnet "github.com/ipfs/go-bitswap/network"
tn "github.com/ipfs/go-bitswap/testnet"
Expand Down Expand Up @@ -498,7 +497,7 @@ func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.
}

func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*bssession.Session)
ses := bs.NewSession(context.Background())
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
Expand Down
19 changes: 6 additions & 13 deletions client/bitswap_test.go → bitswap_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client_test
package bitswap_test

import (
"bytes"
Expand All @@ -9,8 +9,7 @@ import (
"testing"
"time"

bitswap "github.com/ipfs/go-bitswap/client"
bssession "github.com/ipfs/go-bitswap/client/internal/session"
"github.com/ipfs/go-bitswap"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
"github.com/ipfs/go-bitswap/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -33,14 +32,6 @@ func isCI() bool {
return os.Getenv("CI") != ""
}

// FIXME the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
const kNetworkDelay = 0 * time.Millisecond

func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
Expand All @@ -54,7 +45,9 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
}

func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
// FIXME: the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -142,7 +135,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()

ns := wantsBlock.Exchange.NewSession(ctx).(*bssession.Session)
ns := wantsBlock.Exchange.NewSession(ctx)

received, err := ns.GetBlock(ctx, block.Cid())
if received != nil {
Expand Down
55 changes: 4 additions & 51 deletions client/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-bitswap/client/internal"
bsbpm "github.com/ipfs/go-bitswap/client/internal/blockpresencemanager"
bsgetter "github.com/ipfs/go-bitswap/client/internal/getter"
bsmq "github.com/ipfs/go-bitswap/client/internal/messagequeue"
Expand All @@ -25,6 +24,7 @@ import (
bssm "github.com/ipfs/go-bitswap/client/internal/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/client/internal/sessionpeermanager"
"github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/internal"
"github.com/ipfs/go-bitswap/internal/defaults"
bsmsg "github.com/ipfs/go-bitswap/message"
bmetrics "github.com/ipfs/go-bitswap/metrics"
Expand All @@ -44,28 +44,10 @@ import (
var log = logging.Logger("bitswap-client")
var sflog = log.Desugar()

var _ exchange.SessionExchange = (*Client)(nil)

var (
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
)

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Client)

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
return func(bs *Client) {
bs.provideEnabled = enabled
}
}

// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Client) {
Expand Down Expand Up @@ -188,8 +170,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
blockstore: bstore,
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
pm: pm,
pqm: pqm,
sm: sm,
Expand All @@ -198,7 +178,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
counters: new(counters),
dupMetric: m.DupHist(),
allMetric: m.AllHist(),
provideEnabled: true,
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
engineSetSendDontHaves: true,
Expand Down Expand Up @@ -244,13 +223,6 @@ type Client struct {
// manages channels of outgoing blocks for sessions
notif notifications.PubSub

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan cid.Cid

process process.Process

// Counters for various statistics
Expand All @@ -271,9 +243,6 @@ type Client struct {
// in which CIDs
sim *bssim.SessionInterestManager

// whether or not to make provide announcements
provideEnabled bool

// how long to wait before looking for providers in a session
provSearchDelay time.Duration

Expand Down Expand Up @@ -343,11 +312,10 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return session.GetBlocks(ctx, keys)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
// NotifyNewBlocks announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
func (bs *Client) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "NotifyNewBlocks")
defer span.End()

Expand All @@ -366,31 +334,16 @@ func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) er
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, "", blkCids, nil, nil)

// Send wanted blocks to decision engine
bs.engine.NotifyNewBlocks(blks)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blks...)

// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
for _, blk := range blks {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}
}

return nil
}

// receiveBlocksFrom process blocks received from the network
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand Down
28 changes: 23 additions & 5 deletions client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"testing"
"time"

bitswap "github.com/ipfs/go-bitswap/client"
bssession "github.com/ipfs/go-bitswap/client/internal/session"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/client/internal/session"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -18,6 +18,24 @@ import (
tu "github.com/libp2p/go-libp2p-testing/etc"
)

func getVirtualNetwork() tn.Network {
// FIXME: the tests are really sensitive to the network delay. fix them to work
// well under varying conditions
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
err = inst.Exchange.NotifyNewBlocks(ctx, blk)
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -154,7 +172,7 @@ func TestSessionSplitFetch(t *testing.T) {
}

// Create a session on the remaining peer and fetch all the blocks 10 at a time
ses := inst[10].Exchange.NewSession(ctx).(*bssession.Session)
ses := inst[10].Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -199,7 +217,7 @@ func TestFetchNotConnected(t *testing.T) {
// Note: Peer A and Peer B are not initially connected, so this tests
// that Peer B will search for and find Peer A
thisNode := ig.Next()
ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session)
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

ch, err := ses.GetBlocks(ctx, cids)
Expand Down Expand Up @@ -245,7 +263,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
}

// Request all blocks with Peer B
ses := peerB.Exchange.NewSession(ctx).(*bssession.Session)
ses := peerB.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

ch, err := ses.GetBlocks(ctx, cids)
Expand Down
12 changes: 4 additions & 8 deletions client/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

// Stat is a struct that provides various statistics on bitswap operations
type Stat struct {
ProvideBufLen int
Wantlist []cid.Cid
Peers []string
BlocksReceived uint64
Expand All @@ -19,9 +18,7 @@ type Stat struct {
}

// Stat returns aggregated statistics about bitswap operations
func (bs *Client) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
func (bs *Client) Stat() (st Stat, err error) {
st.Wantlist = bs.GetWantlist()
bs.counterLk.Lock()
c := bs.counters
Expand All @@ -33,10 +30,9 @@ func (bs *Client) Stat() (*Stat, error) {
bs.counterLk.Unlock()

peers := bs.engine.Peers()
st.Peers = make([]string, 0, len(peers))

for _, p := range peers {
st.Peers = append(st.Peers, p.Pretty())
st.Peers = make([]string, len(peers))
for i, p := range peers {
st.Peers[i] = p.Pretty()
}
sort.Strings(st.Peers)

Expand Down
2 changes: 1 addition & 1 deletion client/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, netOption
px := process.WithTeardown(func() error {
return nil
})
bs := client.New(ctx, adapter, bstore, decision, px, stats, bsOptions...).(*client.Bitswap)
bs := client.New(ctx, adapter, bstore, decision, px, stats, bsOptions...)

return Instance{
Adapter: adapter,
Expand Down
40 changes: 0 additions & 40 deletions client/workers.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"
"math/rand"

bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/client/wantlist"
bsmsg "github.com/ipfs/go-bitswap/message"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down
13 changes: 13 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-bitswap").Start(ctx, fmt.Sprintf("Bitswap.%s", name), opts...)
}
Loading

0 comments on commit ef64278

Please sign in to comment.