From 64604d33391fa7be25e0fed441aecaa6978d2ed2 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 11 Nov 2024 05:46:47 -1000 Subject: [PATCH 01/15] Provider query manager improvements - Replace Debugf with Debugw to avoid unnecessary formatting whn no debug logging. - Use atomic value to protect changable timeout. - Remove inProgressRequestStatuses map when empty. - Do not append to slice forever. Use Deque that reuses items removed from front of slice. --- .../providerquerymanager.go | 72 ++++++++++--------- go.mod | 1 + go.sum | 2 + 3 files changed, 43 insertions(+), 32 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index c85efe737..3a2cf6614 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -3,8 +3,10 @@ package providerquerymanager import ( "context" "sync" + "sync/atomic" "time" + "github.com/gammazero/deque" "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -82,8 +84,7 @@ type ProviderQueryManager struct { providerRequestsProcessing chan *findProviderRequest incomingFindProviderRequests chan *findProviderRequest - findProviderTimeout time.Duration - timeoutMutex sync.RWMutex + findProviderTimeout atomic.Int64 // do not touch outside the run loop inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus @@ -92,15 +93,15 @@ type ProviderQueryManager struct { // New initializes a new ProviderQueryManager for a given context and a given // network provider. func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { - return &ProviderQueryManager{ + pqm := &ProviderQueryManager{ ctx: ctx, network: network, providerQueryMessages: make(chan providerQueryMessage, 16), providerRequestsProcessing: make(chan *findProviderRequest), incomingFindProviderRequests: make(chan *findProviderRequest), - inProgressRequestStatuses: make(map[cid.Cid]*inProgressRequestStatus), - findProviderTimeout: defaultTimeout, } + pqm.SetFindProviderTimeout(defaultTimeout) + return pqm } // Startup starts processing for the ProviderQueryManager. @@ -115,9 +116,7 @@ type inProgressRequest struct { // SetFindProviderTimeout changes the timeout for finding providers func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time.Duration) { - pqm.timeoutMutex.Lock() - pqm.findProviderTimeout = findProviderTimeout - pqm.timeoutMutex.Unlock() + pqm.findProviderTimeout.Store(int64(findProviderTimeout)) } // FindProvidersAsync finds providers for the given block. @@ -167,25 +166,28 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k // reading from the returned channel -- so that the broadcast never blocks // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd returnedProviders := make(chan peer.ID) - receivedProviders := append([]peer.ID(nil), receivedInProgressRequest.providersSoFar[0:]...) + receivedProviders := deque.New[peer.ID]() + for _, pid := range receivedInProgressRequest.providersSoFar { + receivedProviders.PushBack(pid) + } incomingProviders := receivedInProgressRequest.incoming go func() { defer close(returnedProviders) defer onCloseFn() outgoingProviders := func() chan<- peer.ID { - if len(receivedProviders) == 0 { + if receivedProviders.Len() == 0 { return nil } return returnedProviders } nextProvider := func() peer.ID { - if len(receivedProviders) == 0 { + if receivedProviders.Len() == 0 { return "" } - return receivedProviders[0] + return receivedProviders.Front() } - for len(receivedProviders) > 0 || incomingProviders != nil { + for receivedProviders.Len() > 0 || incomingProviders != nil { select { case <-pqm.ctx.Done(): return @@ -198,10 +200,10 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k if !ok { incomingProviders = nil } else { - receivedProviders = append(receivedProviders, provider) + receivedProviders.PushBack(provider) } case outgoingProviders() <- nextProvider(): - receivedProviders = receivedProviders[1:] + receivedProviders.PopFront() } } }() @@ -241,10 +243,8 @@ func (pqm *ProviderQueryManager) findProviderWorker() { return } k := fpr.k - log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) - pqm.timeoutMutex.RLock() - findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) - pqm.timeoutMutex.RUnlock() + log.Debugw("Beginning Find Provider request", "cid", k.String()) + findProviderCtx, cancel := context.WithTimeout(fpr.ctx, time.Duration(pqm.findProviderTimeout.Load())) span := trace.SpanFromContext(findProviderCtx) span.AddEvent("StartFindProvidersAsync") providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) @@ -257,7 +257,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { err := pqm.network.ConnectTo(findProviderCtx, p) if err != nil { span.RecordError(err, trace.WithAttributes(attribute.Stringer("peer", p))) - log.Debugf("failed to connect to provider %s: %s", p, err) + log.Debugw("failed to connect to provider", "err", err, "peerID", p) return } span.AddEvent("ConnectedToProvider", trace.WithAttributes(attribute.Stringer("peer", p))) @@ -292,15 +292,15 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { // buffer for incoming provider queries and dispatches to the find // provider workers as they become available // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - var providerQueryRequestBuffer []*findProviderRequest + providerQueryRequestBuffer := deque.New[*findProviderRequest]() nextProviderQuery := func() *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { + if providerQueryRequestBuffer.Len() == 0 { return nil } - return providerQueryRequestBuffer[0] + return providerQueryRequestBuffer.Front() } outgoingRequests := func() chan<- *findProviderRequest { - if len(providerQueryRequestBuffer) == 0 { + if providerQueryRequestBuffer.Len() == 0 { return nil } return pqm.providerRequestsProcessing @@ -312,9 +312,9 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { if !ok { return } - providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest) + providerQueryRequestBuffer.PushBack(incomingRequest) case outgoingRequests() <- nextProviderQuery(): - providerQueryRequestBuffer = providerQueryRequestBuffer[1:] + providerQueryRequestBuffer.PopFront() case <-pqm.ctx.Done(): return } @@ -350,14 +350,14 @@ func (pqm *ProviderQueryManager) run() { } func (rpm *receivedProviderMessage) debugMessage() { - log.Debugf("Received provider (%s) (%s)", rpm.p, rpm.k) + log.Debugw("Received provider", "peerID", rpm.p, "cid", rpm.k) trace.SpanFromContext(rpm.ctx).AddEvent("ReceivedProvider", trace.WithAttributes(attribute.Stringer("provider", rpm.p), attribute.Stringer("cid", rpm.k))) } func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[rpm.k] if !ok { - log.Debugf("Received provider (%s) for cid (%s) not requested", rpm.p.String(), rpm.k.String()) + log.Debugw("Received provider not requested", "peerID", rpm.p.String(), "cid", rpm.k.String()) return } requestStatus.providersSoFar = append(requestStatus.providersSoFar, rpm.p) @@ -371,7 +371,7 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { } func (fpqm *finishedProviderQueryMessage) debugMessage() { - log.Debugf("Finished Provider Query on cid: %s", fpqm.k) + log.Debugw("Finished Provider Query", "cid", fpqm.k) trace.SpanFromContext(fpqm.ctx).AddEvent("FinishedProviderQuery", trace.WithAttributes(attribute.Stringer("cid", fpqm.k))) } @@ -385,18 +385,20 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { close(listener) } delete(pqm.inProgressRequestStatuses, fpqm.k) + if len(pqm.inProgressRequestStatuses) == 0 { + pqm.inProgressRequestStatuses = nil + } requestStatus.cancelFn() } func (npqm *newProvideQueryMessage) debugMessage() { - log.Debugf("New Provider Query on cid: %s", npqm.k) + log.Debugw("New Provider Query", "cid", npqm.k) trace.SpanFromContext(npqm.ctx).AddEvent("NewProvideQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) } func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k] if !ok { - ctx, cancelFn := context.WithCancel(pqm.ctx) span := trace.SpanFromContext(npqm.ctx) span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k))) @@ -408,6 +410,9 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { cancelFn: cancelFn, } + if pqm.inProgressRequestStatuses == nil { + pqm.inProgressRequestStatuses = make(map[cid.Cid]*inProgressRequestStatus) + } pqm.inProgressRequestStatuses[npqm.k] = requestStatus select { @@ -433,7 +438,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { } func (crm *cancelRequestMessage) debugMessage() { - log.Debugf("Cancel provider query on cid: %s", crm.k) + log.Debugw("Cancel provider query", "cid", crm.k) trace.SpanFromContext(crm.ctx).AddEvent("CancelRequest", trace.WithAttributes(attribute.Stringer("cid", crm.k))) } @@ -452,6 +457,9 @@ func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) { close(crm.incomingProviders) if len(requestStatus.listeners) == 0 { delete(pqm.inProgressRequestStatuses, crm.k) + if len(pqm.inProgressRequestStatuses) == 0 { + pqm.inProgressRequestStatuses = nil + } requestStatus.cancelFn() } } diff --git a/go.mod b/go.mod index db188a203..a46926640 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.6 + github.com/gammazero/deque v0.2.1 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 diff --git a/go.sum b/go.sum index f3c5da72b..3fa1a9fa4 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= From 39caa17fe8a2c10ae5671b1fd1d0d15228da024a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 12 Nov 2024 00:51:44 -1000 Subject: [PATCH 02/15] mod tidy examples --- examples/go.mod | 1 + examples/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/examples/go.mod b/examples/go.mod index ac7a5343e..604949cc6 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -44,6 +44,7 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect + github.com/gammazero/deque v0.2.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 8d7be4a96..04c2a4dcd 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -77,6 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= From 9cb975fc76f892f907010d38e1f5b581c2af6f96 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 15 Nov 2024 02:10:43 -1000 Subject: [PATCH 03/15] update deque --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a46926640..dffb3a4a9 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.6 - github.com/gammazero/deque v0.2.1 + github.com/gammazero/deque v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 diff --git a/go.sum b/go.sum index 3fa1a9fa4..a803f6182 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= From f65d610bbf10f451c82facd5218601114dfee5b2 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 15 Nov 2024 02:46:15 -1000 Subject: [PATCH 04/15] mod tidy examples --- examples/go.mod | 2 +- examples/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index 604949cc6..537d616fd 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -44,7 +44,7 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect - github.com/gammazero/deque v0.2.1 // indirect + github.com/gammazero/deque v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 04c2a4dcd..255918b76 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -77,8 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= -github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= -github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= +github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= From a351504e2a549277057c326e1c301f90bd2b52b6 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 15 Nov 2024 06:46:56 -1000 Subject: [PATCH 05/15] finish update --- .../internal/providerquerymanager/providerquerymanager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 3a2cf6614..46bd07927 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -166,7 +166,8 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k // reading from the returned channel -- so that the broadcast never blocks // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd returnedProviders := make(chan peer.ID) - receivedProviders := deque.New[peer.ID]() + var receivedProviders deque.Deque[peer.ID] + receivedProviders.Grow(len(receivedInProgressRequest.providersSoFar)) for _, pid := range receivedInProgressRequest.providersSoFar { receivedProviders.PushBack(pid) } @@ -292,7 +293,7 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { // buffer for incoming provider queries and dispatches to the find // provider workers as they become available // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - providerQueryRequestBuffer := deque.New[*findProviderRequest]() + var providerQueryRequestBuffer deque.Deque[*findProviderRequest] nextProviderQuery := func() *findProviderRequest { if providerQueryRequestBuffer.Len() == 0 { return nil From 351b8a9b1d8982586b957a47aea1bb512959a83e Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 06:26:56 -1000 Subject: [PATCH 06/15] Use const for unfreesePeerInterval --- bitswap/server/internal/decision/engine.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 9b929b99c..98b0ffc94 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -64,8 +64,6 @@ import ( var log = logging.Logger("bitswap/server/decision") const ( - // outboxChanBuffer must be 0 to prevent stale messages from being sent - outboxChanBuffer = 0 // targetMessageSize is the ideal size of the batched payload. We try to // pop this much data off the request queue, but it may be a little more // or less depending on what's in the queue. @@ -76,6 +74,9 @@ const ( // queuedTagWeight is the default weight for peers that have work queued // on their behalf. queuedTagWeight = 10 + + // Interval at which peers in the PeerTaskQueue are incrementally unfrozen. + unfreezePeerInterval = 100 * time.Millisecond ) // Envelope contains a message for a Peer. @@ -391,9 +392,9 @@ func NewEngine( bstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount, maxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer, peerTagger: peerTagger, - outbox: make(chan (<-chan *Envelope), outboxChanBuffer), + outbox: make(chan (<-chan *Envelope)), workSignal: make(chan struct{}, 1), - ticker: time.NewTicker(time.Millisecond * 100), + ticker: time.NewTicker(unfreezePeerInterval), wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, From 3e11c1fdc52aebae871838288786676b3b949dc9 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 06:27:54 -1000 Subject: [PATCH 07/15] providerQueryMessages should be unbuffered, confirmed by testing --- .../internal/providerquerymanager/providerquerymanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 46bd07927..760d92035 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -96,7 +96,7 @@ func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManage pqm := &ProviderQueryManager{ ctx: ctx, network: network, - providerQueryMessages: make(chan providerQueryMessage, 16), + providerQueryMessages: make(chan providerQueryMessage), providerRequestsProcessing: make(chan *findProviderRequest), incomingFindProviderRequests: make(chan *findProviderRequest), } From f3dd35d55840ed6fa41fd2d551e24af7684e79d1 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 08:20:48 -1000 Subject: [PATCH 08/15] Replace providerRequestBufferWorker with channelqueue Add configurable options: - WithMaxInProcessRequests - WithMaxProvidersPerFind --- .../internal/providerquerymanager/options.go | 42 +++++++ .../providerquerymanager.go | 105 ++++++++---------- .../providerquerymanager_test.go | 1 + go.mod | 1 + go.sum | 2 + 5 files changed, 90 insertions(+), 61 deletions(-) create mode 100644 bitswap/client/internal/providerquerymanager/options.go diff --git a/bitswap/client/internal/providerquerymanager/options.go b/bitswap/client/internal/providerquerymanager/options.go new file mode 100644 index 000000000..dade41062 --- /dev/null +++ b/bitswap/client/internal/providerquerymanager/options.go @@ -0,0 +1,42 @@ +package providerquerymanager + +const ( + defaultMaxProvidersPerFind = 10 + defaultMaxInProcessRequests = 6 +) + +type config struct { + maxProvidersPerFind int + maxInProcessRequests int +} + +type Option func(*config) + +func getOpts(opts []Option) config { + cfg := config{ + maxProvidersPerFind: defaultMaxProvidersPerFind, + maxInProcessRequests: defaultMaxInProcessRequests, + } + for _, opt := range opts { + opt(&cfg) + } + return cfg +} + +func WithMaxProvidersPerFind(n int) Option { + return func(c *config) { + if n == 0 { + n = defaultMaxProvidersPerFind + } + c.maxProvidersPerFind = n + } +} + +func WithMaxInProcessRequests(n int) Option { + return func(c *config) { + if n == 0 { + n = defaultMaxInProcessRequests + } + c.maxInProcessRequests = n + } +} diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 760d92035..f0b1c411a 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "time" + "github.com/gammazero/channelqueue" "github.com/gammazero/deque" "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" @@ -18,9 +19,7 @@ import ( var log = logging.Logger("bitswap/client/provqrymgr") const ( - maxProviders = 10 - maxInProcessRequests = 6 - defaultTimeout = 10 * time.Second + defaultTimeout = 10 * time.Second ) type inProgressRequestStatus struct { @@ -78,27 +77,27 @@ type cancelRequestMessage struct { // - ensure two findprovider calls for the same block don't run concurrently // - manage timeouts type ProviderQueryManager struct { - ctx context.Context - network ProviderQueryNetwork - providerQueryMessages chan providerQueryMessage - providerRequestsProcessing chan *findProviderRequest - incomingFindProviderRequests chan *findProviderRequest + ctx context.Context + network ProviderQueryNetwork + providerQueryMessages chan providerQueryMessage + providerRequestsProcessing *channelqueue.ChannelQueue[*findProviderRequest] findProviderTimeout atomic.Int64 // do not touch outside the run loop inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus + + opts config } // New initializes a new ProviderQueryManager for a given context and a given // network provider. -func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { +func New(ctx context.Context, network ProviderQueryNetwork, options ...Option) *ProviderQueryManager { pqm := &ProviderQueryManager{ - ctx: ctx, - network: network, - providerQueryMessages: make(chan providerQueryMessage), - providerRequestsProcessing: make(chan *findProviderRequest), - incomingFindProviderRequests: make(chan *findProviderRequest), + ctx: ctx, + network: network, + providerQueryMessages: make(chan providerQueryMessage), + opts: getOpts(options), } pqm.SetFindProviderTimeout(defaultTimeout) return pqm @@ -160,11 +159,11 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, } func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID { - // maintains an unbuffered queue for incoming providers for given request for a given session - // essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all - // sessions that queried that CID, without worrying about whether the client code is actually - // reading from the returned channel -- so that the broadcast never blocks - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd + // maintains an unbuffered queue for incoming providers for given request + // for a given session. Eessentially, as a provider comes in, for a given + // CID, immediately broadcast to all sessions that queried that CID, + // without worrying about whether the client code is actually reading from + // the returned channel -- so that the broadcast never blocks. returnedProviders := make(chan peer.ID) var receivedProviders deque.Deque[peer.ID] receivedProviders.Grow(len(receivedInProgressRequest.providersSoFar)) @@ -234,12 +233,13 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k ci } func (pqm *ProviderQueryManager) findProviderWorker() { - // findProviderWorker just cycles through incoming provider queries one - // at a time. We have six of these workers running at once - // to let requests go in parallel but keep them rate limited + // findProviderWorker just cycles through incoming provider queries one at + // a time. There are maxInProcessRequests of these workers running + // concurrently to let requests go in parallel but keep them rate limited. + maxProviders := pqm.opts.maxProvidersPerFind for { select { - case fpr, ok := <-pqm.providerRequestsProcessing: + case fpr, ok := <-pqm.providerRequestsProcessing.Out(): if !ok { return } @@ -250,6 +250,8 @@ func (pqm *ProviderQueryManager) findProviderWorker() { span.AddEvent("StartFindProvidersAsync") providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} + // Read each peer ID from the providers chan and start a goroutine + // to connect to it and send a receivedProviderMessage. for p := range providers { wg.Add(1) go func(p peer.ID) { @@ -288,40 +290,6 @@ func (pqm *ProviderQueryManager) findProviderWorker() { } } -func (pqm *ProviderQueryManager) providerRequestBufferWorker() { - // the provider request buffer worker just maintains an unbounded - // buffer for incoming provider queries and dispatches to the find - // provider workers as they become available - // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - var providerQueryRequestBuffer deque.Deque[*findProviderRequest] - nextProviderQuery := func() *findProviderRequest { - if providerQueryRequestBuffer.Len() == 0 { - return nil - } - return providerQueryRequestBuffer.Front() - } - outgoingRequests := func() chan<- *findProviderRequest { - if providerQueryRequestBuffer.Len() == 0 { - return nil - } - return pqm.providerRequestsProcessing - } - - for { - select { - case incomingRequest, ok := <-pqm.incomingFindProviderRequests: - if !ok { - return - } - providerQueryRequestBuffer.PushBack(incomingRequest) - case outgoingRequests() <- nextProviderQuery(): - providerQueryRequestBuffer.PopFront() - case <-pqm.ctx.Done(): - return - } - } -} - func (pqm *ProviderQueryManager) cleanupInProcessRequests() { for _, requestStatus := range pqm.inProgressRequestStatuses { for listener := range requestStatus.listeners { @@ -334,9 +302,24 @@ func (pqm *ProviderQueryManager) cleanupInProcessRequests() { func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() - go pqm.providerRequestBufferWorker() - for i := 0; i < maxInProcessRequests; i++ { - go pqm.findProviderWorker() + var wg sync.WaitGroup + pqm.providerRequestsProcessing = channelqueue.New[*findProviderRequest](-1) + defer func() { + pqm.providerRequestsProcessing.Close() + // Afers workers done, close and drain channelqueue. + go func() { + wg.Wait() + for range pqm.providerRequestsProcessing.Out() { + } + }() + }() + + wg.Add(pqm.opts.maxInProcessRequests) + for i := 0; i < pqm.opts.maxInProcessRequests; i++ { + go func() { + pqm.findProviderWorker() + wg.Done() + }() } for { @@ -417,7 +400,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { pqm.inProgressRequestStatuses[npqm.k] = requestStatus select { - case pqm.incomingFindProviderRequests <- &findProviderRequest{ + case pqm.providerRequestsProcessing.In() <- &findProviderRequest{ k: npqm.k, ctx: ctx, }: diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go index 9deb77f99..2ed7ee8c9 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go @@ -258,6 +258,7 @@ func TestRateLimitingRequests(t *testing.T) { providerQueryManager := New(ctx, fpn) providerQueryManager.Startup() + maxInProcessRequests := providerQueryManager.opts.maxInProcessRequests keys := random.Cids(maxInProcessRequests + 1) sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() diff --git a/go.mod b/go.mod index dffb3a4a9..9a0c8dc66 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.6 + github.com/gammazero/channelqueue v0.2.2 github.com/gammazero/deque v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index a803f6182..566acffd2 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/channelqueue v0.2.2 h1:ufNzIbeDBxNfHj0m5uwUfOwvTmHF/O40hu2ZNnvF+/8= +github.com/gammazero/channelqueue v0.2.2/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= From 853b3ce8fe115f90a66a2e3f39c754879b5ae47c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 08:53:57 -1000 Subject: [PATCH 09/15] Make ProviderQueryManager options configurable through client --- bitswap/client/client.go | 79 ++++------------------------- bitswap/client/optios.go | 105 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 69 deletions(-) create mode 100644 bitswap/client/optios.go diff --git a/bitswap/client/client.go b/bitswap/client/client.go index bab03c3cd..7de8081ea 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -19,7 +19,6 @@ import ( bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager" bsspm "github.com/ipfs/boxo/bitswap/client/internal/sessionpeermanager" "github.com/ipfs/boxo/bitswap/internal" - "github.com/ipfs/boxo/bitswap/internal/defaults" bsmsg "github.com/ipfs/boxo/bitswap/message" bmetrics "github.com/ipfs/boxo/bitswap/metrics" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -38,65 +37,6 @@ import ( var log = logging.Logger("bitswap/client") -// Option defines the functional option type that can be used to configure -// bitswap instances -type Option func(*Client) - -// ProviderSearchDelay sets the initial dely before triggering a provider -// search to find more peers and broadcast the want list. It also partially -// controls re-broadcasts delay when the session idles (does not receive any -// blocks), but these have back-off logic to increase the interval. See -// [defaults.ProvSearchDelay] for the default. -func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { - return func(bs *Client) { - bs.provSearchDelay = newProvSearchDelay - } -} - -// RebroadcastDelay sets a custom delay for periodic search of a random want. -// When the value ellapses, a random CID from the wantlist is chosen and the -// client attempts to find more peers for it and sends them the single want. -// [defaults.RebroadcastDelay] for the default. -func RebroadcastDelay(newRebroadcastDelay delay.D) Option { - return func(bs *Client) { - bs.rebroadcastDelay = newRebroadcastDelay - } -} - -func SetSimulateDontHavesOnTimeout(send bool) Option { - return func(bs *Client) { - bs.simulateDontHavesOnTimeout = send - } -} - -// Configures the Client to use given tracer. -// This provides methods to access all messages sent and received by the Client. -// This interface can be used to implement various statistics (this is original intent). -func WithTracer(tap tracer.Tracer) Option { - return func(bs *Client) { - bs.tracer = tap - } -} - -func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { - return func(bs *Client) { - bs.blockReceivedNotifier = brn - } -} - -// WithoutDuplicatedBlockStats disable collecting counts of duplicated blocks -// received. This counter requires triggering a blockstore.Has() call for -// every block received by launching goroutines in parallel. In the worst case -// (no caching/blooms etc), this is an expensive call for the datastore to -// answer. In a normal case (caching), this has the power of evicting a -// different block from intermediary caches. In the best case, it doesn't -// affect performance. Use if this stat is not relevant. -func WithoutDuplicatedBlockStats() Option { - return func(bs *Client) { - bs.skipDuplicatedBlocksStats = true - } -} - type BlockReceivedNotifier interface { // ReceivedBlocks notifies the decision engine that a peer is well-behaving // and gave us useful data, potentially increasing its score and making us @@ -130,10 +70,14 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore return bsmq.New(ctx, p, network, onDontHaveTimeout) } + opts := getOpts(options) + sim := bssim.New() bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) - pqm := bspqm.New(ctx, network) + pqm := bspqm.New(ctx, network, + bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind), + bspqm.WithMaxInProcessRequests(opts.pqmMaxInProcessRequests)) sessionFactory := func( sessctx context.Context, @@ -168,14 +112,11 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore counters: new(counters), dupMetric: bmetrics.DupHist(ctx), allMetric: bmetrics.AllHist(ctx), - provSearchDelay: defaults.ProvSearchDelay, - rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), - simulateDontHavesOnTimeout: true, - } - - // apply functional options before starting and running bitswap - for _, option := range options { - option(bs) + provSearchDelay: opts.provSearchDelay, + rebroadcastDelay: opts.rebroadcastDelay, + blockReceivedNotifier: opts.blockReceivedNotifier, + simulateDontHavesOnTimeout: opts.simulateDontHavesOnTimeout, + tracer: opts.tracer, } pqm.Startup() diff --git a/bitswap/client/optios.go b/bitswap/client/optios.go new file mode 100644 index 000000000..8538324ca --- /dev/null +++ b/bitswap/client/optios.go @@ -0,0 +1,105 @@ +package client + +import ( + "time" + + "github.com/ipfs/boxo/bitswap/internal/defaults" + "github.com/ipfs/boxo/bitswap/tracer" + delay "github.com/ipfs/go-ipfs-delay" +) + +type clientConfig struct { + blockReceivedNotifier BlockReceivedNotifier + provSearchDelay time.Duration + rebroadcastDelay delay.D + simulateDontHavesOnTimeout bool + skipDuplicatedBlocksStats bool + tracer tracer.Tracer + + // ProviderQueryManager options. + pqmMaxProvidersPerFind int + pqmMaxInProcessRequests int +} + +// Option defines the functional option type that can be used to configure +// bitswap instances +type Option func(*clientConfig) + +func getOpts(opts []Option) clientConfig { + cfg := clientConfig{ + provSearchDelay: defaults.ProvSearchDelay, + rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), + simulateDontHavesOnTimeout: true, + } + for _, opt := range opts { + opt(&cfg) + } + return cfg +} + +// ProviderSearchDelay sets the initial dely before triggering a provider +// search to find more peers and broadcast the want list. It also partially +// controls re-broadcasts delay when the session idles (does not receive any +// blocks), but these have back-off logic to increase the interval. See +// [defaults.ProvSearchDelay] for the default. +func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { + return func(c *clientConfig) { + c.provSearchDelay = newProvSearchDelay + } +} + +// RebroadcastDelay sets a custom delay for periodic search of a random want. +// When the value ellapses, a random CID from the wantlist is chosen and the +// client attempts to find more peers for it and sends them the single want. +// [defaults.RebroadcastDelay] for the default. +func RebroadcastDelay(newRebroadcastDelay delay.D) Option { + return func(c *clientConfig) { + c.rebroadcastDelay = newRebroadcastDelay + } +} + +func SetSimulateDontHavesOnTimeout(send bool) Option { + return func(c *clientConfig) { + c.simulateDontHavesOnTimeout = send + } +} + +// Configures the Client to use given tracer. +// This provides methods to access all messages sent and received by the Client. +// This interface can be used to implement various statistics (this is original intent). +func WithTracer(tap tracer.Tracer) Option { + return func(c *clientConfig) { + c.tracer = tap + } +} + +func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { + return func(c *clientConfig) { + c.blockReceivedNotifier = brn + } +} + +// WithoutDuplicatedBlockStats disable collecting counts of duplicated blocks +// received. This counter requires triggering a blockstore.Has() call for +// every block received by launching goroutines in parallel. In the worst case +// (no caching/blooms etc), this is an expensive call for the datastore to +// answer. In a normal case (caching), this has the power of evicting a +// different block from intermediary caches. In the best case, it doesn't +// affect performance. Use if this stat is not relevant. +func WithoutDuplicatedBlockStats() Option { + return func(c *clientConfig) { + c.skipDuplicatedBlocksStats = true + } +} + +func WithPQMMaxProvidersPerFind(n int) Option { + return func(c *clientConfig) { + c.pqmMaxProvidersPerFind = n + } +} + +func WithPQMMaxInProcessRequests(n int) Option { + return func(c *clientConfig) { + c.pqmMaxInProcessRequests = n + } +} From 4b48c084902fb09797077fa53fa314d09976242f Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:03:12 -1000 Subject: [PATCH 10/15] Rename maxInProcessRequests to maxConcurrentFinds --- bitswap/client/client.go | 4 ++-- .../internal/providerquerymanager/options.go | 24 +++++++++---------- .../providerquerymanager.go | 6 ++--- .../providerquerymanager_test.go | 12 +++++----- bitswap/client/optios.go | 12 +++++----- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 7de8081ea..0ea294b97 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -76,8 +76,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) pqm := bspqm.New(ctx, network, - bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind), - bspqm.WithMaxInProcessRequests(opts.pqmMaxInProcessRequests)) + bspqm.WithMaxConcurrentFinds(opts.pqmMaxConcurrentFinds), + bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind)) sessionFactory := func( sessctx context.Context, diff --git a/bitswap/client/internal/providerquerymanager/options.go b/bitswap/client/internal/providerquerymanager/options.go index dade41062..be158408b 100644 --- a/bitswap/client/internal/providerquerymanager/options.go +++ b/bitswap/client/internal/providerquerymanager/options.go @@ -1,21 +1,21 @@ package providerquerymanager const ( - defaultMaxProvidersPerFind = 10 - defaultMaxInProcessRequests = 6 + defaultMaxConcurrentFinds = 16 + defaultMaxProvidersPerFind = 10 ) type config struct { - maxProvidersPerFind int - maxInProcessRequests int + maxConcurrentFinds int + maxProvidersPerFind int } type Option func(*config) func getOpts(opts []Option) config { cfg := config{ - maxProvidersPerFind: defaultMaxProvidersPerFind, - maxInProcessRequests: defaultMaxInProcessRequests, + maxConcurrentFinds: defaultMaxConcurrentFinds, + maxProvidersPerFind: defaultMaxProvidersPerFind, } for _, opt := range opts { opt(&cfg) @@ -23,20 +23,20 @@ func getOpts(opts []Option) config { return cfg } -func WithMaxProvidersPerFind(n int) Option { +func WithMaxConcurrentFinds(n int) Option { return func(c *config) { if n == 0 { - n = defaultMaxProvidersPerFind + n = defaultMaxConcurrentFinds } - c.maxProvidersPerFind = n + c.maxConcurrentFinds = n } } -func WithMaxInProcessRequests(n int) Option { +func WithMaxProvidersPerFind(n int) Option { return func(c *config) { if n == 0 { - n = defaultMaxInProcessRequests + n = defaultMaxProvidersPerFind } - c.maxInProcessRequests = n + c.maxProvidersPerFind = n } } diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index f0b1c411a..3acd6a963 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -234,7 +234,7 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k ci func (pqm *ProviderQueryManager) findProviderWorker() { // findProviderWorker just cycles through incoming provider queries one at - // a time. There are maxInProcessRequests of these workers running + // a time. There are pqm.opts.maxConcurrentFinds of these workers running // concurrently to let requests go in parallel but keep them rate limited. maxProviders := pqm.opts.maxProvidersPerFind for { @@ -314,8 +314,8 @@ func (pqm *ProviderQueryManager) run() { }() }() - wg.Add(pqm.opts.maxInProcessRequests) - for i := 0; i < pqm.opts.maxInProcessRequests; i++ { + wg.Add(pqm.opts.maxConcurrentFinds) + for i := 0; i < pqm.opts.maxConcurrentFinds; i++ { go func() { pqm.findProviderWorker() wg.Done() diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go index 2ed7ee8c9..83d49a931 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go @@ -258,29 +258,29 @@ func TestRateLimitingRequests(t *testing.T) { providerQueryManager := New(ctx, fpn) providerQueryManager.Startup() - maxInProcessRequests := providerQueryManager.opts.maxInProcessRequests - keys := random.Cids(maxInProcessRequests + 1) + maxConcurrentFinds := providerQueryManager.opts.maxConcurrentFinds + keys := random.Cids(maxConcurrentFinds + 1) sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() var requestChannels []<-chan peer.ID - for i := 0; i < maxInProcessRequests+1; i++ { + for i := 0; i < maxConcurrentFinds+1; i++ { requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i])) } time.Sleep(20 * time.Millisecond) fpn.queriesMadeMutex.Lock() - if fpn.liveQueries != maxInProcessRequests { + if fpn.liveQueries != maxConcurrentFinds { t.Logf("Queries made: %d\n", fpn.liveQueries) t.Fatal("Did not limit parallel requests to rate limit") } fpn.queriesMadeMutex.Unlock() - for i := 0; i < maxInProcessRequests+1; i++ { + for i := 0; i < maxConcurrentFinds+1; i++ { for range requestChannels[i] { } } fpn.queriesMadeMutex.Lock() defer fpn.queriesMadeMutex.Unlock() - if fpn.queriesMade != maxInProcessRequests+1 { + if fpn.queriesMade != maxConcurrentFinds+1 { t.Logf("Queries made: %d\n", fpn.queriesMade) t.Fatal("Did not make all separate requests") } diff --git a/bitswap/client/optios.go b/bitswap/client/optios.go index 8538324ca..cd7220a8b 100644 --- a/bitswap/client/optios.go +++ b/bitswap/client/optios.go @@ -17,8 +17,8 @@ type clientConfig struct { tracer tracer.Tracer // ProviderQueryManager options. - pqmMaxProvidersPerFind int - pqmMaxInProcessRequests int + pqmMaxConcurrentFinds int + pqmMaxProvidersPerFind int } // Option defines the functional option type that can be used to configure @@ -92,14 +92,14 @@ func WithoutDuplicatedBlockStats() Option { } } -func WithPQMMaxProvidersPerFind(n int) Option { +func WithPQMMaxConcurrentFinds(n int) Option { return func(c *clientConfig) { - c.pqmMaxProvidersPerFind = n + c.pqmMaxConcurrentFinds = n } } -func WithPQMMaxInProcessRequests(n int) Option { +func WithPQMMaxProvidersPerFind(n int) Option { return func(c *clientConfig) { - c.pqmMaxInProcessRequests = n + c.pqmMaxProvidersPerFind = n } } From f6d672a073dc8dc49a80a3d7d8a873898dca3642 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 11:19:43 -1000 Subject: [PATCH 11/15] rename options --- bitswap/client/client.go | 4 ++-- bitswap/client/optios.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 0ea294b97..c73a30416 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -76,8 +76,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) pqm := bspqm.New(ctx, network, - bspqm.WithMaxConcurrentFinds(opts.pqmMaxConcurrentFinds), - bspqm.WithMaxProvidersPerFind(opts.pqmMaxProvidersPerFind)) + bspqm.WithMaxConcurrentFinds(opts.maxConcurrentFinds), + bspqm.WithMaxProvidersPerFind(opts.maxProvidersPerFind)) sessionFactory := func( sessctx context.Context, diff --git a/bitswap/client/optios.go b/bitswap/client/optios.go index cd7220a8b..ef0979ae9 100644 --- a/bitswap/client/optios.go +++ b/bitswap/client/optios.go @@ -17,8 +17,8 @@ type clientConfig struct { tracer tracer.Tracer // ProviderQueryManager options. - pqmMaxConcurrentFinds int - pqmMaxProvidersPerFind int + maxConcurrentFinds int + maxProvidersPerFind int } // Option defines the functional option type that can be used to configure @@ -92,14 +92,14 @@ func WithoutDuplicatedBlockStats() Option { } } -func WithPQMMaxConcurrentFinds(n int) Option { +func WithMaxConcurrentFinds(n int) Option { return func(c *clientConfig) { - c.pqmMaxConcurrentFinds = n + c.maxConcurrentFinds = n } } -func WithPQMMaxProvidersPerFind(n int) Option { +func WithMaxProvidersPerFind(n int) Option { return func(c *clientConfig) { - c.pqmMaxProvidersPerFind = n + c.maxProvidersPerFind = n } } From 84f8f134c510fa455c282dacb2337b99d5d48a16 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 12:41:21 -1000 Subject: [PATCH 12/15] Add WitrFindProviderTimeout option --- .../internal/providerquerymanager/options.go | 29 +++++++++++++++++++ .../providerquerymanager.go | 13 ++++++--- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/options.go b/bitswap/client/internal/providerquerymanager/options.go index be158408b..e48ff2466 100644 --- a/bitswap/client/internal/providerquerymanager/options.go +++ b/bitswap/client/internal/providerquerymanager/options.go @@ -1,11 +1,17 @@ package providerquerymanager +import ( + "time" +) + const ( + defaultFindProviderTimeout = 10 * time.Second defaultMaxConcurrentFinds = 16 defaultMaxProvidersPerFind = 10 ) type config struct { + findProviderTimeout time.Duration maxConcurrentFinds int maxProvidersPerFind int } @@ -14,6 +20,7 @@ type Option func(*config) func getOpts(opts []Option) config { cfg := config{ + findProviderTimeout: defaultFindProviderTimeout, maxConcurrentFinds: defaultMaxConcurrentFinds, maxProvidersPerFind: defaultMaxProvidersPerFind, } @@ -23,19 +30,41 @@ func getOpts(opts []Option) config { return cfg } +// WitrFindProviderTimeout configures the maximum amount of time to spend on a +// single find providers attempt. This value can be changed at runtime using +// SetFindProviderTimeout. Use 0 to configure the default. +func WithFindProviderTimeout(to time.Duration) Option { + return func(c *config) { + if to == 0 { + to = defaultFindProviderTimeout + } + c.findProviderTimeout = to + } +} + +// WithMaxConcurrentFinds configures the maxmum number of workers that run +// FindProvidersAsync at the same time. Use 0 to configure the default value. func WithMaxConcurrentFinds(n int) Option { return func(c *config) { if n == 0 { n = defaultMaxConcurrentFinds + } else if n < 0 { + panic("bitswap: WithMaxConcurrentFinds given negative value") } c.maxConcurrentFinds = n } } +// WithMaxProvidersPerFind configures the maximum number of providers that are +// returned from a single fiond providers attempt. Use 0 to configure the +// default value or use a negative value to find all providers within the +// timeout configured by WithFindProviderTimeout. func WithMaxProvidersPerFind(n int) Option { return func(c *config) { if n == 0 { n = defaultMaxProvidersPerFind + } else if n < 0 { + n = 0 // 0 means find all } c.maxProvidersPerFind = n } diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 3acd6a963..28feda2ce 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -99,7 +99,7 @@ func New(ctx context.Context, network ProviderQueryNetwork, options ...Option) * providerQueryMessages: make(chan providerQueryMessage), opts: getOpts(options), } - pqm.SetFindProviderTimeout(defaultTimeout) + pqm.SetFindProviderTimeout(pqm.opts.findProviderTimeout) return pqm } @@ -113,9 +113,14 @@ type inProgressRequest struct { incoming chan peer.ID } -// SetFindProviderTimeout changes the timeout for finding providers -func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time.Duration) { - pqm.findProviderTimeout.Store(int64(findProviderTimeout)) +// SetFindProviderTimeout changes the timeout for finding providers. Setting a +// value of 0 resets to the value configures when this ProviderQueryManager was +// created. +func (pqm *ProviderQueryManager) SetFindProviderTimeout(timeout time.Duration) { + if timeout == 0 { + timeout = pqm.opts.findProviderTimeout + } + pqm.findProviderTimeout.Store(int64(timeout)) } // FindProvidersAsync finds providers for the given block. From 7409ebea25a26c7b9563402e12ec03bdc8d9f9d6 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 12:56:44 -1000 Subject: [PATCH 13/15] Config findProviderTimeout through client --- bitswap/client/client.go | 1 + bitswap/client/optios.go | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index c73a30416..e83e82e8f 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -76,6 +76,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore bpm := bsbpm.New() pm := bspm.New(ctx, peerQueueFactory, network.Self()) pqm := bspqm.New(ctx, network, + bspqm.WithFindProviderTimeout(opts.findProviderTimeout), bspqm.WithMaxConcurrentFinds(opts.maxConcurrentFinds), bspqm.WithMaxProvidersPerFind(opts.maxProvidersPerFind)) diff --git a/bitswap/client/optios.go b/bitswap/client/optios.go index ef0979ae9..d33df4f7c 100644 --- a/bitswap/client/optios.go +++ b/bitswap/client/optios.go @@ -17,6 +17,7 @@ type clientConfig struct { tracer tracer.Tracer // ProviderQueryManager options. + findProviderTimeout time.Duration maxConcurrentFinds int maxProvidersPerFind int } @@ -92,6 +93,12 @@ func WithoutDuplicatedBlockStats() Option { } } +func WithFindProviderTimeout(to time.Duration) Option { + return func(c *clientConfig) { + c.findProviderTimeout = to + } +} + func WithMaxConcurrentFinds(n int) Option { return func(c *clientConfig) { c.maxConcurrentFinds = n From 9326f0a835851d963caaea1c88fc179c697cfdda Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 22 Nov 2024 21:54:19 -1000 Subject: [PATCH 14/15] mod tidy examples --- .../internal/providerquerymanager/providerquerymanager.go | 4 ---- examples/go.mod | 1 + examples/go.sum | 2 ++ 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index 28feda2ce..c5994e4fe 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -18,10 +18,6 @@ import ( var log = logging.Logger("bitswap/client/provqrymgr") -const ( - defaultTimeout = 10 * time.Second -) - type inProgressRequestStatus struct { ctx context.Context cancelFn func() diff --git a/examples/go.mod b/examples/go.mod index 537d616fd..bd19ac1a5 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -44,6 +44,7 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect + github.com/gammazero/channelqueue v0.2.2 // indirect github.com/gammazero/deque v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/examples/go.sum b/examples/go.sum index 255918b76..e72da49e9 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -77,6 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= +github.com/gammazero/channelqueue v0.2.2 h1:ufNzIbeDBxNfHj0m5uwUfOwvTmHF/O40hu2ZNnvF+/8= +github.com/gammazero/channelqueue v0.2.2/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= From 1be8e62045883db7440a57435669be549e042b6a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Sun, 24 Nov 2024 04:03:48 -1000 Subject: [PATCH 15/15] use chanqueue for unbounded channel --- .../internal/providerquerymanager/providerquerymanager.go | 8 ++++---- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index c5994e4fe..19a099c76 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "time" - "github.com/gammazero/channelqueue" + "github.com/gammazero/chanqueue" "github.com/gammazero/deque" "github.com/ipfs/boxo/bitswap/client/internal" "github.com/ipfs/go-cid" @@ -76,7 +76,7 @@ type ProviderQueryManager struct { ctx context.Context network ProviderQueryNetwork providerQueryMessages chan providerQueryMessage - providerRequestsProcessing *channelqueue.ChannelQueue[*findProviderRequest] + providerRequestsProcessing *chanqueue.ChanQueue[*findProviderRequest] findProviderTimeout atomic.Int64 @@ -304,10 +304,10 @@ func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() var wg sync.WaitGroup - pqm.providerRequestsProcessing = channelqueue.New[*findProviderRequest](-1) + pqm.providerRequestsProcessing = chanqueue.New[*findProviderRequest]() defer func() { pqm.providerRequestsProcessing.Close() - // Afers workers done, close and drain channelqueue. + // Afers workers done, close and drain ChanQueue. go func() { wg.Wait() for range pqm.providerRequestsProcessing.Out() { diff --git a/go.mod b/go.mod index 9a0c8dc66..292fbb95a 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/dustin/go-humanize v1.0.1 github.com/gabriel-vasile/mimetype v1.4.6 - github.com/gammazero/channelqueue v0.2.2 + github.com/gammazero/chanqueue v1.0.0 github.com/gammazero/deque v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 566acffd2..2e41a5014 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= -github.com/gammazero/channelqueue v0.2.2 h1:ufNzIbeDBxNfHj0m5uwUfOwvTmHF/O40hu2ZNnvF+/8= -github.com/gammazero/channelqueue v0.2.2/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4= +github.com/gammazero/chanqueue v1.0.0 h1:FER/sMailGFA3DDvFooEkipAMU+3c9Bg3bheloPSz6o= +github.com/gammazero/chanqueue v1.0.0/go.mod h1:fMwpwEiuUgpab0sH4VHiVcEoji1pSi+EIzeG4TPeKPc= github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34= github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=