Skip to content

Commit

Permalink
feat(shwap): Shwap core integration (#3598)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Jul 31, 2024
1 parent c4e602a commit 8912fc0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 95 deletions.
20 changes: 3 additions & 17 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package core

import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/app"
Expand All @@ -19,8 +17,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/store"
)

// extendBlock extends the given block data, returning the resulting
Expand Down Expand Up @@ -58,26 +55,15 @@ func storeEDS(
ctx context.Context,
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
adder *ipld.ProofsAdder,
store *eds.Store,
store *store.Store,
window pruner.AvailabilityWindow,
) error {
if eds.Equals(share.EmptyEDS()) {
return nil
}

if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)

err := store.Put(ctx, share.DataHash(eh.DataHash), eds)
if errors.Is(err, dagstore.ErrShardExists) {
// block with given root already exists, return nil
return nil
}
err := store.Put(ctx, eh.DAH, eh.Height(), eds)
if err == nil {
log.Debugw("stored EDS for height", "height", eh.Height())
}
Expand Down
24 changes: 7 additions & 17 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ import (
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/store"
)

const concurrencyLimit = 4

type Exchange struct {
fetcher *BlockFetcher
store *eds.Store
store *store.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow
Expand All @@ -31,7 +29,7 @@ type Exchange struct {

func NewExchange(
fetcher *BlockFetcher,
store *eds.Store,
store *store.Store,
construct header.ConstructFn,
opts ...Option,
) (*Exchange, error) {
Expand Down Expand Up @@ -135,11 +133,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err)
}

// extend block data
adder := ipld.NewProofsAdder(int(block.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(block.Data, block.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand All @@ -154,7 +148,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
if err != nil {
return nil, err
}
Expand All @@ -180,11 +174,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
}
log.Debugw("fetched signed block from core", "height", b.Header.Height)

// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
Expand All @@ -194,7 +184,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
if err != nil {
return nil, err
}
Expand Down
58 changes: 19 additions & 39 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package core

import (
"bytes"
"context"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -17,7 +14,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/store"
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
Expand All @@ -30,7 +27,8 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

ce, err := NewExchange(fetcher, store, header.MakeExtendedHeader)
require.NoError(t, err)
Expand All @@ -56,7 +54,11 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())

for _, h := range headers {
has, err := store.Has(ctx, h.DAH.Hash())
has, err := store.HasByHash(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.True(t, has)

has, err = store.HasByHeight(ctx, h.Height())
require.NoError(t, err)
assert.True(t, has)
}
Expand All @@ -74,7 +76,8 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

ce, err := NewExchange(
fetcher,
Expand All @@ -96,10 +99,15 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

// ensure none of the "historic" EDSs were stored
for _, h := range headers {
if bytes.Equal(h.DataHash, share.EmptyEDSRoots().Hash()) {
has, err := store.HasByHeight(ctx, h.Height())
require.NoError(t, err)
assert.False(t, has)

// empty EDSs are expected to exist in the store, so we skip them
if h.DAH.Equals(share.EmptyEDSRoots()) {
continue
}
has, err := store.Has(ctx, h.DAH.Hash())
has, err = store.HasByHash(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.False(t, has)
}
Expand All @@ -114,32 +122,6 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
return NewBlockFetcher(cctx.Client), cctx
}

func createStore(t *testing.T) *eds.Store {
t.Helper()

storeCfg := eds.DefaultParameters()
store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = store.Start(ctx)
require.NoError(t, err)

// store an empty square to initialize EDS store
eds := share.EmptyEDS()
err = store.Put(ctx, share.EmptyEDSRoots().Hash(), eds)
require.NoError(t, err)

t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

return store
}

// fillBlocks fills blocks until the context is canceled.
func fillBlocks(
t *testing.T,
Expand Down Expand Up @@ -187,10 +169,8 @@ func generateNonEmptyBlocks(
case b, ok := <-sub:
require.True(t, ok)

if !bytes.Equal(b.Data.Hash(), share.EmptyEDSRoots().Hash()) {
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
}
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
case <-ctx.Done():
t.Fatal("failed to fill blocks within timeout")
}
Expand Down
17 changes: 6 additions & 11 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ import (
"go.opentelemetry.io/otel/attribute"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)

var (
Expand All @@ -39,7 +37,7 @@ type Listener struct {
fetcher *BlockFetcher

construct header.ConstructFn
store *eds.Store
store *store.Store
availabilityWindow pruner.AvailabilityWindow

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
Expand All @@ -58,7 +56,7 @@ func NewListener(
fetcher *BlockFetcher,
hashBroadcaster shrexsub.BroadcastFn,
construct header.ConstructFn,
store *eds.Store,
store *store.Store,
blocktime time.Duration,
opts ...Option,
) (*Listener, error) {
Expand Down Expand Up @@ -214,11 +212,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
span.SetAttributes(
attribute.Int64("height", b.Header.Height),
)
// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return fmt.Errorf("extending block data: %w", err)
}
Expand All @@ -229,7 +224,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

err = storeEDS(ctx, eh, eds, adder, cl.store, cl.availabilityWindow)
err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
12 changes: 9 additions & 3 deletions core/listener_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)

// TestListenerWithNonEmptyBlocks ensures that non-empty blocks are actually
Expand All @@ -29,11 +30,12 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID)
err := cl.Start(ctx)
err = cl.Start(ctx)
require.NoError(t, err)

// listen for eds hashes broadcasted through eds-sub and ensure store has
Expand All @@ -54,7 +56,11 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
continue
}

has, err := store.Has(ctx, msg.DataHash)
has, err := store.HasByHash(ctx, msg.DataHash)
require.NoError(t, err)
require.True(t, has)

has, err = store.HasByHeight(ctx, msg.Height)
require.NoError(t, err)
require.True(t, has)
}
Expand Down
Loading

0 comments on commit 8912fc0

Please sign in to comment.