From 161202f0be5921d30f8d7899be65c53b46cc8d53 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:10:39 -0700 Subject: [PATCH 1/9] Do not fetch size for WantHave blocks --- .../internal/decision/blockstoremanager.go | 36 +++++++++ .../decision/blockstoremanager_test.go | 15 +--- bitswap/server/internal/decision/engine.go | 80 +++++++++++++------ 3 files changed, 94 insertions(+), 37 deletions(-) diff --git a/bitswap/server/internal/decision/blockstoremanager.go b/bitswap/server/internal/decision/blockstoremanager.go index aa16b3126..6de28d2a7 100644 --- a/bitswap/server/internal/decision/blockstoremanager.go +++ b/bitswap/server/internal/decision/blockstoremanager.go @@ -121,6 +121,42 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) ( return res, nil } +func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) { + if len(ks) == 0 { + return nil, nil + } + hasBlocks := make([]bool, len(ks)) + + var count atomic.Int32 + err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) { + has, err := bsm.bs.Has(ctx, c) + if err != nil { + // Note: this isn't a fatal error. We shouldn't abort the request + log.Errorf("blockstore.GetSize(%s) error: %s", c, err) + return + } + if has { + hasBlocks[i] = true + count.Add(1) + } + }) + if err != nil { + return nil, err + } + results := count.Load() + if results == 0 { + return nil, nil + } + + res := make(map[cid.Cid]struct{}, results) + for i, ok := range hasBlocks { + if ok { + res[ks[i]] = struct{}{} + } + } + return res, nil +} + func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) { if len(ks) == 0 { return nil, nil diff --git a/bitswap/server/internal/decision/blockstoremanager_test.go b/bitswap/server/internal/decision/blockstoremanager_test.go index f65c88e83..2f2b7b23f 100644 --- a/bitswap/server/internal/decision/blockstoremanager_test.go +++ b/bitswap/server/internal/decision/blockstoremanager_test.go @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) { cids = append(cids, b.Cid()) } - sizes, err := bsm.getBlockSizes(ctx, cids) + hasBlocks, err := bsm.hasBlocks(ctx, cids) if err != nil { t.Fatal(err) } - if len(sizes) != len(blks)-1 { + if len(hasBlocks) != len(blks)-1 { t.Fatal("Wrong response length") } - for _, c := range cids { - expSize := len(exp[c].RawData()) - size, ok := sizes[c] - - // Only the last key should be missing + _, ok := hasBlocks[c] if c.Equals(cids[len(cids)-1]) { if ok { t.Fatal("Non-existent block should not be in sizes map") } } else { if !ok { - t.Fatal("Block should be in sizes map") - } - if size != expSize { - t.Fatal("Block has wrong size") + t.Fatal("Block should be in hasBlocks") } } } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 1174c94c0..5830f7c2c 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -689,16 +689,36 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } + noReplace := e.maxBlockSizeReplaceHasWithBlock == 0 + // Get block sizes for unique CIDs. - wantKs := cid.NewSet() + wantKs := make([]cid.Cid, 0, len(wants)) + var haveKs []cid.Cid for _, entry := range wants { - wantKs.Add(entry.Cid) + if noReplace && entry.WantType == pb.Message_Wantlist_Have { + haveKs = append(haveKs, entry.Cid) + } else { + wantKs = append(wantKs, entry.Cid) + } } - blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys()) + blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs) if err != nil { log.Info("aborting message processing", err) return false } + if len(haveKs) != 0 { + hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs) + if err != nil { + log.Info("aborting message processing", err) + return false + } + if blockSizes == nil { + blockSizes = make(map[cid.Cid]int, len(hasBlocks)) + } + for blkCid := range hasBlocks { + blockSizes[blkCid] = 0 + } + } e.lock.Lock() @@ -707,20 +727,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } var overflow []bsmsg.Entry - if len(wants) != 0 { - filteredWants := wants[:0] // shift inplace - for _, entry := range wants { - if !e.peerLedger.Wants(p, entry.Entry) { - // Cannot add entry because it would exceed size limit. - overflow = append(overflow, entry) - continue - } - filteredWants = append(filteredWants, entry) - } - // Clear truncated entries - early GC. - clear(wants[len(filteredWants):]) - wants = filteredWants - } + wants, overflow = e.filterOverflow(p, wants, overflow) if len(overflow) != 0 { log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow)) @@ -764,7 +771,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap sendDontHave(entry) } - // For each want-have / want-block + // For each want-block for _, entry := range wants { c := entry.Cid blockSize, found := blockSizes[c] @@ -776,7 +783,10 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap continue } // The block was found, add it to the queue - isWantBlock := e.sendAsBlock(entry.WantType, blockSize) + + // Check if this is a want-block or a have-block that can be converted + // to a want-block. + isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize) log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock) @@ -810,6 +820,25 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return false } +func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) { + if len(wants) == 0 { + return wants, overflow + } + + filteredWants := wants[:0] // shift inplace + for _, entry := range wants { + if !e.peerLedger.Wants(p, entry.Entry) { + // Cannot add entry because it would exceed size limit. + overflow = append(overflow, entry) + continue + } + filteredWants = append(filteredWants, entry) + } + // Clear truncated entries - early GC. + clear(wants[len(filteredWants):]) + return filteredWants, overflow +} + // handleOverflow processes incoming wants that could not be addded to the peer // ledger without exceeding the peer want limit. These are handled by trying to // make room by canceling existing wants for which there is no block. If this @@ -913,17 +942,17 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([] continue } + if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { + denials = append(denials, et) + continue + } + if et.WantType == pb.Message_Wantlist_Have { log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c) } else { log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c) } - if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) { - denials = append(denials, et) - continue - } - // Do not take more wants that can be handled. if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) { wants = append(wants, et) @@ -1057,8 +1086,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) { // If the want is a want-have, and it's below a certain size, send the full // block (instead of sending a HAVE) func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool { - isWantBlock := wantType == pb.Message_Wantlist_Block - return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock + return wantType == pb.Message_Wantlist_Block || blockSize <= e.maxBlockSizeReplaceHasWithBlock } func (e *Engine) numBytesSentTo(p peer.ID) uint64 { From f6fcbd7664f475e91fad52aa386d52abaf4b424c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:30:37 -0700 Subject: [PATCH 2/9] Option to set replaceHasWithBlockMaxSize --- CHANGELOG.md | 3 ++ bitswap/options.go | 8 ++++ bitswap/server/internal/decision/engine.go | 46 ++++++++----------- .../server/internal/decision/engine_test.go | 10 +--- bitswap/server/server.go | 13 ++++++ 5 files changed, 44 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7081f01cd..8f34e2358 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes: ### Added +* `boxo/bitswap/server`: + * A new `WithReplaceHasWithBlockMaxSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which we will replace a want-have with a want-block. Setting a size of 0 disables this want-have replacement and means that block sizes are not read for want-have requests. + ### Changed ### Removed diff --git a/bitswap/options.go b/bitswap/options.go index 11e89fdf9..40bd1866f 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -71,6 +71,14 @@ func WithTaskComparator(comparator server.TaskComparator) Option { return Option{server.WithTaskComparator(comparator)} } +// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up +// to which we will replace a want-have with a want-block. Setting a size of 0 +// disables this want-have replacement and means that block sizes are not read +// for want-have requests. +func WithReplaceHasWithBlockMaxSize(maxSize int) Option { + return Option{server.WithReplaceHasWithBlockMaxSize(maxSize)} +} + func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { return Option{client.ProviderSearchDelay(newProvSearchDelay)} } diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 5830f7c2c..f215c4782 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -78,9 +78,9 @@ const ( // on their behalf. queuedTagWeight = 10 - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock = 1024 + // defaultReplaceHasWithBlockMaxSize is the default maximum size of the + // block in bytes up to which we will replace a want-have with a want-block + defaultReplaceHasWithBlockMaxSize = 1024 ) // Envelope contains a message for a Peer. @@ -202,9 +202,9 @@ type Engine struct { targetMessageSize int - // maxBlockSizeReplaceHasWithBlock is the maximum size of the block in + // replaceHasWithBlockMaxSize is the maximum size of the block in // bytes up to which we will replace a want-have with a want-block - maxBlockSizeReplaceHasWithBlock int + replaceHasWithBlockMaxSize int sendDontHaves bool @@ -343,6 +343,14 @@ func WithSetSendDontHave(send bool) Option { } } +// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up +// to which we will replace a want-have with a want-block. +func WithReplaceHasWithBlockMaxSize(maxSize int) Option { + return func(e *Engine) { + e.replaceHasWithBlockMaxSize = maxSize + } +} + // wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { return func(a, b *peertask.QueueTask) bool { @@ -369,32 +377,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { } // NewEngine creates a new block sending engine for the given block store. -// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum -// work already outstanding. +// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer +// more tasks if it has some maximum work already outstanding. func NewEngine( ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, opts ...Option, -) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxBlockSizeReplaceHasWithBlock, - opts..., - ) -} - -func newEngine( - ctx context.Context, - bs bstore.Blockstore, - peerTagger PeerTagger, - self peer.ID, - maxReplaceSize int, - opts ...Option, ) *Engine { e := &Engine{ scoreLedger: NewDefaultScoreLedger(), @@ -404,7 +394,7 @@ func newEngine( outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(time.Millisecond * 100), - maxBlockSizeReplaceHasWithBlock: maxReplaceSize, + replaceHasWithBlockMaxSize: defaultReplaceHasWithBlockMaxSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, @@ -689,7 +679,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } - noReplace := e.maxBlockSizeReplaceHasWithBlock == 0 + noReplace := e.replaceHasWithBlockMaxSize == 0 // Get block sizes for unique CIDs. wantKs := make([]cid.Cid, 0, len(wants)) @@ -1086,7 +1076,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) { // If the want is a want-have, and it's below a certain size, send the full // block (instead of sending a HAVE) func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool { - return wantType == pb.Message_Wantlist_Block || blockSize <= e.maxBlockSizeReplaceHasWithBlock + return wantType == pb.Message_Wantlist_Block || blockSize <= e.replaceHasWithBlockMaxSize } func (e *Engine) numBytesSentTo(p peer.ID) uint64 { diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 593bbde0f..111fb5c32 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -191,14 +191,8 @@ func newEngineForTesting( maxReplaceSize int, opts ...Option, ) *Engine { - return newEngine( - ctx, - bs, - peerTagger, - self, - maxReplaceSize, - opts..., - ) + opts = append(opts, WithReplaceHasWithBlockMaxSize(maxReplaceSize)) + return NewEngine(ctx, bs, peerTagger, self, opts...) } func TestOutboxClosedWhenEngineClosed(t *testing.T) { diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 85651a5ef..2d3367548 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -251,6 +251,19 @@ func HasBlockBufferSize(count int) Option { } } +// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up +// to which we will replace a want-have with a want-block. Setting a size of 0 +// disables this want-have replacement and means that block sizes are not read +// for want-have requests. +func WithReplaceHasWithBlockMaxSize(maxSize int) Option { + if maxSize < 0 { + maxSize = 0 + } + return func(bs *Server) { + bs.engineOptions = append(bs.engineOptions, decision.WithReplaceHasWithBlockMaxSize(maxSize)) + } +} + // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { From 235a39fae22761bb4a4dba1d3d3b78ef61cec1ee Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 13 Sep 2024 18:50:41 -0700 Subject: [PATCH 3/9] do not create blockSizes man when not needed --- bitswap/server/internal/decision/engine.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index f215c4782..04a3b8960 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -702,11 +702,14 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap log.Info("aborting message processing", err) return false } - if blockSizes == nil { - blockSizes = make(map[cid.Cid]int, len(hasBlocks)) - } - for blkCid := range hasBlocks { - blockSizes[blkCid] = 0 + if len(hasBlocks) != 0 { + if blockSizes == nil { + blockSizes = make(map[cid.Cid]int, len(hasBlocks)) + } + for blkCid := range hasBlocks { + blockSizes[blkCid] = 0 + fmt.Println(" block cid:", blkCid) + } } } From ff77293b4d47cc3a408c32d8eef304192316423e Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 13 Sep 2024 19:00:10 -0700 Subject: [PATCH 4/9] log about replace logic enabled/disabled --- bitswap/server/internal/decision/engine.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 04a3b8960..9aced6b71 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -435,6 +435,12 @@ func NewEngine( e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) + if e.replaceHasWithBlockMaxSize == 0 { + log.Info("Replace WantHave with WantBlock is disabled") + } else { + log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.replaceHasWithBlockMaxSize) + } + return e } From 1364a16755c3a70352d3f56ec42cc5308a7cbe34 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 20 Sep 2024 10:16:45 -0700 Subject: [PATCH 5/9] Remove debug print --- bitswap/server/internal/decision/engine.go | 1 - 1 file changed, 1 deletion(-) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 9aced6b71..f3b6c5081 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -714,7 +714,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap } for blkCid := range hasBlocks { blockSizes[blkCid] = 0 - fmt.Println(" block cid:", blkCid) } } } From 639ef65ebcb1bf3ca047ae45c0788b4a2fa642e5 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 20 Sep 2024 12:38:56 -0700 Subject: [PATCH 6/9] Improving option name and make consistent --- CHANGELOG.md | 2 +- bitswap/options.go | 12 ++++---- bitswap/server/internal/decision/engine.go | 30 +++++++++---------- .../server/internal/decision/engine_test.go | 4 +-- bitswap/server/server.go | 16 +++++----- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f34e2358..f1215dcc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ The following emojis are used to highlight certain changes: ### Added * `boxo/bitswap/server`: - * A new `WithReplaceHasWithBlockMaxSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which we will replace a want-have with a want-block. Setting a size of 0 disables this want-have replacement and means that block sizes are not read for want-have requests. + * A new `WithWantHaveReplaceSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. ### Changed diff --git a/bitswap/options.go b/bitswap/options.go index 40bd1866f..cab37e708 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -71,12 +71,12 @@ func WithTaskComparator(comparator server.TaskComparator) Option { return Option{server.WithTaskComparator(comparator)} } -// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up -// to which we will replace a want-have with a want-block. Setting a size of 0 -// disables this want-have replacement and means that block sizes are not read -// for want-have requests. -func WithReplaceHasWithBlockMaxSize(maxSize int) Option { - return Option{server.WithReplaceHasWithBlockMaxSize(maxSize)} +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which the bitswap server will replace a WantHave with a WantBlock response. +// Setting this to 0 disables this WantHave replacement and means that block +// sizes are not read when processing WantHave requests. +func WithWantHaveReplaceSize(size int) Option { + return Option{server.WithWantHaveReplaceSize(size)} } func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index f3b6c5081..9164ba1af 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -78,9 +78,9 @@ const ( // on their behalf. queuedTagWeight = 10 - // defaultReplaceHasWithBlockMaxSize is the default maximum size of the - // block in bytes up to which we will replace a want-have with a want-block - defaultReplaceHasWithBlockMaxSize = 1024 + // defaultWantHave ReplaceSize is the default maximum size of the block in + // bytes up to which we will replace a WantHave with a WantBlock response. + defaultWantHaveReplaceSize = 1024 ) // Envelope contains a message for a Peer. @@ -202,9 +202,9 @@ type Engine struct { targetMessageSize int - // replaceHasWithBlockMaxSize is the maximum size of the block in - // bytes up to which we will replace a want-have with a want-block - replaceHasWithBlockMaxSize int + // wantHaveReplaceSize is the maximum size of the block in bytes up to + // which to replace a WantHave with a WantBlock. + wantHaveReplaceSize int sendDontHaves bool @@ -343,11 +343,11 @@ func WithSetSendDontHave(send bool) Option { } } -// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up -// to which we will replace a want-have with a want-block. -func WithReplaceHasWithBlockMaxSize(maxSize int) Option { +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which to replace a WantHave with a WantBlock response. +func WithWantHaveReplaceSize(size int) Option { return func(e *Engine) { - e.replaceHasWithBlockMaxSize = maxSize + e.wantHaveReplaceSize = size } } @@ -394,7 +394,7 @@ func NewEngine( outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(time.Millisecond * 100), - replaceHasWithBlockMaxSize: defaultReplaceHasWithBlockMaxSize, + wantHaveReplaceSize: defaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, @@ -435,10 +435,10 @@ func NewEngine( e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...) - if e.replaceHasWithBlockMaxSize == 0 { + if e.wantHaveReplaceSize == 0 { log.Info("Replace WantHave with WantBlock is disabled") } else { - log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.replaceHasWithBlockMaxSize) + log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize) } return e @@ -685,7 +685,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap return true } - noReplace := e.replaceHasWithBlockMaxSize == 0 + noReplace := e.wantHaveReplaceSize == 0 // Get block sizes for unique CIDs. wantKs := make([]cid.Cid, 0, len(wants)) @@ -1084,7 +1084,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) { // If the want is a want-have, and it's below a certain size, send the full // block (instead of sending a HAVE) func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool { - return wantType == pb.Message_Wantlist_Block || blockSize <= e.replaceHasWithBlockMaxSize + return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize } func (e *Engine) numBytesSentTo(p peer.ID) uint64 { diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go index 111fb5c32..5cc1375c7 100644 --- a/bitswap/server/internal/decision/engine_test.go +++ b/bitswap/server/internal/decision/engine_test.go @@ -188,10 +188,10 @@ func newEngineForTesting( bs blockstore.Blockstore, peerTagger PeerTagger, self peer.ID, - maxReplaceSize int, + wantHaveReplaceSize int, opts ...Option, ) *Engine { - opts = append(opts, WithReplaceHasWithBlockMaxSize(maxReplaceSize)) + opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize)) return NewEngine(ctx, bs, peerTagger, self, opts...) } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 2d3367548..a06db723f 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -251,16 +251,16 @@ func HasBlockBufferSize(count int) Option { } } -// WithReplaceHasWithBlockMaxSize sets the maximum size of a block in bytes up -// to which we will replace a want-have with a want-block. Setting a size of 0 -// disables this want-have replacement and means that block sizes are not read -// for want-have requests. -func WithReplaceHasWithBlockMaxSize(maxSize int) Option { - if maxSize < 0 { - maxSize = 0 +// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to +// which to replace a WantHave with a WantBlock. Setting to 0 disables this +// WantHave replacement and means that block sizes are not read when processing +// WantHave requests. +func WithWantHaveReplaceSize(size int) Option { + if size < 0 { + size = 0 } return func(bs *Server) { - bs.engineOptions = append(bs.engineOptions, decision.WithReplaceHasWithBlockMaxSize(maxSize)) + bs.engineOptions = append(bs.engineOptions, decision.WithWantHaveReplaceSize(size)) } } From 6555990dcef7f2baab19b91dfe69487024bd0fb7 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 26 Sep 2024 23:30:11 +0200 Subject: [PATCH 7/9] docs: WithWantHaveReplaceSize --- CHANGELOG.md | 2 +- bitswap/options.go | 3 +-- bitswap/server/server.go | 24 +++++++++++++++++++++--- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1215dcc9..5f015fd11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ The following emojis are used to highlight certain changes: ### Added * `boxo/bitswap/server`: - * A new `WithWantHaveReplaceSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. + * A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672) ### Changed diff --git a/bitswap/options.go b/bitswap/options.go index cab37e708..6a98b27db 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -73,8 +73,7 @@ func WithTaskComparator(comparator server.TaskComparator) Option { // WithWantHaveReplaceSize sets the maximum size of a block in bytes up to // which the bitswap server will replace a WantHave with a WantBlock response. -// Setting this to 0 disables this WantHave replacement and means that block -// sizes are not read when processing WantHave requests. +// See [server.WithWantHaveReplaceSize] for details. func WithWantHaveReplaceSize(size int) Option { return Option{server.WithWantHaveReplaceSize(size)} } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index a06db723f..8dbdfd701 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -252,9 +252,27 @@ func HasBlockBufferSize(count int) Option { } // WithWantHaveReplaceSize sets the maximum size of a block in bytes up to -// which to replace a WantHave with a WantBlock. Setting to 0 disables this -// WantHave replacement and means that block sizes are not read when processing -// WantHave requests. +// which the bitswap server will replace a WantHave with a WantBlock response. +// +// Behavior: +// - If size > 0: The server may send full blocks instead of just confirming possession +// for blocks up to the specified size. +// - If size = 0: WantHave replacement is disabled entirely. This allows the server to +// skip reading block sizes during WantHave request processing, which can be more +// efficient if the data storage bills "possession" checks and "reads" differently. +// +// Performance considerations: +// - Enabling replacement (size > 0) may reduce network round-trips but requires +// checking block sizes for each WantHave request to decide if replacement should occur. +// - Disabling replacement (size = 0) optimizes server performance by avoiding +// block size checks, potentially reducing infrastructure costs if possession checks +// are less expensive than full reads. +// +// The default implicit behavior is unspecified and may change in future releases. +// +// Use this option to set explicit behavior to balance between network +// efficiency, server performance, and potential storage cost optimizations +// based on your specific use case and storage backend. func WithWantHaveReplaceSize(size int) Option { if size < 0 { size = 0 From 1978d3c54da4db9c9dd999b5932446272fe5e8e6 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 26 Sep 2024 23:57:52 +0200 Subject: [PATCH 8/9] chore: fix logged func name --- bitswap/server/internal/decision/blockstoremanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitswap/server/internal/decision/blockstoremanager.go b/bitswap/server/internal/decision/blockstoremanager.go index 6de28d2a7..d4c0f4254 100644 --- a/bitswap/server/internal/decision/blockstoremanager.go +++ b/bitswap/server/internal/decision/blockstoremanager.go @@ -132,7 +132,7 @@ func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[ has, err := bsm.bs.Has(ctx, c) if err != nil { // Note: this isn't a fatal error. We shouldn't abort the request - log.Errorf("blockstore.GetSize(%s) error: %s", c, err) + log.Errorf("blockstore.Has(%c) error: %s", c, err) return } if has { From db2d8bcdf6be27867c98afac3cdcc63c8d93dde0 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 27 Sep 2024 00:19:58 +0200 Subject: [PATCH 9/9] refactor: defaults.DefaultWantHaveReplaceSize bare minimum to make it possible to discover current implicit default by reading godocs --- bitswap/internal/defaults/defaults.go | 3 +++ bitswap/server/internal/decision/engine.go | 6 +----- bitswap/server/server.go | 3 ++- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index f5511cc7a..b30bcc87f 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -37,4 +37,7 @@ const ( // RebroadcastDelay is the default delay to trigger broadcast of // random CIDs in the wantlist. RebroadcastDelay = time.Minute + + // DefaultWantHaveReplaceSize controls the implicit behavior of WithWantHaveReplaceSize. + DefaultWantHaveReplaceSize = 1024 ) diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go index 9164ba1af..5e4463e33 100644 --- a/bitswap/server/internal/decision/engine.go +++ b/bitswap/server/internal/decision/engine.go @@ -77,10 +77,6 @@ const ( // queuedTagWeight is the default weight for peers that have work queued // on their behalf. queuedTagWeight = 10 - - // defaultWantHave ReplaceSize is the default maximum size of the block in - // bytes up to which we will replace a WantHave with a WantBlock response. - defaultWantHaveReplaceSize = 1024 ) // Envelope contains a message for a Peer. @@ -394,7 +390,7 @@ func NewEngine( outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(time.Millisecond * 100), - wantHaveReplaceSize: defaultWantHaveReplaceSize, + wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize, taskWorkerCount: defaults.BitswapEngineTaskWorkerCount, sendDontHaves: true, self: self, diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 8dbdfd701..46d29a8fc 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -268,7 +268,8 @@ func HasBlockBufferSize(count int) Option { // block size checks, potentially reducing infrastructure costs if possession checks // are less expensive than full reads. // -// The default implicit behavior is unspecified and may change in future releases. +// It defaults to [defaults.DefaultWantHaveReplaceSize] +// and the value may change in future releases. // // Use this option to set explicit behavior to balance between network // efficiency, server performance, and potential storage cost optimizations