diff --git a/core/eds.go b/core/eds.go index e39a530829..a069ca7cec 100644 --- a/core/eds.go +++ b/core/eds.go @@ -16,6 +16,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/store" ) @@ -67,7 +68,13 @@ func storeEDS( return nil } - err := store.Put(ctx, eh.DAH, eh.Height(), eds) + var err error + // archival nodes should not store Q4 outside the availability window. + if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) { + err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds) + } else { + err = store.PutODS(ctx, eh.DAH, eh.Height(), eds) + } if err == nil { log.Debugw("stored EDS for height", "height", eh.Height()) } diff --git a/header/headertest/fraud/testing.go b/header/headertest/fraud/testing.go index 7af5683c42..5f4bdca084 100644 --- a/header/headertest/fraud/testing.go +++ b/header/headertest/fraud/testing.go @@ -64,7 +64,7 @@ func (f *FraudMaker) MakeExtendedHeader(odsSize int, edsStore *store.Store) head hdr.DataHash = dah.Hash() ctx := ipld.CtxWithProofsAdder(context.Background(), adder) - require.NoError(f.t, edsStore.Put(ctx, &dah, uint64(h.Height), square)) + require.NoError(f.t, edsStore.PutODSQ4(ctx, &dah, uint64(h.Height), square)) *eds = *square } diff --git a/libs/edssser/edssser.go b/libs/edssser/edssser.go index 6d8d22c712..dfb01e1756 100644 --- a/libs/edssser/edssser.go +++ b/libs/edssser/edssser.go @@ -155,6 +155,6 @@ func (ss *EDSsser) put(ctx context.Context, t *testing.T, height int) (time.Dura } now := time.Now() - err = ss.edsstore.Put(ctx, roots, uint64(height), square) + err = ss.edsstore.PutODSQ4(ctx, roots, uint64(height), square) return time.Since(now), err } diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index f4f0465dda..91550849c8 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -8,6 +8,8 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/pruner/full" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/byzantine" "github.com/celestiaorg/celestia-node/share/shwap" @@ -41,7 +43,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header dah := header.DAH // if the data square is empty, we can safely link the header height in the store to an empty EDS. if share.DataHash(dah.Hash()).IsEmptyEDS() { - err := fa.store.Put(ctx, dah, header.Height(), share.EmptyEDS()) + err := fa.store.PutODSQ4(ctx, dah, header.Height(), share.EmptyEDS()) if err != nil { return fmt.Errorf("put empty EDS: %w", err) } @@ -74,7 +76,13 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, header *header return err } - err = fa.store.Put(ctx, dah, header.Height(), eds) + // archival nodes should not store Q4 outside the availability window. + if pruner.IsWithinAvailabilityWindow(header.Time(), full.Window) { + err = fa.store.PutODSQ4(ctx, dah, header.Height(), eds) + } else { + err = fa.store.PutODS(ctx, dah, header.Height(), eds) + } + if err != nil { return fmt.Errorf("full availability: failed to store eds: %w", err) } diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index 0a40bae454..95f8bda533 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -60,7 +60,7 @@ func TestSharesAvailable_StoredEds(t *testing.T) { require.NoError(t, err) avail := NewShareAvailability(store, nil) - err = store.Put(ctx, roots, eh.Height(), eds) + err = store.PutODSQ4(ctx, roots, eh.Height(), eds) require.NoError(t, err) has, err := store.HasByHeight(ctx, eh.Height()) diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex_test.go b/share/shwap/p2p/shrex/shrex_getter/shrex_test.go index 01af5e00ae..b01eb71d7b 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex_test.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex_test.go @@ -77,7 +77,7 @@ func TestShrexGetter(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, roots) eh.RawHeader.Height = int64(height) - err = edsStore.Put(ctx, roots, height, randEDS) + err = edsStore.PutODSQ4(ctx, roots, height, randEDS) require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), @@ -118,7 +118,7 @@ func TestShrexGetter(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, roots) eh.RawHeader.Height = int64(height) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), @@ -160,7 +160,7 @@ func TestShrexGetter(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, roots) eh.RawHeader.Height = int64(height) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), @@ -189,7 +189,7 @@ func TestShrexGetter(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, roots) eh.RawHeader.Height = int64(height) - err = edsStore.Put(ctx, roots, height, randEDS) + err = edsStore.PutODSQ4(ctx, roots, height, randEDS) require.NoError(t, err) fullPeerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{ DataHash: roots.Hash(), diff --git a/share/shwap/p2p/shrex/shrexeds/exchange_test.go b/share/shwap/p2p/shrex/shrexeds/exchange_test.go index fafffe1d0e..89e1dafa1c 100644 --- a/share/shwap/p2p/shrex/shrexeds/exchange_test.go +++ b/share/shwap/p2p/shrex/shrexeds/exchange_test.go @@ -33,7 +33,7 @@ func TestExchange_RequestEDS(t *testing.T) { roots, err := share.NewAxisRoots(eds) require.NoError(t, err) height := uint64(1) - err = store.Put(ctx, roots, height, eds) + err = store.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) requestedEDS, err := client.RequestEDS(ctx, roots, height, server.host.ID()) @@ -51,7 +51,7 @@ func TestExchange_RequestEDS(t *testing.T) { lock := make(chan struct{}) go func() { <-lock - err := store.Put(ctx, roots, height, eds) + err := store.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) lock <- struct{}{} }() diff --git a/share/shwap/p2p/shrex/shrexnd/exchange_test.go b/share/shwap/p2p/shrex/shrexnd/exchange_test.go index c49ebd867e..81abdfba26 100644 --- a/share/shwap/p2p/shrex/shrexnd/exchange_test.go +++ b/share/shwap/p2p/shrex/shrexnd/exchange_test.go @@ -47,7 +47,7 @@ func TestExchange_RequestND_NotFound(t *testing.T) { require.NoError(t, err) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) namespace := sharetest.RandV0Namespace() diff --git a/store/getter_test.go b/store/getter_test.go index 41f943d411..6c908cd924 100644 --- a/store/getter_test.go +++ b/store/getter_test.go @@ -29,7 +29,7 @@ func TestStoreGetter(t *testing.T) { height := height.Add(1) eh.RawHeader.Height = int64(height) - err := edsStore.Put(ctx, eh.DAH, height, eds) + err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds) require.NoError(t, err) squareSize := int(eds.Width()) @@ -52,7 +52,7 @@ func TestStoreGetter(t *testing.T) { height := height.Add(1) eh.RawHeader.Height = int64(height) - err := edsStore.Put(ctx, eh.DAH, height, eds) + err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds) require.NoError(t, err) retrievedEDS, err := sg.GetEDS(ctx, eh) @@ -71,7 +71,7 @@ func TestStoreGetter(t *testing.T) { eh := headertest.RandExtendedHeaderWithRoot(t, roots) height := height.Add(1) eh.RawHeader.Height = int64(height) - err := edsStore.Put(ctx, eh.DAH, height, eds) + err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds) require.NoError(t, err) shares, err := sg.GetSharesByNamespace(ctx, eh, ns) diff --git a/store/metrics.go b/store/metrics.go index 792b40f357..53720493e6 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -14,6 +14,7 @@ import ( const ( failedKey = "failed" + withQ4Key = "with_q4" sizeKey = "eds_size" ) @@ -90,7 +91,13 @@ func (m *metrics) addCacheMetrics(c cache.Cache) error { return nil } -func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, failed bool) { +func (m *metrics) observePut( + ctx context.Context, + dur time.Duration, + size uint, + withQ4 bool, + failed bool, +) { if m == nil { return } @@ -100,7 +107,10 @@ func (m *metrics) observePut(ctx context.Context, dur time.Duration, size uint, m.put.Record(ctx, dur.Seconds(), metric.WithAttributes( attribute.Bool(failedKey, failed), - attribute.Int(sizeKey, int(size)))) + attribute.Bool(withQ4Key, withQ4), + attribute.Int(sizeKey, int(size)), + ), + ) } func (m *metrics) observePutExist(ctx context.Context) { diff --git a/store/store.go b/store/store.go index f64fb4159d..ac8951e50b 100644 --- a/store/store.go +++ b/store/store.go @@ -90,11 +90,30 @@ func (s *Store) Stop(context.Context) error { return s.metrics.close() } -func (s *Store) Put( +func (s *Store) PutODSQ4( ctx context.Context, roots *share.AxisRoots, height uint64, square *rsmt2d.ExtendedDataSquare, +) error { + return s.put(ctx, roots, height, square, true) +} + +func (s *Store) PutODS( + ctx context.Context, + roots *share.AxisRoots, + height uint64, + square *rsmt2d.ExtendedDataSquare, +) error { + return s.put(ctx, roots, height, square, false) +} + +func (s *Store) put( + ctx context.Context, + roots *share.AxisRoots, + height uint64, + square *rsmt2d.ExtendedDataSquare, + writeQ4 bool, ) error { datahash := share.DataHash(roots.Hash()) // we don't need to store empty EDS, just link the height to the empty file @@ -121,21 +140,27 @@ func (s *Store) Put( lock.lock() defer lock.unlock() - exists, err := s.createFile(square, roots, height) + var exists bool + if writeQ4 { + exists, err = s.createODSQ4File(square, roots, height) + } else { + exists, err = s.createODSFile(square, roots, height) + } + if exists { s.metrics.observePutExist(ctx) return nil } if err != nil { - s.metrics.observePut(ctx, time.Since(tNow), square.Width(), true) + s.metrics.observePut(ctx, time.Since(tNow), square.Width(), writeQ4, true) return fmt.Errorf("creating file: %w", err) } - s.metrics.observePut(ctx, time.Since(tNow), square.Width(), false) + s.metrics.observePut(ctx, time.Since(tNow), square.Width(), writeQ4, false) return nil } -func (s *Store) createFile( +func (s *Store) createODSQ4File( square *rsmt2d.ExtendedDataSquare, roots *share.AxisRoots, height uint64, @@ -170,6 +195,39 @@ func (s *Store) createFile( return false, nil } +func (s *Store) createODSFile( + square *rsmt2d.ExtendedDataSquare, + roots *share.AxisRoots, + height uint64, +) (bool, error) { + pathODS := s.hashToPath(roots.Hash(), odsFileExt) + err := file.CreateODS(pathODS, roots, square) + if errors.Is(err, os.ErrExist) { + // TODO(@Wondertan): Should we verify that the exist file is correct? + return true, nil + } + if err != nil { + // ensure we don't have partial writes if any operation fails + removeErr := s.removeODS(height, roots.Hash()) + return false, errors.Join( + fmt.Errorf("creating ODS file: %w", err), + removeErr, + ) + } + + // create hard link with height as name + err = s.linkHeight(roots.Hash(), height) + if err != nil { + // ensure we don't have partial writes if any operation fails + removeErr := s.removeODS(height, roots.Hash()) + return false, errors.Join( + fmt.Errorf("hardlinking height: %w", err), + removeErr, + ) + } + return false, nil +} + func (s *Store) linkHeight(datahash share.DataHash, height uint64) error { // create hard link with height as name pathOds := s.hashToPath(datahash, odsFileExt) diff --git a/store/store_cache_test.go b/store/store_cache_test.go index 50198e3749..1655b31f22 100644 --- a/store/store_cache_test.go +++ b/store/store_cache_test.go @@ -25,7 +25,7 @@ func TestStore_WithCache(t *testing.T) { t.Cleanup(cancel) eds, roots := randomEDS(t) height := height.Add(1) - err = store.Put(ctx, roots, height, eds) + err = store.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // check that the height is not in the cache (cache was disabled) @@ -56,7 +56,7 @@ func TestStore_WithCache(t *testing.T) { t.Cleanup(cancel) eds, roots := randomEDS(t) height := height.Add(1) - err = store.Put(ctx, roots, height, eds) + err = store.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) acc, err := store.cache.Get(height) diff --git a/store/store_test.go b/store/store_test.go index 30d2c37463..cb038e5349 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -37,7 +37,7 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // file should exist in the store @@ -54,7 +54,7 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // file should be cached after put @@ -78,12 +78,12 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // ensure file is written. There should be only ods + q4 files and 1 link ensureAmountFileAndLinks(t, dir, 2, 1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // ensure file is not duplicated. @@ -94,7 +94,7 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) f, err := edsStore.GetByHeight(ctx, height) @@ -112,7 +112,7 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err := edsStore.Put(ctx, roots, height, eds) + err := edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) f, err := edsStore.GetByHash(ctx, roots.Hash()) @@ -148,7 +148,7 @@ func TestEDSStore(t *testing.T) { height := height.Add(1) hash := share.EmptyEDSDataHash() - err = edsStore.Put(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) + err = edsStore.PutODSQ4(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) require.NoError(t, err) ensureAmountFileAndLinks(t, dir, 0, 1) @@ -178,7 +178,7 @@ func TestEDSStore(t *testing.T) { eds, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // ensure file is written ensureAmountFileAndLinks(t, dir, 2, 1) @@ -210,7 +210,7 @@ func TestEDSStore(t *testing.T) { height := height.Add(1) hash := share.EmptyEDSDataHash() - err = edsStore.Put(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) + err = edsStore.PutODSQ4(ctx, share.EmptyEDSRoots(), height, share.EmptyEDS()) require.NoError(t, err) // empty file is not counted as a file ensureAmountFileAndLinks(t, dir, 0, 1) @@ -236,7 +236,7 @@ func TestEDSStore(t *testing.T) { square, roots := randomEDS(t) height := height.Add(1) - err = edsStore.Put(ctx, roots, height, square) + err = edsStore.PutODSQ4(ctx, roots, height, square) require.NoError(t, err) err = edsStore.RemoveQ4(ctx, height, roots.Hash()) @@ -283,7 +283,7 @@ func TestEDSStore(t *testing.T) { require.NoError(t, err) require.False(t, has) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(t, err) // assert that the empty file can be accessed by height @@ -307,7 +307,7 @@ func TestEDSStore(t *testing.T) { // store empty EDSs for i := from; i <= to; i++ { - err := edsStore.Put(ctx, roots, uint64(i), eds) + err := edsStore.PutODSQ4(ctx, roots, uint64(i), eds) require.NoError(t, err) } @@ -353,7 +353,7 @@ func BenchmarkStore(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { roots := edstest.RandomAxisRoots(b, 1) - _ = edsStore.Put(ctx, roots, uint64(i), eds) + _ = edsStore.PutODSQ4(ctx, roots, uint64(i), eds) } }) @@ -364,7 +364,7 @@ func BenchmarkStore(b *testing.B) { require.NoError(b, err) height := uint64(1984) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(b, err) b.ResetTimer() @@ -381,7 +381,7 @@ func BenchmarkStore(b *testing.B) { require.NoError(b, err) height := uint64(1984) - err = edsStore.Put(ctx, roots, height, eds) + err = edsStore.PutODSQ4(ctx, roots, height, eds) require.NoError(b, err) b.ResetTimer()