Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eds/store): store.GetDAH #1511

Merged
merged 12 commits into from
Dec 22, 2022
23 changes: 19 additions & 4 deletions docs/adr/adr-011-blocksync-overhaul-part-1.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,10 @@ func (s *Store) Blockstore() blockstore.Blockstore

##### `eds.Store.CARBlockstore`

`CARBlockstore` method returns a [`Blockstore`][blockstore] interface implementation instance, providing random access
over share and NMT Merkle proof in a specific EDS identified by DataHash. It is required for FNs/BNs to enable [reading
data by namespace](#reading-data-by-namespace).
`CARBlockstore` method returns a read-only [`Blockstore`][blockstore] interface implementation
instance to provide random access over share and NMT Merkle proof in a specific EDS identified by
DataHash, along with its corresponding DAH. It is required for FNs/BNs to enable [reading data by
namespace](#reading-data-by-namespace).

___NOTES:___

Expand All @@ -286,7 +287,21 @@ ___NOTES:___
// CARBlockstore returns an IPFS Blockstore providing access to individual shares/nodes of a specific EDS identified by
// DataHash and registered on the Store. NOTE: The Blockstore does not store whole Celestia Blocks but IPFS blocks.
// We represent `shares` and NMT Merkle proofs as IPFS blocks and IPLD nodes so Bitswap can access those.
func (s *Store) CARBlockstore(DataHash) (blockstore.Blockstore, error)
func (s *Store) CARBlockstore(context.Context, DataHash) (dagstore.ReadBlockstore, error)
```

##### `eds.Store.GetDAH`

The `GetDAH` method returns the DAH (`share.Root`) of the EDS identified by `DataHash`. Internally it:

- Acquires a `ShardAccessor` for the corresponding shard
- Reads the CAR Header from the accessor
- Converts the header's root CIDs into a `share.Root`
- Verifies the integrity of the `share.Root` by comparing it with the `DataHash`

```go
// GetDAH returns the DataAvailabilityHeader for the EDS identified by DataHash.
func (s *Store) GetDAH(context.Context, share.DataHash) (*share.Root, error)
```

##### `eds.Store.Get`
Expand Down
2 changes: 1 addition & 1 deletion share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d

// a share can exist in multiple EDSes, so just take the first one.
shardKey := keys[0]
accessor, err := bs.store.getAccessor(ctx, shardKey)
accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
Expand Down
4 changes: 4 additions & 0 deletions share/eds/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func TestBlockstore_Operations(t *testing.T) {
carBS, err := edsStore.CARBlockstore(ctx, dah.Hash())
require.NoError(t, err)

root, err := edsStore.GetDAH(ctx, dah.Hash())
require.NoError(t, err)
require.True(t, dah.Equals(root))

blockstores := []dagstore.ReadBlockstore{topLevelBS, carBS}

for {
Expand Down
85 changes: 68 additions & 17 deletions share/eds/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package eds

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand All @@ -14,10 +16,13 @@ import (
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
carv1 "github.com/ipld/go-car"

"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/ipld"
)

const (
Expand Down Expand Up @@ -186,7 +191,7 @@ func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,
if err != nil {
return nil, fmt.Errorf("failed to get accessor: %w", err)
}
return accessor.sa, nil
return accessor, nil
}

// Blockstore returns an IPFS blockstore providing access to individual shares/nodes of all EDS
Expand All @@ -197,31 +202,59 @@ func (s *Store) Blockstore() bstore.Blockstore {
return s.bs
}

// CARBlockstore returns the IPFS blockstore that provides access to the IPLD blocks stored in an
// individual CAR file.
func (s *Store) CARBlockstore(ctx context.Context, dataHash []byte) (dagstore.ReadBlockstore, error) {
key := shard.KeyFromString(fmt.Sprintf("%X", dataHash))
// CARBlockstore returns an IPFS Blockstore providing access to individual shares/nodes of a
// specific EDS identified by DataHash and registered on the Store. NOTE: The Blockstore does not
// store whole Celestia Blocks but IPFS blocks. We represent `shares` and NMT Merkle proofs as IPFS
// blocks and IPLD nodes so Bitswap can access those.
func (s *Store) CARBlockstore(
ctx context.Context,
root share.DataHash,
) (dagstore.ReadBlockstore, error) {
key := shard.KeyFromString(root.String())
accessor, err := s.getCachedAccessor(ctx, key)
if err != nil {
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
}
return accessor.bs, nil
}

// GetDAH returns the DataAvailabilityHeader for the EDS identified by DataHash.
func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, error) {
key := shard.KeyFromString(root.String())
accessor, err := s.getAccessor(ctx, key)
if err != nil {
return nil, err
return nil, fmt.Errorf("eds/store: failed to get accessor: %w", err)
}
defer accessor.Close()

return accessor.bs, nil
carHeader, err := carv1.ReadHeader(bufio.NewReader(accessor))
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("eds/store: failed to read car header: %w", err)
}

dah := dahFromCARHeader(carHeader)
if !bytes.Equal(dah.Hash(), root) {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("eds/store: content integrity mismatch from CAR for root %x", root)
}
return dah, nil
}

func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) {
// try to fetch from cache
accessor, err := s.cache.Get(key)
if err != nil && err != errCacheMiss {
log.Errorw("unexpected error while reading key from bs cache %s: %s", key, err)
// dahFromCARHeader returns the DataAvailabilityHeader stored in the CIDs of a CARv1 header.
func dahFromCARHeader(carHeader *carv1.CarHeader) *header.DataAvailabilityHeader {
rootCount := len(carHeader.Roots)
rootBytes := make([][]byte, 0)
for _, root := range carHeader.Roots {
rootBytes = append(rootBytes, ipld.NamespacedSha256FromCID(root))
}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if accessor != nil {
return accessor, nil
return &header.DataAvailabilityHeader{
RowsRoots: rootBytes[:rootCount/2],
ColumnRoots: rootBytes[rootCount/2:],
}
}

// wasn't found in cache, so acquire it and add to cache
func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.ShardAccessor, error) {
ch := make(chan dagstore.ShardResult, 1)
err = s.dgstr.AcquireShard(ctx, key, ch, dagstore.AcquireOpts{})
err := s.dgstr.AcquireShard(ctx, key, ch, dagstore.AcquireOpts{})
if err != nil {
return nil, fmt.Errorf("failed to initialize shard acquisition: %w", err)
}
Expand All @@ -231,12 +264,30 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*accessorWithBl
if res.Error != nil {
return nil, fmt.Errorf("failed to acquire shard: %w", res.Error)
}
return s.cache.Add(key, res.Accessor)
return res.Accessor, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessorWithBlockstore, error) {
// try to fetch from cache
accessor, err := s.cache.Get(key)
if err != nil && err != errCacheMiss {
log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err)
}
if accessor != nil {
return accessor, nil
}

// wasn't found in cache, so acquire it and add to cache
shardAccessor, err := s.getAccessor(ctx, key)
if err != nil {
return nil, err
}
return s.cache.Add(key, shardAccessor)
}

// Remove removes EDS from Store by the given share.Root hash and cleans up all
// the indexing.
func (s *Store) Remove(ctx context.Context, root share.DataHash) error {
Expand Down
2 changes: 1 addition & 1 deletion share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func Test_BlockstoreCache(t *testing.T) {
assert.ErrorIs(t, err, errCacheMiss)

// now get it, so that the key is in the cache
_, err = edsStore.CARBlockstore(ctx, dah.Hash())
_, err = edsStore.getCachedAccessor(ctx, shardKey)
assert.NoError(t, err)
_, err = edsStore.cache.Get(shardKey)
assert.NoError(t, err, errCacheMiss)
Expand Down