Skip to content

Commit

Permalink
feat(share/availability): Dont store q4 on archival (#3751)
Browse files Browse the repository at this point in the history
Avoidы storing q4 for data outside of availability window
  • Loading branch information
walldiss authored Sep 20, 2024
1 parent 8ba3dad commit 4289846
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 40 deletions.
9 changes: 8 additions & 1 deletion core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -67,7 +68,13 @@ func storeEDS(
return nil
}

err := store.Put(ctx, eh.DAH, eh.Height(), eds)
var err error
// archival nodes should not store Q4 outside the availability window.
if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) {
err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds)
} else {
err = store.PutODS(ctx, eh.DAH, eh.Height(), eds)
}
if err == nil {
log.Debugw("stored EDS for height", "height", eh.Height())
}
Expand Down
2 changes: 1 addition & 1 deletion header/headertest/fraud/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (f *FraudMaker) MakeExtendedHeader(odsSize int, edsStore *store.Store) head
hdr.DataHash = dah.Hash()

ctx := ipld.CtxWithProofsAdder(context.Background(), adder)
require.NoError(f.t, edsStore.Put(ctx, &dah, uint64(h.Height), square))
require.NoError(f.t, edsStore.PutODSQ4(ctx, &dah, uint64(h.Height), square))

*eds = *square
}
Expand Down
2 changes: 1 addition & 1 deletion libs/edssser/edssser.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ func (ss *EDSsser) put(ctx context.Context, t *testing.T, height int) (time.Dura
}

now := time.Now()
err = ss.edsstore.Put(ctx, roots, uint64(height), square)
err = ss.edsstore.PutODSQ4(ctx, roots, uint64(height), square)
return time.Since(now), err
}
12 changes: 10 additions & 2 deletions share/availability/full/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap"
Expand Down Expand Up @@ -41,7 +43,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header
dah := header.DAH
// if the data square is empty, we can safely link the header height in the store to an empty EDS.
if share.DataHash(dah.Hash()).IsEmptyEDS() {
err := fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS())
err := fa.store.PutODSQ4(ctx, dah, header.Height(), share.EmptyEDS())
if err != nil {
return fmt.Errorf("put empty EDS: %w", err)
}
Expand Down Expand Up @@ -74,7 +76,13 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header
return err
}

err = fa.store.Put(ctx, dah, header.Height(), eds)
// archival nodes should not store Q4 outside the availability window.
if pruner.IsWithinAvailabilityWindow(header.Time(), full.Window) {
err = fa.store.PutODSQ4(ctx, dah, header.Height(), eds)
} else {
err = fa.store.PutODS(ctx, dah, header.Height(), eds)
}

if err != nil {
return fmt.Errorf("full availability: failed to store eds: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion share/availability/full/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSharesAvailable_StoredEds(t *testing.T) {
require.NoError(t, err)
avail := NewShareAvailability(store, nil)

err = store.Put(ctx, roots, eh.Height(), eds)
err = store.PutODSQ4(ctx, roots, eh.Height(), eds)
require.NoError(t, err)

has, err := store.HasByHeight(ctx, eh.Height())
Expand Down
8 changes: 4 additions & 4 deletions share/shwap/p2p/shrex/shrex_getter/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestShrexGetter(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
eh.RawHeader.Height = int64(height)

err = edsStore.Put(ctx, roots, height, randEDS)
err = edsStore.PutODSQ4(ctx, roots, height, randEDS)
require.NoError(t, err)
fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: roots.Hash(),
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestShrexGetter(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
eh.RawHeader.Height = int64(height)

err = edsStore.Put(ctx, roots, height, eds)
err = edsStore.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)
fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: roots.Hash(),
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestShrexGetter(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
eh.RawHeader.Height = int64(height)

err = edsStore.Put(ctx, roots, height, eds)
err = edsStore.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)
fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: roots.Hash(),
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestShrexGetter(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
eh.RawHeader.Height = int64(height)

err = edsStore.Put(ctx, roots, height, randEDS)
err = edsStore.PutODSQ4(ctx, roots, height, randEDS)
require.NoError(t, err)
fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: roots.Hash(),
Expand Down
4 changes: 2 additions & 2 deletions share/shwap/p2p/shrex/shrexeds/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestExchange_RequestEDS(t *testing.T) {
roots, err := share.NewAxisRoots(eds)
require.NoError(t, err)
height := uint64(1)
err = store.Put(ctx, roots, height, eds)
err = store.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)

requestedEDS, err := client.RequestEDS(ctx, roots, height, server.host.ID())
Expand All @@ -51,7 +51,7 @@ func TestExchange_RequestEDS(t *testing.T) {
lock := make(chan struct{})
go func() {
<-lock
err := store.Put(ctx, roots, height, eds)
err := store.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)
lock <- struct{}{}
}()
Expand Down
2 changes: 1 addition & 1 deletion share/shwap/p2p/shrex/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
require.NoError(t, err)

height := height.Add(1)
err = edsStore.Put(ctx, roots, height, eds)
err = edsStore.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)

namespace := sharetest.RandV0Namespace()
Expand Down
6 changes: 3 additions & 3 deletions store/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestStoreGetter(t *testing.T) {
height := height.Add(1)
eh.RawHeader.Height = int64(height)

err := edsStore.Put(ctx, eh.DAH, height, eds)
err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds)
require.NoError(t, err)

squareSize := int(eds.Width())
Expand All @@ -52,7 +52,7 @@ func TestStoreGetter(t *testing.T) {
height := height.Add(1)
eh.RawHeader.Height = int64(height)

err := edsStore.Put(ctx, eh.DAH, height, eds)
err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds)
require.NoError(t, err)

retrievedEDS, err := sg.GetEDS(ctx, eh)
Expand All @@ -71,7 +71,7 @@ func TestStoreGetter(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
height := height.Add(1)
eh.RawHeader.Height = int64(height)
err := edsStore.Put(ctx, eh.DAH, height, eds)
err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds)
require.NoError(t, err)

shares, err := sg.GetSharesByNamespace(ctx, eh, ns)
Expand Down
14 changes: 12 additions & 2 deletions store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const (
failedKey = "failed"
withQ4Key = "with_q4"
sizeKey = "eds_size"
)

Expand Down Expand Up @@ -90,7 +91,13 @@ func (m *metrics) addCacheMetrics(c cache.Cache) error {
return nil
}

func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) {
func (m *metrics) observePut(
ctx context.Context,
dur time.Duration,
size uint,
withQ4 bool,
failed bool,
) {
if m == nil {
return
}
Expand All @@ -100,7 +107,10 @@ func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint,

m.put.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Int(sizeKey, int(size))))
attribute.Bool(withQ4Key, withQ4),
attribute.Int(sizeKey, int(size)),
),
)
}

