diff --git a/share/shwap/p2p/bitswap/block.go b/share/shwap/p2p/bitswap/block.go index ed745ed25d..940afb5bc2 100644 --- a/share/shwap/p2p/bitswap/block.go +++ b/share/shwap/p2p/bitswap/block.go @@ -21,10 +21,6 @@ type Block interface { CID() cid.Cid // Height reports the Height of the Shwap container behind the Block. Height() uint64 - // Size reports expected size of the Block(without serialization overhead). - // Must support getting size when the Block is not populated or empty and strive to - // be low overhead. - Size(context.Context, eds.Accessor) (int, error) // Populate fills up the Block with the Shwap container getting it out of the EDS // Accessor. diff --git a/share/shwap/p2p/bitswap/block_registry.go b/share/shwap/p2p/bitswap/block_registry.go index 94a53bc7b9..1d45c2c59e 100644 --- a/share/shwap/p2p/bitswap/block_registry.go +++ b/share/shwap/p2p/bitswap/block_registry.go @@ -23,13 +23,24 @@ func EmptyBlock(cid cid.Cid) (Block, error) { return blk, nil } +// maxBlockSize returns the maximum size of the Block type in the given CID. +func maxBlockSize(cid cid.Cid) (int, error) { + spec, ok := specRegistry[cid.Prefix().MhType] + if !ok { + return -1, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType) + } + + return spec.maxSize, nil +} + // registerBlock registers the new Block type and multihash for it. -func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) { +func registerBlock(mhcode, codec uint64, maxSize, idSize int, bldrFn func(cid.Cid) (Block, error)) { mh.Register(mhcode, func() hash.Hash { return &hasher{IDSize: idSize} }) specRegistry[mhcode] = blockSpec{ idSize: idSize, + maxSize: maxSize, codec: codec, builder: bldrFn, } @@ -38,6 +49,7 @@ func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block // blockSpec holds constant metadata about particular Block types. 
type blockSpec struct { idSize int + maxSize int codec uint64 builder func(cid.Cid) (Block, error) } diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index bed0166dfc..f13dc90695 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -64,25 +64,14 @@ func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) return convertBitswap(blk) } -func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { - // NOTE: Bitswap prioritizes peers based on their active/pending work and the priority that peers set for the work - // themselves, where the work is the Get operation. The prioritization happens only on the Get operation and not - // GetSize, while GetSize is expected to be as lightweight as possible. - // - // Here is the best case we only open the Accessor and getting its size, avoiding expensive compute to get the size. - blk, acc, err := b.getBlockAndAccessor(ctx, cid) - if err != nil { - return 0, err - } - defer func() { - if err := acc.Close(); err != nil { - log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err) - } - }() +func (b *Blockstore) GetSize(_ context.Context, cid cid.Cid) (int, error) { + // NOTE: Size is used as a weight for the incoming request to Bitswap. Bitswap uses fair scheduling for the requests + // and prioritizes peers with less active work, where work is the cumulative weight of in-process requests. 
- size, err := blk.Size(ctx, acc) + // By using the constant max block size instead of the real one, we equalize the weight of all the requests. + size, err := maxBlockSize(cid) if err != nil { - return 0, fmt.Errorf("getting block size: %w", err) + return 0, err } return size, nil diff --git a/share/shwap/p2p/bitswap/block_test.go b/share/shwap/p2p/bitswap/block_test.go index f17c5a236b..e2d84141f1 100644 --- a/share/shwap/p2p/bitswap/block_test.go +++ b/share/shwap/p2p/bitswap/block_test.go @@ -20,6 +20,7 @@ import ( const ( testCodec = 0x9999 testMultihashCode = 0x9999 + testBlockSize = 256 testIDSize = 2 ) @@ -27,6 +28,7 @@ func init() { registerBlock( testMultihashCode, testCodec, + testBlockSize, testIDSize, func(cid cid.Cid) (Block, error) { return newEmptyTestBlock(cid) @@ -67,7 +69,7 @@ type testBlock struct { } func newTestBlock(id int) *testBlock { - bytes := make([]byte, 256) + bytes := make([]byte, testBlockSize) _, _ = crand.Read(bytes) return &testBlock{id: testID(id), data: bytes} } diff --git a/share/shwap/p2p/bitswap/row_block.go b/share/shwap/p2p/bitswap/row_block.go index bc3fee6dbb..4bf1966d8c 100644 --- a/share/shwap/p2p/bitswap/row_block.go +++ b/share/shwap/p2p/bitswap/row_block.go @@ -23,10 +23,15 @@ const ( rowMultihashCode = 0x7801 ) +// maxRowSize is the maximum size of the RowBlock. +// It is calculated as half of the square size multiplied by the share size. 
+var maxRowSize = share.MaxSquareSize / 2 * libshare.ShareSize + func init() { registerBlock( rowMultihashCode, rowCodec, + maxRowSize, shwap.RowIDSize, func(cid cid.Cid) (Block, error) { return EmptyRowBlockFromCID(cid) @@ -73,12 +78,6 @@ func (rb *RowBlock) Height() uint64 { return rb.ID.Height } -func (rb *RowBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { - squareSize := acc.Size(ctx) - rowSize := libshare.ShareSize * squareSize / 2 - return rowSize, nil -} - func (rb *RowBlock) Marshal() ([]byte, error) { if rb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty RowBlock") diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block.go b/share/shwap/p2p/bitswap/row_namespace_data_block.go index cfbdb56942..1f9d26907c 100644 --- a/share/shwap/p2p/bitswap/row_namespace_data_block.go +++ b/share/shwap/p2p/bitswap/row_namespace_data_block.go @@ -22,10 +22,14 @@ const ( rowNamespaceDataMultihashCode = 0x7821 ) +// maxRNDSize is the maximum size of the RowNamespaceDataBlock. 
+var maxRNDSize = maxRowSize + func init() { registerBlock( rowNamespaceDataMultihashCode, rowNamespaceDataCodec, + maxRNDSize, shwap.RowNamespaceDataIDSize, func(cid cid.Cid) (Block, error) { return EmptyRowNamespaceDataBlockFromCID(cid) @@ -78,23 +82,6 @@ func (rndb *RowNamespaceDataBlock) Height() uint64 { return rndb.ID.Height } -func (rndb *RowNamespaceDataBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { - // no way to statically learn the size of requested data, so read it out and compute - // TODO(@Wondertan): Consider adding optimized RowNamespaceDataSize method to the Accessor - err := rndb.Populate(ctx, acc) - if err != nil { - return 0, err - } - - // TODO(@Wondertan): Avoid converting in favor of getting size just by looking at container - blk, err := convertBitswap(rndb) - if err != nil { - return 0, err - } - - return len(blk.RawData()), nil -} - func (rndb *RowNamespaceDataBlock) Marshal() ([]byte, error) { if rndb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty RowNamespaceDataBlock") diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go index 9d5263f91e..0f25ccb454 100644 --- a/share/shwap/p2p/bitswap/sample_block.go +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -24,10 +24,15 @@ const ( sampleMultihashCode = 0x7811 ) +// maxSampleSize is the maximum size of the SampleBlock. +// It is calculated as the size of the share plus the size of the proof. 
+var maxSampleSize = libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(share.MaxSquareSize))) + func init() { registerBlock( sampleMultihashCode, sampleCodec, + maxSampleSize, shwap.SampleIDSize, func(cid cid.Cid) (Block, error) { return EmptySampleBlockFromCID(cid) @@ -74,12 +79,6 @@ func (sb *SampleBlock) Height() uint64 { return sb.ID.Height } -func (sb *SampleBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { - squareSize := acc.Size(ctx) - sampleSize := libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(squareSize))) - return sampleSize, nil -} - func (sb *SampleBlock) Marshal() ([]byte, error) { if sb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty SampleBlock")