Skip to content

Commit

Permalink
feat(share): EDSStore scaffolding (#1232)
Browse files Browse the repository at this point in the history
Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
Co-authored-by: Hlib Kanunnikov <hlibwondertan@gmail.com>
Closes #1107
Closes #1108
Closes #1110
Closes #1111
Closes #1112
  • Loading branch information
Ryan authored Nov 23, 2022
1 parent 06b9c0c commit e7ea1f1
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 10 deletions.
7 changes: 4 additions & 3 deletions api/docgen/openrpc.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package docgen generates an OpenRPC spec for the Celestia Node. It has been inspired by and adapted from Filecoin's
// Lotus API implementation.
// Package docgen generates an OpenRPC spec for the Celestia Node. It has been inspired by and
// adapted from Filecoin's Lotus API implementation.
package docgen

import (
Expand Down Expand Up @@ -66,7 +66,8 @@ func ParseCommentsFromNodebuilderModules(moduleNames ...string) Comments {
v := &Visitor{make(map[string]ast.Node)}
ast.Walk(v, f)

// TODO(@distractedm1nd): An issue with this could be two methods with the same name in different modules
// TODO(@distractedm1nd): An issue with this could be two methods with the same name in different
// modules
for mn, node := range v.Methods {
filteredComments := cmap.Filter(node).Comments()
if len(filteredComments) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions api/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func implementsMarshaler(t *testing.T, typ reflect.Type) {

switch typ.Kind() {
case reflect.Struct:
// a user defined struct could implement json.Marshaler on the pointer receiver, so check there first.
// note that the "non-pointer" receiver is checked before the switch.
// a user defined struct could implement json.Marshaler on the pointer receiver, so check there
// first. note that the "non-pointer" receiver is checked before the switch.
pointerType := reflect.TypeOf(reflect.New(typ).Elem().Addr().Interface())
if pointerType.Implements(reflect.TypeOf(new(json.Marshaler)).Elem()) {
return
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/cosmos/cosmos-sdk/api v0.1.0
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/etclabscore/go-openrpc-reflect v0.0.37
github.com/filecoin-project/dagstore v0.5.6
github.com/filecoin-project/go-jsonrpc v0.1.8
github.com/gammazero/workerpool v1.1.3
github.com/gogo/protobuf v1.3.3
Expand Down Expand Up @@ -188,6 +189,7 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.7.0 // indirect
github.com/ipfs/go-verifcid v0.0.1 // indirect
github.com/ipld/go-car/v2 v2.4.1 // indirect
github.com/ipld/go-codec-dagpb v1.3.1 // indirect
github.com/ipld/go-ipld-prime v0.16.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
Expand Down Expand Up @@ -249,6 +251,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -283,6 +286,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/ulikunitz/xz v0.5.8 // indirect
github.com/vivint/infectious v0.0.0-20200605153912-25a574ae18a3 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
Expand Down Expand Up @@ -320,6 +324,7 @@ require (

replace (
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.4.0-sdk-v0.46.0
github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20221014072825-395797efb659
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/libp2p/go-libp2p-pubsub v0.7.0 => github.com/celestiaorg/go-libp2p-pubsub v0.6.2-0.20220812132010-46b2a019f2f2
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.5.0-tm-v0.34.20
Expand Down
67 changes: 67 additions & 0 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions nodebuilder/default_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/celestiaorg/celestia-node/state"
)

// PackageToDefaultImpl maps a package to its default implementation. Currently only used for method discovery for
// openrpc spec generation
// PackageToDefaultImpl maps a package to its default implementation. Currently only used for
// method discovery for openrpc spec generation
var PackageToDefaultImpl = map[string]interface{}{
"fraud": &fraud.ProofService{},
"state": &state.CoreAccessor{},
Expand Down
6 changes: 4 additions & 2 deletions nodebuilder/share/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ import (
//
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// SharesAvailable subjectively validates if Shares committed to the given Root are available on the Network.
// SharesAvailable subjectively validates if Shares committed to the given Root are available on
// the Network.
SharesAvailable(context.Context, *share.Root) error
// ProbabilityOfAvailability calculates the probability of the data square
// being available based on the number of samples collected.
ProbabilityOfAvailability() float64
GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error)
GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error)
// GetSharesByNamespace iterates over a square's row roots and accumulates the found shares in the given namespace.ID.
// GetSharesByNamespace iterates over a square's row roots and accumulates the found shares in the
// given namespace.ID.
GetSharesByNamespace(ctx context.Context, root *share.Root, namespace namespace.ID) ([]share.Share, error)
}

Expand Down
3 changes: 2 additions & 1 deletion share/eds/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ type retrievalSession struct {
bget blockservice.BlockGetter
adder *ipld.NmtNodeAdder

// TODO(@Wondertan): Extract into a separate data structure https://github.com/celestiaorg/rsmt2d/issues/135
// TODO(@Wondertan): Extract into a separate data structure
// https://github.com/celestiaorg/rsmt2d/issues/135
squareQuadrants []*quadrant
squareCellsLks [][]sync.Mutex
squareCellsCount uint32
Expand Down
230 changes: 230 additions & 0 deletions share/eds/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package eds

import (
"context"
"fmt"
"io"
"os"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/share"

"github.com/celestiaorg/rsmt2d"
)

const (
blocksPath = "/blocks/"
indexPath = "/index/"
transientsPath = "/transients/"
)

// Store maintains (via DAGStore) a top-level index enabling granular and efficient random access to
// every share and/or Merkle proof over every registered CARv1 file. The EDSStore provides a custom
// Blockstore interface implementation to achieve access. The main use-case is randomized sampling
// over the whole chain of EDS block data and getting data by namespace.
type Store struct {
dgstr *dagstore.DAGStore
mounts *mount.Registry

topIdx index.Inverted
carIdx index.FullIndexRepo

basepath string
}

// NewStore creates a new EDS Store under the given basepath and datastore.
func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
err := setupPath(basepath)
if err != nil {
return nil, fmt.Errorf("failed to setup eds.Store directories: %w", err)
}

r := mount.NewRegistry()
err = r.Register("fs", &mount.FSMount{FS: os.DirFS(basepath + blocksPath)})
if err != nil {
return nil, fmt.Errorf("failed to register FS mount on the registry: %w", err)
}

fsRepo, err := index.NewFSRepo(basepath + indexPath)
if err != nil {
return nil, fmt.Errorf("failed to create index repository: %w", err)
}

invertedRepo := index.NewInverted(ds)
dagStore, err := dagstore.NewDAGStore(
dagstore.Config{
TransientsDir: basepath + transientsPath,
IndexRepo: fsRepo,
Datastore: ds,
MountRegistry: r,
TopLevelIndex: invertedRepo,
},
)
if err != nil {
return nil, fmt.Errorf("failed to create DAGStore: %w", err)
}

return &Store{
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
mounts: r,
}, nil
}

// Start starts the underlying DAGStore.
func (s *Store) Start(ctx context.Context) error {
return s.dgstr.Start(ctx)
}

// Stop stops the underlying DAGStore.
func (s *Store) Stop() error {
return s.dgstr.Close()
}

// Put stores the given data square with DataRoot's hash as a key.
//
// The square is verified on the Exchange level, and Put only stores the square, trusting it.
// The resulting file stores all the shares and NMT Merkle Proofs of the EDS.
// Additionally, the file gets indexed s.t. store.Blockstore can access them.
func (s *Store) Put(ctx context.Context, root share.Root, square *rsmt2d.ExtendedDataSquare) error {
key := root.String()
f, err := os.OpenFile(s.basepath+blocksPath+key, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}

err = WriteEDS(ctx, square, f)
if err != nil {
return fmt.Errorf("failed to write EDS to file: %w", err)
}

ch := make(chan dagstore.ShardResult, 1)
err = s.dgstr.RegisterShard(ctx, shard.KeyFromString(key), &mount.FSMount{
FS: os.DirFS(s.basepath + blocksPath),
Path: key,
}, ch, dagstore.RegisterOpts{})
if err != nil {
return fmt.Errorf("failed to initiate shard registration: %w", err)
}

select {
case <-ctx.Done():
return ctx.Err()
case result := <-ch:
if result.Error != nil {
return fmt.Errorf("failed to register shard: %w", result.Error)
}
return nil
}
}

// GetCAR takes a DataRoot and returns a buffered reader to the respective EDS serialized as a
// CARv1 file.
// The Reader strictly reads the CAR header and first quadrant (1/4) of the EDS, omitting all the
// NMT Merkle proofs. Integrity of the store data is not verified.
//
// Caller must Close returned reader after reading.
func (s *Store) GetCAR(ctx context.Context, root share.Root) (io.ReadCloser, error) {
key := root.String()

ch := make(chan dagstore.ShardResult, 1)
err := s.dgstr.AcquireShard(ctx, shard.KeyFromString(key), ch, dagstore.AcquireOpts{})
if err != nil {
return nil, fmt.Errorf("failed to initiate shard acquisition: %w", err)
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-ch:
if result.Error != nil {
return nil, fmt.Errorf("failed to acquire shard: %w", result.Error)
}
return result.Accessor, nil
}
}

// Remove removes EDS from Store by the given share.Root and cleans up all the indexing.
func (s *Store) Remove(ctx context.Context, root share.Root) error {
key := root.String()

ch := make(chan dagstore.ShardResult, 1)
err := s.dgstr.DestroyShard(ctx, shard.KeyFromString(key), ch, dagstore.DestroyOpts{})
if err != nil {
return fmt.Errorf("failed to initiate shard destruction: %w", err)
}

select {
case result := <-ch:
if result.Error != nil {
return fmt.Errorf("failed to destroy shard: %w", result.Error)
}
case <-ctx.Done():
return ctx.Err()
}

dropped, err := s.carIdx.DropFullIndex(shard.KeyFromString(key))
if !dropped {
log.Warnf("failed to drop index for %s", key)
}
if err != nil {
return fmt.Errorf("failed to drop index for %s: %w", key, err)
}

err = os.Remove(s.basepath + blocksPath + key)
if err != nil {
return fmt.Errorf("failed to remove CAR file: %w", err)
}
return nil
}

// Get reads EDS out of Store by given DataRoot.
//
// It reads only one quadrant(1/4) of the EDS and verifies the integrity of the stored data by
// recomputing it.
func (s *Store) Get(ctx context.Context, root share.Root) (*rsmt2d.ExtendedDataSquare, error) {
f, err := s.GetCAR(ctx, root)
if err != nil {
return nil, fmt.Errorf("failed to get CAR file: %w", err)
}
eds, err := ReadEDS(ctx, f, root)
if err != nil {
return nil, fmt.Errorf("failed to read EDS from CAR file: %w", err)
}
return eds, nil
}

// Has checks if EDS exists by the given share.Root.
func (s *Store) Has(ctx context.Context, root share.Root) (bool, error) {
key := root.String()
info, err := s.dgstr.GetShardInfo(shard.KeyFromString(key))
if err == dagstore.ErrShardUnknown {
return false, err
}

return true, info.Error
}

func setupPath(basepath string) error {
perms := os.FileMode(0755)
err := os.Mkdir(basepath+blocksPath, perms)
if err != nil {
return fmt.Errorf("failed to create blocks directory: %w", err)
}
err = os.Mkdir(basepath+transientsPath, perms)
if err != nil {
return fmt.Errorf("failed to create transients directory: %w", err)
}
err = os.Mkdir(basepath+indexPath, perms)
if err != nil {
return fmt.Errorf("failed to create index directory: %w", err)
}
return nil
}
Loading

0 comments on commit e7ea1f1

Please sign in to comment.