Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shwap/bitswap): Blockstore.GetSize: getting size with no compute #3894

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions share/shwap/p2p/bitswap/block_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

// EmptyBlock constructs an empty Block with type in the given CID.
func EmptyBlock(cid cid.Cid) (Block, error) {
spec, ok := specRegistry[cid.Prefix().MhType]
if !ok {
return nil, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType)
spec, err := getSpec(cid)
if err != nil {
return nil, err
}

blk, err := spec.builder(cid)
Expand All @@ -23,27 +23,49 @@ func EmptyBlock(cid cid.Cid) (Block, error) {
return blk, nil
}

// maxBlockSize returns the maximum size of the Block type in the given CID.
func maxBlockSize(cid cid.Cid) (int, error) {
spec, err := getSpec(cid)
if err != nil {
return 0, err
}

return spec.maxSize, nil
}

// registerBlock registers the new Block type and multihash for it.
func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) {
func registerBlock(mhcode, codec uint64, maxSize, idSize int, bldrFn func(cid.Cid) (Block, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: idSize}
})
specRegistry[mhcode] = blockSpec{
specRegistry[codec] = blockSpec{
idSize: idSize,
codec: codec,
maxSize: maxSize,
mhCode: mhcode,
builder: bldrFn,
}
}

// getSpec returns the blockSpec for the given CID.
func getSpec(cid cid.Cid) (blockSpec, error) {
spec, ok := specRegistry[cid.Type()]
if !ok {
return blockSpec{}, fmt.Errorf("unsupported codec %d", cid.Type())
}

return spec, nil
}

// blockSpec holds constant metadata about particular Block types.
type blockSpec struct {
idSize int
codec uint64
maxSize int
mhCode uint64
builder func(cid.Cid) (Block, error)
}

func (spec *blockSpec) String() string {
return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec)
return fmt.Sprintf("BlockSpec{IDSize: %d, MHCode: %d}", spec.idSize, spec.mhCode)
}

var specRegistry = make(map[uint64]blockSpec)
26 changes: 11 additions & 15 deletions share/shwap/p2p/bitswap/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Blockstore struct {
Getter AccessorGetter
}

func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, err := EmptyBlock(cid)
if err != nil {
return nil, err
Expand All @@ -42,6 +42,7 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e
if err != nil {
return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err)
}

defer func() {
if err := acc.Close(); err != nil {
log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err)
Expand All @@ -55,24 +56,19 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e
return convertBitswap(blk)
}

func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, err := b.getBlock(ctx, cid)
if err != nil {
return nil, err
}

return blk, nil
}
func (b *Blockstore) GetSize(_ context.Context, cid cid.Cid) (int, error) {
// NOTE: Size is used as a weight for the incoming Bitswap requests. Bitswap uses fair scheduling for the requests
// and prioritizes peers with less *active* work. Active work of a peer is a cumulative weight of all the in-progress
// requests.

func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// TODO(@Wondertan): Bitswap checks the size of the data(GetSize) before serving it via Get. This means
// GetSize may do an unnecessary read from disk which we can avoid by either caching on Blockstore level
// or returning constant size(we know at that point that we have requested data)
blk, err := b.Get(ctx, cid)
// Constant max block size is used instead of factual size. This avoids disk IO but equalizes the weights of the
// requests of the same type. E.g. row of 2MB EDS and row of 8MB EDS will have the same weight.
size, err := maxBlockSize(cid)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil

return size, nil
}

func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
Expand Down
4 changes: 3 additions & 1 deletion share/shwap/p2p/bitswap/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
const (
testCodec = 0x9999
testMultihashCode = 0x9999
testBlockSize = 256
testIDSize = 2
)

func init() {
registerBlock(
testMultihashCode,
testCodec,
testBlockSize,
testIDSize,
func(cid cid.Cid) (Block, error) {
return newEmptyTestBlock(cid)
Expand Down Expand Up @@ -67,7 +69,7 @@ type testBlock struct {
}

func newTestBlock(id int) *testBlock {
bytes := make([]byte, 256)
bytes := make([]byte, testBlockSize)
_, _ = crand.Read(bytes)
return &testBlock{id: testID(id), data: bytes}
}
Expand Down
16 changes: 9 additions & 7 deletions share/shwap/p2p/bitswap/cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ func encodeToCID(bm encoding.BinaryMarshaler, mhcode, codec uint64) cid.Cid {

// validateCID checks correctness of the CID.
func validateCID(cid cid.Cid) error {
prefix := cid.Prefix()
spec, ok := specRegistry[prefix.MhType]
if !ok {
return fmt.Errorf("unsupported multihash type %d", prefix.MhType)
spec, err := getSpec(cid)
if err != nil {
return err
}

if prefix.Codec != spec.codec {
return fmt.Errorf("invalid CID codec %d", prefix.Codec)
prefix := cid.Prefix()
if prefix.Version != 1 {
return fmt.Errorf("invalid cid version %d", prefix.Version)
}
if prefix.MhType != spec.mhCode {
return fmt.Errorf("invalid multihash type %d", prefix.MhType)
}

if prefix.MhLength != spec.idSize {
return fmt.Errorf("invalid multihash length %d", prefix.MhLength)
}
Expand Down
6 changes: 6 additions & 0 deletions share/shwap/p2p/bitswap/row_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ipfs/go-cid"

libshare "github.com/celestiaorg/go-square/v2/share"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
Expand All @@ -22,10 +23,15 @@ const (
rowMultihashCode = 0x7801
)

// maxRowSize is the maximum size of the RowBlock.
// It is calculated as half of the square size multiplied by the share size.
var maxRowSize = share.MaxSquareSize / 2 * libshare.ShareSize

func init() {
registerBlock(
rowMultihashCode,
rowCodec,
maxRowSize,
shwap.RowIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowBlockFromCID(cid)
Expand Down
4 changes: 4 additions & 0 deletions share/shwap/p2p/bitswap/row_namespace_data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ const (
rowNamespaceDataMultihashCode = 0x7821
)

// maxRNDSize is the maximum size of the RowNamespaceDataBlock.
var maxRNDSize = maxRowSize

func init() {
registerBlock(
rowNamespaceDataMultihashCode,
rowNamespaceDataCodec,
maxRNDSize,
shwap.RowNamespaceDataIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowNamespaceDataBlockFromCID(cid)
Expand Down
8 changes: 8 additions & 0 deletions share/shwap/p2p/bitswap/sample_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package bitswap
import (
"context"
"fmt"
"math"

"github.com/ipfs/go-cid"

libshare "github.com/celestiaorg/go-square/v2/share"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/shwap"
Expand All @@ -21,10 +24,15 @@ const (
sampleMultihashCode = 0x7811
)

// maxSampleSize is the maximum size of the SampleBlock.
// It is calculated as the size of the share plus the size of the proof.
var maxSampleSize = libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(share.MaxSquareSize)))

func init() {
registerBlock(
sampleMultihashCode,
sampleCodec,
maxSampleSize,
shwap.SampleIDSize,
func(cid cid.Cid) (Block, error) {
return EmptySampleBlockFromCID(cid)
Expand Down