diff --git a/bitswap/client/client.go b/bitswap/client/client.go index bab03c3cd..e83e82e8f 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -19,7 +19,6 @@ import ( bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager" bsspm "github.com/ipfs/boxo/bitswap/client/internal/sessionpeermanager" "github.com/ipfs/boxo/bitswap/internal" - "github.com/ipfs/boxo/bitswap/internal/defaults" bsmsg "github.com/ipfs/boxo/bitswap/message" bmetrics "github.com/ipfs/boxo/bitswap/metrics" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -38,65 +37,6 @@ import ( var log = logging.Logger("bitswap/client") -// Option defines the functional option type that can be used to configure -// bitswap instances -type Option func(*Client) - -// ProviderSearchDelay sets the initial dely before triggering a provider -// search to find more peers and broadcast the want list. It also partially -// controls re-broadcasts delay when the session idles (does not receive any -// blocks), but these have back-off logic to increase the interval. See -// [defaults.ProvSearchDelay] for the default. -func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { - return func(bs *Client) { - bs.provSearchDelay = newProvSearchDelay - } -} - -// RebroadcastDelay sets a custom delay for periodic search of a random want. -// When the value ellapses, a random CID from the wantlist is chosen and the -// client attempts to find more peers for it and sends them the single want. -// [defaults.RebroadcastDelay] for the default. -func RebroadcastDelay(newRebroadcastDelay delay.D) Option { - return func(bs *Client) { - bs.rebroadcastDelay = newRebroadcastDelay - } -} - -func SetSimulateDontHavesOnTimeout(send bool) Option { - return func(bs *Client) { - bs.simulateDontHavesOnTimeout = send - } -} - -// Configures the Client to use given tracer. -// This provides methods to access all messages sent and received by the Client. -// This interface can be used to implement various statistics (this is original intent). -func WithTracer(tap tracer.Tracer) Option { - return func(bs *Client) { - bs.tracer = tap - } -} - -func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { - return func(bs *Client) { - bs.blockReceivedNotifier = brn - } -} - -// WithoutDuplicatedBlockStats disable collecting counts of duplicated blocks -// received. This counter requires triggering a blockstore.Has() call for -// every block received by launching goroutines in parallel. In the worst case -// (no caching/blooms etc), this is an expensive call for the datastore to -// answer. In a normal case (caching), this has the power of evicting a -// different block from intermediary caches. In the best case, it doesn't -// affect performance. Use if this stat is not relevant. 
-func WithoutDuplicatedBlockStats() Option {
-	return func(bs *Client) {
-		bs.skipDuplicatedBlocksStats = true
-	}
-}
-
 type BlockReceivedNotifier interface {
 	// ReceivedBlocks notifies the decision engine that a peer is well-behaving
 	// and gave us useful data, potentially increasing its score and making us
@@ -130,10 +70,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
 		return bsmq.New(ctx, p, network, onDontHaveTimeout)
 	}
 
+	opts := getOpts(options)
+
 	sim := bssim.New()
 	bpm := bsbpm.New()
 	pm := bspm.New(ctx, peerQueueFactory, network.Self())
-	pqm := bspqm.New(ctx, network)
+	pqm := bspqm.New(ctx, network,
+		bspqm.WithFindProviderTimeout(opts.findProviderTimeout),
+		bspqm.WithMaxConcurrentFinds(opts.maxConcurrentFinds),
+		bspqm.WithMaxProvidersPerFind(opts.maxProvidersPerFind))
 
 	sessionFactory := func(
 		sessctx context.Context,
@@ -168,14 +113,11 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
 		counters:                   new(counters),
 		dupMetric:                  bmetrics.DupHist(ctx),
 		allMetric:                  bmetrics.AllHist(ctx),
-		provSearchDelay:            defaults.ProvSearchDelay,
-		rebroadcastDelay:           delay.Fixed(defaults.RebroadcastDelay),
-		simulateDontHavesOnTimeout: true,
-	}
-
-	// apply functional options before starting and running bitswap
-	for _, option := range options {
-		option(bs)
+		provSearchDelay:            opts.provSearchDelay,
+		rebroadcastDelay:           opts.rebroadcastDelay,
+		blockReceivedNotifier:      opts.blockReceivedNotifier,
+		simulateDontHavesOnTimeout: opts.simulateDontHavesOnTimeout,
+		tracer:                     opts.tracer,
 	}
 
 	pqm.Startup()
diff --git a/bitswap/client/internal/providerquerymanager/options.go b/bitswap/client/internal/providerquerymanager/options.go
new file mode 100644
index 000000000..e48ff2466
--- /dev/null
+++ b/bitswap/client/internal/providerquerymanager/options.go
@@ -0,0 +1,71 @@
+package providerquerymanager
+
+import (
+	"time"
+)
+
+const (
+	defaultFindProviderTimeout = 10 * time.Second
+	defaultMaxConcurrentFinds  = 16
+	defaultMaxProvidersPerFind = 10
+)
+
+type config struct {
+	findProviderTimeout time.Duration
+	maxConcurrentFinds  int
+	maxProvidersPerFind int
+}
+
+type Option func(*config)
+
+func getOpts(opts []Option) config {
+	cfg := config{
+		findProviderTimeout: defaultFindProviderTimeout,
+		maxConcurrentFinds:  defaultMaxConcurrentFinds,
+		maxProvidersPerFind: defaultMaxProvidersPerFind,
+	}
+	for _, opt := range opts {
+		opt(&cfg)
+	}
+	return cfg
+}
+
+// WithFindProviderTimeout configures the maximum amount of time to spend on a
+// single find providers attempt. This value can be changed at runtime using
+// SetFindProviderTimeout. Use 0 to configure the default.
+func WithFindProviderTimeout(to time.Duration) Option {
+	return func(c *config) {
+		if to == 0 {
+			to = defaultFindProviderTimeout
+		}
+		c.findProviderTimeout = to
+	}
+}
+
+// WithMaxConcurrentFinds configures the maximum number of workers that run
+// FindProvidersAsync at the same time. Use 0 to configure the default value.
+func WithMaxConcurrentFinds(n int) Option {
+	return func(c *config) {
+		if n == 0 {
+			n = defaultMaxConcurrentFinds
+		} else if n < 0 {
+			panic("bitswap: WithMaxConcurrentFinds given negative value")
+		}
+		c.maxConcurrentFinds = n
+	}
+}
+
+// WithMaxProvidersPerFind configures the maximum number of providers that are
+// returned from a single find providers attempt. Use 0 to configure the
+// default value or use a negative value to find all providers within the
+// timeout configured by WithFindProviderTimeout.
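+// For example, WithMaxProvidersPerFind(20) stops each lookup after 20
+// providers are found, while WithMaxProvidersPerFind(-1) keeps collecting
+// providers until the find-provider timeout expires.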
+func WithMaxProvidersPerFind(n int) Option { + return func(c *config) { + if n == 0 { + n = defaultMaxProvidersPerFind + } else if n < 0 { + n = 0 // 0 means find all + } + c.maxProvidersPerFind = n + } +} diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index c85efe737..19a099c76 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -3,8 +3,11 @@ package providerquerymanager import ( "context" "sync" + "sync/atomic" "time" + "github.com/gammazero/chanqueue" + "github.com/gammazero/deque" "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -15,12 +18,6 @@ import ( var log = logging.Logger("bitswap/client/provqrymgr") -const ( - maxProviders = 10 - maxInProcessRequests = 6 - defaultTimeout = 10 * time.Second -) - type inProgressRequestStatus struct { ctx context.Context cancelFn func() @@ -76,31 +73,30 @@ type cancelRequestMessage struct { // - ensure two findprovider calls for the same block don't run concurrently // - manage timeouts type ProviderQueryManager struct { - ctx context.Context - network ProviderQueryNetwork - providerQueryMessages chan providerQueryMessage - providerRequestsProcessing chan *findProviderRequest - incomingFindProviderRequests chan *findProviderRequest + ctx context.Context + network ProviderQueryNetwork + providerQueryMessages chan providerQueryMessage + providerRequestsProcessing *chanqueue.ChanQueue[*findProviderRequest] - findProviderTimeout time.Duration - timeoutMutex sync.RWMutex + findProviderTimeout atomic.Int64 // do not touch outside the run loop inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus + + opts config } // New initializes a new ProviderQueryManager for a given context and a given // network provider. -func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { - return &ProviderQueryManager{ - ctx: ctx, - network: network, - providerQueryMessages: make(chan providerQueryMessage, 16), - providerRequestsProcessing: make(chan *findProviderRequest), - incomingFindProviderRequests: make(chan *findProviderRequest), - inProgressRequestStatuses: make(map[cid.Cid]*inProgressRequestStatus), - findProviderTimeout: defaultTimeout, +func New(ctx context.Context, network ProviderQueryNetwork, options ...Option) *ProviderQueryManager { + pqm := &ProviderQueryManager{ + ctx: ctx, + network: network, + providerQueryMessages: make(chan providerQueryMessage), + opts: getOpts(options), } + pqm.SetFindProviderTimeout(pqm.opts.findProviderTimeout) + return pqm } // Startup starts processing for the ProviderQueryManager. @@ -113,11 +109,14 @@ type inProgressRequest struct { incoming chan peer.ID } -// SetFindProviderTimeout changes the timeout for finding providers -func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time.Duration) { - pqm.timeoutMutex.Lock() - pqm.findProviderTimeout = findProviderTimeout - pqm.timeoutMutex.Unlock() +// SetFindProviderTimeout changes the timeout for finding providers. Setting a +// value of 0 resets to the value configures when this ProviderQueryManager was +// created. 
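+// For example, pqm.SetFindProviderTimeout(30*time.Second) lengthens lookups
+// at runtime, and pqm.SetFindProviderTimeout(0) restores the configured value.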
+func (pqm *ProviderQueryManager) SetFindProviderTimeout(timeout time.Duration) { + if timeout == 0 { + timeout = pqm.opts.findProviderTimeout + } + pqm.findProviderTimeout.Store(int64(timeout)) } // FindProvidersAsync finds providers for the given block. @@ -161,31 +160,35 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, } func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID { - // maintains an unbuffered queue for incoming providers for given request for a given session - // essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all - // sessions that queried that CID, without worrying about whether the client code is actually - // reading from the returned channel -- so that the broadcast never blocks - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd + // maintains an unbuffered queue for incoming providers for given request + // for a given session. Eessentially, as a provider comes in, for a given + // CID, immediately broadcast to all sessions that queried that CID, + // without worrying about whether the client code is actually reading from + // the returned channel -- so that the broadcast never blocks. returnedProviders := make(chan peer.ID) - receivedProviders := append([]peer.ID(nil), receivedInProgressRequest.providersSoFar[0:]...) + var receivedProviders deque.Deque[peer.ID] + receivedProviders.Grow(len(receivedInProgressRequest.providersSoFar)) + for _, pid := range receivedInProgressRequest.providersSoFar { + receivedProviders.PushBack(pid) + } incomingProviders := receivedInProgressRequest.incoming go func() { defer close(returnedProviders) defer onCloseFn() outgoingProviders := func() chan<- peer.ID { - if len(receivedProviders) == 0 { + if receivedProviders.Len() == 0 { return nil } return returnedProviders } nextProvider := func() peer.ID { - if len(receivedProviders) == 0 { + if receivedProviders.Len() == 0 { return "" } - return receivedProviders[0] + return receivedProviders.Front() } - for len(receivedProviders) > 0 || incomingProviders != nil { + for receivedProviders.Len() > 0 || incomingProviders != nil { select { case <-pqm.ctx.Done(): return @@ -198,10 +201,10 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k if !ok { incomingProviders = nil } else { - receivedProviders = append(receivedProviders, provider) + receivedProviders.PushBack(provider) } case outgoingProviders() <- nextProvider(): - receivedProviders = receivedProviders[1:] + receivedProviders.PopFront() } } }() @@ -231,24 +234,25 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k ci } func (pqm *ProviderQueryManager) findProviderWorker() { - // findProviderWorker just cycles through incoming provider queries one - // at a time. We have six of these workers running at once - // to let requests go in parallel but keep them rate limited + // findProviderWorker just cycles through incoming provider queries one at + // a time. There are pqm.opts.maxConcurrentFinds of these workers running + // concurrently to let requests go in parallel but keep them rate limited. 
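+	// maxProviders caps how many providers a single find returns; 0 means no
+	// cap (see WithMaxProvidersPerFind).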
+ maxProviders := pqm.opts.maxProvidersPerFind for { select { - case fpr, ok := <-pqm.providerRequestsProcessing: + case fpr, ok := <-pqm.providerRequestsProcessing.Out(): if !ok { return } k := fpr.k - log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) - pqm.timeoutMutex.RLock() - findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) - pqm.timeoutMutex.RUnlock() + log.Debugw("Beginning Find Provider request", "cid", k.String()) + findProviderCtx, cancel := context.WithTimeout(fpr.ctx, time.Duration(pqm.findProviderTimeout.Load())) span := trace.SpanFromContext(findProviderCtx) span.AddEvent("StartFindProvidersAsync") providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} + // Read each peer ID from the providers chan and start a goroutine + // to connect to it and send a receivedProviderMessage. for p := range providers { wg.Add(1) go func(p peer.ID) { @@ -257,7 +261,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { err := pqm.network.ConnectTo(findProviderCtx, p) if err != nil { span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p))) - log.Debugf("failed to connect to provider %s: %s", p, err) + log.Debugw("failed to connect to provider", "err", err, "peerID", p) return } span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p))) @@ -287,40 +291,6 @@ func (pqm *ProviderQueryManager) findProviderWorker() { } } -func (pqm *ProviderQueryManager) providerRequestBufferWorker() { - // the provider request buffer worker just maintains an unbounded - // buffer for incoming provider queries and dispatches to the find - // provider workers as they become available - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - var providerQueryRequestBuffer []*findProviderRequest - nextProviderQuery := func() *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { - return nil - } - return providerQueryRequestBuffer[0] - } - outgoingRequests := func() chan<- *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { - return nil - } - return pqm.providerRequestsProcessing - } - - for { - select { - case incomingRequest, ok := <-pqm.incomingFindProviderRequests: - if !ok { - return - } - providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest) - case outgoingRequests() <- nextProviderQuery(): - providerQueryRequestBuffer = providerQueryRequestBuffer[1:] - case <-pqm.ctx.Done(): - return - } - } -} - func (pqm *ProviderQueryManager) cleanupInProcessRequests() { for _, requestStatus := range pqm.inProgressRequestStatuses { for listener := range requestStatus.listeners { @@ -333,9 +303,24 @@ func (pqm *ProviderQueryManager) cleanupInProcessRequests() { func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() - go pqm.providerRequestBufferWorker() - for i := 0; i < maxInProcessRequests; i++ { - go pqm.findProviderWorker() + var wg sync.WaitGroup + pqm.providerRequestsProcessing = chanqueue.New[*findProviderRequest]() + defer func() { + pqm.providerRequestsProcessing.Close() + // Afers workers done, close and drain ChanQueue. 
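+		// Draining releases any requests still buffered in the queue so its
+		// internal goroutine can exit once the workers have stopped.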
+ go func() { + wg.Wait() + for range pqm.providerRequestsProcessing.Out() { + } + }() + }() + + wg.Add(pqm.opts.maxConcurrentFinds) + for i := 0; i < pqm.opts.maxConcurrentFinds; i++ { + go func() { + pqm.findProviderWorker() + wg.Done() + }() } for { @@ -350,14 +335,14 @@ func (pqm *ProviderQueryManager) run() { } func (rpm *receivedProviderMessage) debugMessage() { - log.Debugf("Received provider (%s) (%s)", rpm.p, rpm.k) + log.Debugw("Received provider", "peerID", rpm.p, "cid", rpm.k) trace.SpanFromContext(rpm.ctx).AddEvent("ReceivedProvider", trace.WithAttributes(attribute.Stringer("provider", rpm.p), attribute.Stringer("cid", rpm.k))) } func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[rpm.k] if !ok { - log.Debugf("Received provider (%s) for cid (%s) not requested", rpm.p.String(), rpm.k.String()) + log.Debugw("Received provider not requested", "peerID", rpm.p.String(), "cid", rpm.k.String()) return } requestStatus.providersSoFar = append(requestStatus.providersSoFar, rpm.p) @@ -371,7 +356,7 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { } func (fpqm *finishedProviderQueryMessage) debugMessage() { - log.Debugf("Finished Provider Query on cid: %s", fpqm.k) + log.Debugw("Finished Provider Query", "cid", fpqm.k) trace.SpanFromContext(fpqm.ctx).AddEvent("FinishedProviderQuery", trace.WithAttributes(attribute.Stringer("cid", fpqm.k))) } @@ -385,18 +370,20 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { close(listener) } delete(pqm.inProgressRequestStatuses, fpqm.k) + if len(pqm.inProgressRequestStatuses) == 0 { + pqm.inProgressRequestStatuses = nil + } requestStatus.cancelFn() } func (npqm *newProvideQueryMessage) debugMessage() { - log.Debugf("New Provider Query on cid: %s", npqm.k) + log.Debugw("New Provider Query", "cid", npqm.k) trace.SpanFromContext(npqm.ctx).AddEvent("NewProvideQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) } func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k] if !ok { - ctx, cancelFn := context.WithCancel(pqm.ctx) span := trace.SpanFromContext(npqm.ctx) span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) @@ -408,10 +395,13 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { cancelFn: cancelFn, } + if pqm.inProgressRequestStatuses == nil { + pqm.inProgressRequestStatuses = make(map[cid.Cid]*inProgressRequestStatus) + } pqm.inProgressRequestStatuses[npqm.k] = requestStatus select { - case pqm.incomingFindProviderRequests <- &findProviderRequest{ + case pqm.providerRequestsProcessing.In() <- &findProviderRequest{ k: npqm.k, ctx: ctx, }: @@ -433,7 +423,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { } func (crm *cancelRequestMessage) debugMessage() { - log.Debugf("Cancel provider query on cid: %s", crm.k) + log.Debugw("Cancel provider query", "cid", crm.k) trace.SpanFromContext(crm.ctx).AddEvent("CancelRequest", trace.WithAttributes(attribute.Stringer("cid", crm.k))) } @@ -452,6 +442,9 @@ func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) { close(crm.incomingProviders) if len(requestStatus.listeners) == 0 { delete(pqm.inProgressRequestStatuses, crm.k) + if len(pqm.inProgressRequestStatuses) == 0 { + pqm.inProgressRequestStatuses = nil + } requestStatus.cancelFn() } } diff --git 
a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go
index 9deb77f99..83d49a931 100644
--- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go
+++ b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go
@@ -258,28 +258,29 @@ func TestRateLimitingRequests(t *testing.T) {
 	providerQueryManager := New(ctx, fpn)
 	providerQueryManager.Startup()
 
-	keys := random.Cids(maxInProcessRequests + 1)
+	maxConcurrentFinds := providerQueryManager.opts.maxConcurrentFinds
+	keys := random.Cids(maxConcurrentFinds + 1)
 	sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
 	defer cancel()
 	var requestChannels []<-chan peer.ID
-	for i := 0; i < maxInProcessRequests+1; i++ {
+	for i := 0; i < maxConcurrentFinds+1; i++ {
 		requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i]))
 	}
 	time.Sleep(20 * time.Millisecond)
 	fpn.queriesMadeMutex.Lock()
-	if fpn.liveQueries != maxInProcessRequests {
+	if fpn.liveQueries != maxConcurrentFinds {
 		t.Logf("Queries made: %d\n", fpn.liveQueries)
 		t.Fatal("Did not limit parallel requests to rate limit")
 	}
 	fpn.queriesMadeMutex.Unlock()
 
-	for i := 0; i < maxInProcessRequests+1; i++ {
+	for i := 0; i < maxConcurrentFinds+1; i++ {
 		for range requestChannels[i] {
 		}
 	}
 	fpn.queriesMadeMutex.Lock()
 	defer fpn.queriesMadeMutex.Unlock()
-	if fpn.queriesMade != maxInProcessRequests+1 {
+	if fpn.queriesMade != maxConcurrentFinds+1 {
 		t.Logf("Queries made: %d\n", fpn.queriesMade)
 		t.Fatal("Did not make all separate requests")
 	}
diff --git a/bitswap/client/options.go b/bitswap/client/options.go
new file mode 100644
index 000000000..d33df4f7c
--- /dev/null
+++ b/bitswap/client/options.go
@@ -0,0 +1,112 @@
+package client
+
+import (
+	"time"
+
+	"github.com/ipfs/boxo/bitswap/internal/defaults"
+	"github.com/ipfs/boxo/bitswap/tracer"
+	delay "github.com/ipfs/go-ipfs-delay"
+)
+
+type clientConfig struct {
+	blockReceivedNotifier      BlockReceivedNotifier
+	provSearchDelay            time.Duration
+	rebroadcastDelay           delay.D
+	simulateDontHavesOnTimeout bool
+	skipDuplicatedBlocksStats  bool
+	tracer                     tracer.Tracer
+
+	// ProviderQueryManager options.
+	findProviderTimeout time.Duration
+	maxConcurrentFinds  int
+	maxProvidersPerFind int
+}
+
+// Option defines the functional option type that can be used to configure
+// bitswap instances.
+type Option func(*clientConfig)
+
+func getOpts(opts []Option) clientConfig {
+	cfg := clientConfig{
+		provSearchDelay:            defaults.ProvSearchDelay,
+		rebroadcastDelay:           delay.Fixed(defaults.RebroadcastDelay),
+		simulateDontHavesOnTimeout: true,
+	}
+	for _, opt := range opts {
+		opt(&cfg)
+	}
+	return cfg
+}
+
+// ProviderSearchDelay sets the initial delay before triggering a provider
+// search to find more peers and broadcast the want list. It also partially
+// controls the re-broadcast delay when the session idles (does not receive any
+// blocks), but these have back-off logic to increase the interval. See
+// [defaults.ProvSearchDelay] for the default.
+func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
+	return func(c *clientConfig) {
+		c.provSearchDelay = newProvSearchDelay
+	}
+}
+
+// RebroadcastDelay sets a custom delay for periodic search of a random want.
+// When the value elapses, a random CID from the wantlist is chosen and the
+// client attempts to find more peers for it and sends them the single want.
+// See [defaults.RebroadcastDelay] for the default.
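+// For example, RebroadcastDelay(delay.Fixed(time.Minute)) searches for new
+// providers of one random want every minute.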
+func RebroadcastDelay(newRebroadcastDelay delay.D) Option { + return func(c *clientConfig) { + c.rebroadcastDelay = newRebroadcastDelay + } +} + +func SetSimulateDontHavesOnTimeout(send bool) Option { + return func(c *clientConfig) { + c.simulateDontHavesOnTimeout = send + } +} + +// Configures the Client to use given tracer. +// This provides methods to access all messages sent and received by the Client. +// This interface can be used to implement various statistics (this is original intent). +func WithTracer(tap tracer.Tracer) Option { + return func(c *clientConfig) { + c.tracer = tap + } +} + +func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { + return func(c *clientConfig) { + c.blockReceivedNotifier = brn + } +} + +// WithoutDuplicatedBlockStats disable collecting counts of duplicated blocks +// received. This counter requires triggering a blockstore.Has() call for +// every block received by launching goroutines in parallel. In the worst case +// (no caching/blooms etc), this is an expensive call for the datastore to +// answer. In a normal case (caching), this has the power of evicting a +// different block from intermediary caches. In the best case, it doesn't +// affect performance. Use if this stat is not relevant. +func WithoutDuplicatedBlockStats() Option { + return func(c *clientConfig) { + c.skipDuplicatedBlocksStats = true + } +} + +func WithFindProviderTimeout(to time.Duration) Option { + return func(c *clientConfig) { + c.findProviderTimeout = to + } +} + +func WithMaxConcurrentFinds(n int) Option { + return func(c *clientConfig) { + c.maxConcurrentFinds = n + } +} + +func WithMaxProvidersPerFind(n int) Option { + return func(c *clientConfig) { + c.maxProvidersPerFind = n + } +} diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 9b929b99c..98b0ffc94 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -64,8 +64,6 @@ import ( var log = logging.Logger("bitswap/server/decision") const ( - // outboxChanBuffer must be 0 to prevent stale messages from being sent - outboxChanBuffer = 0 // targetMessageSize is the ideal size of the batched payload. We try to // pop this much data off the request queue, but it may be a little more // or less depending on what's in the queue. @@ -76,6 +74,9 @@ const ( // queuedTagWeight is the default weight for peers that have work queued // on their behalf. queuedTagWeight = 10 + + // Interval at which peers in the PeerTaskQueue are incrementally unfrozen. + unfreezePeerInterval = 100 * time.Millisecond ) // Envelope contains a message for a Peer. 
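For reference, the functional options moved into the new client options file earlier in this diff keep the pre-existing API while adding the three ProviderQueryManager knobs. A minimal sketch of how a caller might combine them, assuming the New(parent, network, bstore, options...) signature shown in the client.go hunk and a network/blockstore constructed elsewhere by the host application; the helper name and option values are illustrative only:

package main

import (
	"context"
	"time"

	"github.com/ipfs/boxo/bitswap/client"
	bsnet "github.com/ipfs/boxo/bitswap/network"
	"github.com/ipfs/boxo/blockstore"
)

// newTunedClient is a hypothetical helper showing the options together.
func newTunedClient(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore) *client.Client {
	return client.New(ctx, network, bstore,
		// Pre-existing options, unchanged by this diff.
		client.ProviderSearchDelay(time.Second),
		client.SetSimulateDontHavesOnTimeout(true),
		// New options forwarded to the ProviderQueryManager.
		client.WithFindProviderTimeout(20*time.Second),
		client.WithMaxConcurrentFinds(32),
		client.WithMaxProvidersPerFind(20),
	)
}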
@@ -391,9 +392,9 @@ func NewEngine( bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount, maxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer, peerTagger: peerTagger, - outbox: make(chan (<-chan *Envelope), outboxChanBuffer), + outbox: make(chan (<-chan *Envelope)), workSignal: make(chan struct{}, 1), - ticker: time.NewTicker(time.Millisecond * 100), + ticker: time.NewTicker(unfreezePeerInterval), wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, diff --git a/examples/go.mod b/examples/go.mod index ac7a5343e..bd19ac1a5 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -44,6 +44,8 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect + github.com/gammazero/channelqueue v0.2.2 // indirect + github.com/gammazero/deque v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 8d7be4a96..e72da49e9 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -77,6 +77,10 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/channelqueue v0.2.2 h1:ufNzIbeDBxNfHj0m5uwUfOwvTmHF/O40hu2ZNnvF+/8= +github.com/gammazero/channelqueue v0.2.2/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= diff --git a/go.mod b/go.mod index db188a203..292fbb95a 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,8 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.6 + github.com/gammazero/chanqueue v1.0.0 + github.com/gammazero/deque v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 diff --git a/go.sum b/go.sum index f3c5da72b..2e41a5014 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,10 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= 
 github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
 github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
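The core of the providerquerymanager change replaces the hand-rolled unbounded buffer (the removed providerRequestBufferWorker) with gammazero/chanqueue feeding a fixed pool of findProviderWorker goroutines. A minimal sketch of that producer/worker pattern, using the same In/Out/Close calls the new run loop relies on; the element type is simplified to int, whereas the real queue carries *findProviderRequest:

package main

import (
	"fmt"
	"sync"

	"github.com/gammazero/chanqueue"
)

func main() {
	const workers = 3
	queue := chanqueue.New[int]()

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Each worker drains queued requests one at a time, like
			// findProviderWorker does, keeping concurrency bounded.
			for req := range queue.Out() {
				fmt.Println("processing request", req)
			}
		}()
	}

	// The producer side never blocks; the queue grows as needed, which is
	// what the removed buffer worker achieved with an in-memory slice.
	for i := 0; i < 10; i++ {
		queue.In() <- i
	}

	// Shutdown: close the input and let the workers drain what is left,
	// mirroring the deferred cleanup in run().
	queue.Close()
	wg.Wait()
}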