diff --git a/share/shwap/p2p/bitswap/block.go b/share/shwap/p2p/bitswap/block.go index 940afb5bc2..ed745ed25d 100644 --- a/share/shwap/p2p/bitswap/block.go +++ b/share/shwap/p2p/bitswap/block.go @@ -21,6 +21,10 @@ type Block interface { CID() cid.Cid // Height reports the Height of the Shwap container behind the Block. Height() uint64 + // Size reports expected size of the Block(without serialization overhead). + // Must support getting size when the Block is not populated or empty and strive to + // be low overhead. + Size(context.Context, eds.Accessor) (int, error) // Populate fills up the Block with the Shwap container getting it out of the EDS // Accessor. diff --git a/share/shwap/p2p/bitswap/block_store.go b/share/shwap/p2p/bitswap/block_store.go index 910a95c204..2e676b2f3a 100644 --- a/share/shwap/p2p/bitswap/block_store.go +++ b/share/shwap/p2p/bitswap/block_store.go @@ -28,19 +28,28 @@ type Blockstore struct { Getter AccessorGetter } -func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) { +func (b *Blockstore) getBlockAndAccessor(ctx context.Context, cid cid.Cid) (Block, eds.AccessorStreamer, error) { blk, err := EmptyBlock(cid) if err != nil { - return nil, err + return nil, nil, err } acc, err := b.Getter.GetByHeight(ctx, blk.Height()) if errors.Is(err, store.ErrNotFound) { log.Debugf("no EDS Accessor for height %v found", blk.Height()) - return nil, ipld.ErrNotFound{Cid: cid} + return nil, nil, ipld.ErrNotFound{Cid: cid} } if err != nil { - return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err) + return nil, nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err) + } + + return blk, acc, nil +} + +func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + blk, acc, err := b.getBlockAndAccessor(ctx, cid) + if err != nil { + return nil, err } defer func() { if err := acc.Close(); err != nil { @@ -55,24 +64,28 @@ func (b *Blockstore) getBlock(ctx 
context.Context, cid cid.Cid) (blocks.Block, e return convertBitswap(blk) } -func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { - blk, err := b.getBlock(ctx, cid) +func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { + // NOTE: Bitswap prioritizes peers based on their active/pending work and the priority that peers set for requests (work) + // themselves. The prioritization happens on the Get operation of Blockstore, not GetSize, while GetSize is expected + // to be as lightweight as possible. + // + // In the best case, we only open the Accessor and get its size, avoiding expensive computation to learn the size. + blk, acc, err := b.getBlockAndAccessor(ctx, cid) if err != nil { - return nil, err + return 0, err } + defer func() { + if err := acc.Close(); err != nil { + log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err) + } + }() - return blk, nil -} - -func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { - // TODO(@Wondertan): Bitswap checks the size of the data(GetSize) before serving it via Get. 
This means - // GetSize may do an unnecessary read from disk which we can avoid by either caching on Blockstore level - // or returning constant size(we know at that point that we have requested data) - blk, err := b.Get(ctx, cid) + size, err := blk.Size(ctx, acc) if err != nil { - return 0, err + return 0, fmt.Errorf("getting block size: %w", err) } - return len(blk.RawData()), nil + + return size, nil } func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { diff --git a/share/shwap/p2p/bitswap/block_test.go b/share/shwap/p2p/bitswap/block_test.go index 39bb55bc14..f17c5a236b 100644 --- a/share/shwap/p2p/bitswap/block_test.go +++ b/share/shwap/p2p/bitswap/block_test.go @@ -95,6 +95,10 @@ func (t *testBlock) Height() uint64 { return 1 } +func (t *testBlock) Size(_ context.Context, _ eds.Accessor) (int, error) { + return len(t.data), nil +} + func (t *testBlock) Populate(context.Context, eds.Accessor) error { return nil // noop } diff --git a/share/shwap/p2p/bitswap/row_block.go b/share/shwap/p2p/bitswap/row_block.go index 4763f63954..bc3fee6dbb 100644 --- a/share/shwap/p2p/bitswap/row_block.go +++ b/share/shwap/p2p/bitswap/row_block.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-cid" + libshare "github.com/celestiaorg/go-square/v2/share" "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" @@ -72,6 +73,12 @@ func (rb *RowBlock) Height() uint64 { return rb.ID.Height } +func (rb *RowBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { + squareSize := acc.Size(ctx) + rowSize := libshare.ShareSize * squareSize / 2 + return rowSize, nil +} + func (rb *RowBlock) Marshal() ([]byte, error) { if rb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty RowBlock") diff --git a/share/shwap/p2p/bitswap/row_namespace_data_block.go b/share/shwap/p2p/bitswap/row_namespace_data_block.go index 52ddbd9520..67315b4b8c 100644 --- a/share/shwap/p2p/bitswap/row_namespace_data_block.go +++ 
b/share/shwap/p2p/bitswap/row_namespace_data_block.go @@ -78,6 +78,23 @@ func (rndb *RowNamespaceDataBlock) Height() uint64 { return rndb.ID.Height } +func (rndb *RowNamespaceDataBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { + // no way to statically learn the size of requested data, so read it out and compute + // TODO(@Wondertan): Consider adding optimized RowNamespaceDataSize method to the Accessor + err := rndb.Populate(ctx, acc) + if err != nil { + return 0, err + } + + // TODO(@Wondertan): Avoid converting in favour of getting size just by looking at container + blk, err := convertBitswap(rndb) + if err != nil { + return 0, err + } + + return len(blk.RawData()), nil +} + func (rndb *RowNamespaceDataBlock) Marshal() ([]byte, error) { if rndb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty RowNamespaceDataBlock") diff --git a/share/shwap/p2p/bitswap/sample_block.go b/share/shwap/p2p/bitswap/sample_block.go index 062369c0be..9d5263f91e 100644 --- a/share/shwap/p2p/bitswap/sample_block.go +++ b/share/shwap/p2p/bitswap/sample_block.go @@ -3,9 +3,12 @@ package bitswap import ( "context" "fmt" + "math" "github.com/ipfs/go-cid" + libshare "github.com/celestiaorg/go-square/v2/share" + "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/shwap" @@ -71,6 +74,12 @@ func (sb *SampleBlock) Height() uint64 { return sb.ID.Height } +func (sb *SampleBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) { + squareSize := acc.Size(ctx) + sampleSize := libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(squareSize))) + return sampleSize, nil +} + func (sb *SampleBlock) Marshal() ([]byte, error) { if sb.Container.IsEmpty() { return nil, fmt.Errorf("cannot marshal empty SampleBlock")