Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provider query manager improvements #716

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 11 additions & 69 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager"
bsspm "github.com/ipfs/boxo/bitswap/client/internal/sessionpeermanager"
"github.com/ipfs/boxo/bitswap/internal"
"github.com/ipfs/boxo/bitswap/internal/defaults"
bsmsg "github.com/ipfs/boxo/bitswap/message"
bmetrics "github.com/ipfs/boxo/bitswap/metrics"
bsnet "github.com/ipfs/boxo/bitswap/network"
Expand All @@ -38,65 +37,6 @@ import (

var log = logging.Logger("bitswap/client")

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Client)

// ProviderSearchDelay sets the initial dely before triggering a provider
// search to find more peers and broadcast the want list. It also partially
// controls re-broadcasts delay when the session idles (does not receive any
// blocks), but these have back-off logic to increase the interval. See
// [defaults.ProvSearchDelay] for the default.
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Client) {
bs.provSearchDelay = newProvSearchDelay
}
}

// RebroadcastDelay sets a custom delay for periodic search of a random want.
// When the value ellapses, a random CID from the wantlist is chosen and the
// client attempts to find more peers for it and sends them the single want.
// [defaults.RebroadcastDelay] for the default.
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
return func(bs *Client) {
bs.rebroadcastDelay = newRebroadcastDelay
}
}

func SetSimulateDontHavesOnTimeout(send bool) Option {
return func(bs *Client) {
bs.simulateDontHavesOnTimeout = send
}
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Client) {
bs.tracer = tap
}
}

func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option {
return func(bs *Client) {
bs.blockReceivedNotifier = brn
}
}

// WithoutDuplicatedBlockStats disable collecting counts of duplicated blocks
// received. This counter requires triggering a blockstore.Has() call for
// every block received by launching goroutines in parallel. In the worst case
// (no caching/blooms etc), this is an expensive call for the datastore to
// answer. In a normal case (caching), this has the power of evicting a
// different block from intermediary caches. In the best case, it doesn't
// affect performance. Use if this stat is not relevant.
func WithoutDuplicatedBlockStats() Option {
return func(bs *Client) {
bs.skipDuplicatedBlocksStats = true
}
}

type BlockReceivedNotifier interface {
// ReceivedBlocks notifies the decision engine that a peer is well-behaving
// and gave us useful data, potentially increasing its score and making us
Expand Down Expand Up @@ -130,10 +70,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
return bsmq.New(ctx, p, network, onDontHaveTimeout)
}

opts := getOpts(options)

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
pqm := bspqm.New(ctx, network)
pqm := bspqm.New(ctx, network,
bspqm.WithFindProviderTimeout(opts.findProviderTimeout),
bspqm.WithMaxConcurrentFinds(opts.maxConcurrentFinds),
bspqm.WithMaxProvidersPerFind(opts.maxProvidersPerFind))

sessionFactory := func(
sessctx context.Context,
Expand Down Expand Up @@ -168,14 +113,11 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
counters: new(counters),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
simulateDontHavesOnTimeout: true,
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
provSearchDelay: opts.provSearchDelay,
rebroadcastDelay: opts.rebroadcastDelay,
blockReceivedNotifier: opts.blockReceivedNotifier,
simulateDontHavesOnTimeout: opts.simulateDontHavesOnTimeout,
tracer: opts.tracer,
}

pqm.Startup()
Expand Down
71 changes: 71 additions & 0 deletions bitswap/client/internal/providerquerymanager/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package providerquerymanager

import (
"time"
)

const (
defaultFindProviderTimeout = 10 * time.Second
defaultMaxConcurrentFinds = 16
defaultMaxProvidersPerFind = 10
)

type config struct {
findProviderTimeout time.Duration
maxConcurrentFinds int
maxProvidersPerFind int
}

type Option func(*config)

func getOpts(opts []Option) config {
cfg := config{
findProviderTimeout: defaultFindProviderTimeout,
maxConcurrentFinds: defaultMaxConcurrentFinds,
maxProvidersPerFind: defaultMaxProvidersPerFind,
}
for _, opt := range opts {
opt(&cfg)
}
return cfg
}

// WitrFindProviderTimeout configures the maximum amount of time to spend on a
// single find providers attempt. This value can be changed at runtime using
// SetFindProviderTimeout. Use 0 to configure the default.
func WithFindProviderTimeout(to time.Duration) Option {
return func(c *config) {
if to == 0 {
to = defaultFindProviderTimeout
}
c.findProviderTimeout = to
}
}

// WithMaxConcurrentFinds configures the maxmum number of workers that run
// FindProvidersAsync at the same time. Use 0 to configure the default value.
func WithMaxConcurrentFinds(n int) Option {
return func(c *config) {
if n == 0 {
n = defaultMaxConcurrentFinds
} else if n < 0 {
panic("bitswap: WithMaxConcurrentFinds given negative value")
}
c.maxConcurrentFinds = n
}
}

// WithMaxProvidersPerFind configures the maximum number of providers that are
// returned from a single fiond providers attempt. Use 0 to configure the
// default value or use a negative value to find all providers within the
// timeout configured by WithFindProviderTimeout.
func WithMaxProvidersPerFind(n int) Option {
return func(c *config) {
if n == 0 {
n = defaultMaxProvidersPerFind
} else if n < 0 {
n = 0 // 0 means find all
}
c.maxProvidersPerFind = n
}
}
Loading
Loading