Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Shwap core integration #3598

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package core

import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/app"
Expand All @@ -19,8 +17,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/store"
)

// extendBlock extends the given block data, returning the resulting
Expand Down Expand Up @@ -58,26 +55,15 @@ func storeEDS(
ctx context.Context,
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
adder *ipld.ProofsAdder,
store *eds.Store,
store *store.Store,
window pruner.AvailabilityWindow,
) error {
if eds.Equals(share.EmptyEDS()) {
return nil
}

if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)

err := store.Put(ctx, share.DataHash(eh.DataHash), eds)
if errors.Is(err, dagstore.ErrShardExists) {
// block with given root already exists, return nil
return nil
}
err := store.Put(ctx, eh.DAH, eh.Height(), eds)
if err == nil {
log.Debugw("stored EDS for height", "height", eh.Height())
}
Expand Down
24 changes: 7 additions & 17 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ import (
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/store"
)

const concurrencyLimit = 4

type Exchange struct {
fetcher *BlockFetcher
store *eds.Store
store *store.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow
Expand All @@ -31,7 +29,7 @@ type Exchange struct {

func NewExchange(
fetcher *BlockFetcher,
store *eds.Store,
store *store.Store,
construct header.ConstructFn,
opts ...Option,
) (*Exchange, error) {
Expand Down Expand Up @@ -135,11 +133,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err)
}

// extend block data
adder := ipld.NewProofsAdder(int(block.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(block.Data, block.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand All @@ -154,7 +148,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
if err != nil {
return nil, err
}
Expand All @@ -180,11 +174,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
}
log.Debugw("fetched signed block from core", "height", b.Header.Height)

// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
Expand All @@ -194,7 +184,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

err = storeEDS(ctx, eh, eds, adder, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
if err != nil {
return nil, err
}
Expand Down
58 changes: 19 additions & 39 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package core

import (
"bytes"
"context"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -17,7 +14,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/store"
)

func TestCoreExchange_RequestHeaders(t *testing.T) {
Expand All @@ -30,7 +27,8 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

ce, err := NewExchange(fetcher, store, header.MakeExtendedHeader)
require.NoError(t, err)
Expand All @@ -56,7 +54,11 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height())

for _, h := range headers {
has, err := store.Has(ctx, h.DAH.Hash())
has, err := store.HasByHash(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.True(t, has)

has, err = store.HasByHeight(ctx, h.Height())
require.NoError(t, err)
assert.True(t, has)
}
Expand All @@ -74,7 +76,8 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

ce, err := NewExchange(
fetcher,
Expand All @@ -96,10 +99,15 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

// ensure none of the "historic" EDSs were stored
for _, h := range headers {
if bytes.Equal(h.DataHash, share.EmptyEDSRoots().Hash()) {
has, err := store.HasByHeight(ctx, h.Height())
require.NoError(t, err)
assert.False(t, has)

// empty EDSs are expected to exist in the store, so we skip them
if h.DAH.Equals(share.EmptyEDSRoots()) {
continue
}
has, err := store.Has(ctx, h.DAH.Hash())
has, err = store.HasByHash(ctx, h.DAH.Hash())
require.NoError(t, err)
assert.False(t, has)
}
Expand All @@ -114,32 +122,6 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
return NewBlockFetcher(cctx.Client), cctx
}

func createStore(t *testing.T) *eds.Store {
t.Helper()

storeCfg := eds.DefaultParameters()
store, err := eds.NewStore(storeCfg, t.TempDir(), ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = store.Start(ctx)
require.NoError(t, err)

// store an empty square to initialize EDS store
eds := share.EmptyEDS()
err = store.Put(ctx, share.EmptyEDSRoots().Hash(), eds)
require.NoError(t, err)

t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

return store
}

// fillBlocks fills blocks until the context is canceled.
func fillBlocks(
t *testing.T,
Expand Down Expand Up @@ -187,10 +169,8 @@ func generateNonEmptyBlocks(
case b, ok := <-sub:
require.True(t, ok)

if !bytes.Equal(b.Data.Hash(), share.EmptyEDSRoots().Hash()) {
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
}
hashes = append(hashes, share.DataHash(b.Data.Hash()))
i++
case <-ctx.Done():
t.Fatal("failed to fill blocks within timeout")
}
Expand Down
17 changes: 6 additions & 11 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ import (
"go.opentelemetry.io/otel/attribute"

libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)

var (
Expand All @@ -39,7 +37,7 @@ type Listener struct {
fetcher *BlockFetcher

construct header.ConstructFn
store *eds.Store
store *store.Store
availabilityWindow pruner.AvailabilityWindow

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
Expand All @@ -58,7 +56,7 @@ func NewListener(
fetcher *BlockFetcher,
hashBroadcaster shrexsub.BroadcastFn,
construct header.ConstructFn,
store *eds.Store,
store *store.Store,
blocktime time.Duration,
opts ...Option,
) (*Listener, error) {
Expand Down Expand Up @@ -214,11 +212,8 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
span.SetAttributes(
attribute.Int64("height", b.Header.Height),
)
// extend block data
adder := ipld.NewProofsAdder(int(b.Data.SquareSize), false)
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return fmt.Errorf("extending block data: %w", err)
}
Expand All @@ -229,7 +224,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

err = storeEDS(ctx, eh, eds, adder, cl.store, cl.availabilityWindow)
err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
12 changes: 9 additions & 3 deletions core/listener_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)

// TestListenerWithNonEmptyBlocks ensures that non-empty blocks are actually
Expand All @@ -29,11 +30,12 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)
store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
require.NoError(t, err)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID)
err := cl.Start(ctx)
err = cl.Start(ctx)
require.NoError(t, err)

// listen for eds hashes broadcasted through eds-sub and ensure store has
Expand All @@ -54,7 +56,11 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
continue
}

has, err := store.Has(ctx, msg.DataHash)
has, err := store.HasByHash(ctx, msg.DataHash)
require.NoError(t, err)
require.True(t, has)

has, err = store.HasByHeight(ctx, msg.Height)
require.NoError(t, err)
require.True(t, has)
}
Expand Down
Loading
Loading