func (m *metrics) observePutExist(ctx context.Context) {
Expand Down
68 changes: 63 additions & 5 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,30 @@ func (s *Store) Stop(context.Context) error {
return s.metrics.close()
}

func (s *Store) Put(
func (s *Store) PutODSQ4(
ctx context.Context,
roots *share.AxisRoots,
height uint64,
square *rsmt2d.ExtendedDataSquare,
) error {
return s.put(ctx, roots, height, square, true)
}

func (s *Store) PutODS(
ctx context.Context,
roots *share.AxisRoots,
height uint64,
square *rsmt2d.ExtendedDataSquare,
) error {
return s.put(ctx, roots, height, square, false)
}

func (s *Store) put(
ctx context.Context,
roots *share.AxisRoots,
height uint64,
square *rsmt2d.ExtendedDataSquare,
writeQ4 bool,
) error {
datahash := share.DataHash(roots.Hash())
// we don't need to store empty EDS, just link the height to the empty file
Expand All @@ -121,21 +140,27 @@ func (s *Store) Put(
lock.lock()
defer lock.unlock()

exists, err := s.createFile(square, roots, height)
var exists bool
if writeQ4 {
exists, err = s.createODSQ4File(square, roots, height)
} else {
exists, err = s.createODSFile(square, roots, height)
}

if exists {
s.metrics.observePutExist(ctx)
return nil
}
if err != nil {
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), true)
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), writeQ4, true)
return fmt.Errorf("creating file: %w", err)
}

s.metrics.observePut(ctx, time.Since(tNow), square.Width(), false)
s.metrics.observePut(ctx, time.Since(tNow), square.Width(), writeQ4, false)
return nil
}

func (s *Store) createFile(
func (s *Store) createODSQ4File(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
Expand Down Expand Up @@ -170,6 +195,39 @@ func (s *Store) createFile(
return false, nil
}

func (s *Store) createODSFile(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
) (bool, error) {
pathODS := s.hashToPath(roots.Hash(), odsFileExt)
err := file.CreateODS(pathODS, roots, square)
if errors.Is(err, os.ErrExist) {
// TODO(@Wondertan): Should we verify that the exist file is correct?
return true, nil
}
if err != nil {
// ensure we don't have partial writes if any operation fails
removeErr := s.removeODS(height, roots.Hash())
return false, errors.Join(
fmt.Errorf("creating ODS file: %w", err),
removeErr,
)
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
if err != nil {
// ensure we don't have partial writes if any operation fails
removeErr := s.removeODS(height, roots.Hash())
return false, errors.Join(
fmt.Errorf("hardlinking height: %w", err),
removeErr,
)
}
return false, nil
}

func (s *Store) linkHeight(datahash share.DataHash, height uint64) error {
// create hard link with height as name
pathOds := s.hashToPath(datahash, odsFileExt)
Expand Down
4 changes: 2 additions & 2 deletions store/store_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestStore_WithCache(t *testing.T) {
t.Cleanup(cancel)
eds, roots := randomEDS(t)
height := height.Add(1)
err = store.Put(ctx, roots, height, eds)
err = store.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)

// check that the height is not in the cache (cache was disabled)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestStore_WithCache(t *testing.T) {
t.Cleanup(cancel)
eds, roots := randomEDS(t)
height := height.Add(1)
err = store.Put(ctx, roots, height, eds)
err = store.PutODSQ4(ctx, roots, height, eds)
require.NoError(t, err)

acc, err := store.cache.Get(height)
Expand Down
Loading

0 comments on commit 4289846

Please sign in to comment.