diff --git a/cmd/node.go b/cmd/node.go index 51ac4a6d2e..e8891c78f5 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -9,6 +9,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/nodebuilder/pruner" "github.com/celestiaorg/celestia-node/nodebuilder/rpc" "github.com/celestiaorg/celestia-node/nodebuilder/state" ) @@ -22,6 +23,7 @@ func NewBridge(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command rpc.Flags(), gateway.Flags(), state.Flags(), + pruner.Flags(), } cmd := &cobra.Command{ Use: "bridge [subcommand]", @@ -72,6 +74,7 @@ func NewFull(options ...func(*cobra.Command, []*pflag.FlagSet)) *cobra.Command { rpc.Flags(), gateway.Flags(), state.Flags(), + pruner.Flags(), } cmd := &cobra.Command{ Use: "full [subcommand]", diff --git a/cmd/util.go b/cmd/util.go index 08fa02155b..bbc901e4f2 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -16,6 +16,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/nodebuilder/pruner" rpc_cfg "github.com/celestiaorg/celestia-node/nodebuilder/rpc" "github.com/celestiaorg/celestia-node/nodebuilder/state" "github.com/celestiaorg/celestia-node/share" @@ -105,13 +106,6 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err return err } - if nodeType != node.Bridge { - err = header.ParseFlags(cmd, &cfg.Header) - if err != nil { - return err - } - } - ctx, err = ParseMiscFlags(ctx, cmd) if err != nil { return err @@ -121,6 +115,24 @@ func PersistentPreRunEnv(cmd *cobra.Command, nodeType node.Type, _ []string) err gateway.ParseFlags(cmd, &cfg.Gateway) state.ParseFlags(cmd, &cfg.State) + switch nodeType { + case node.Light: + err = header.ParseFlags(cmd, &cfg.Header) + if err != nil { + return err + } + case node.Full: + err = header.ParseFlags(cmd, &cfg.Header) + if err != nil { + return err + } + pruner.ParseFlags(cmd, &cfg.Pruner) + case node.Bridge: + pruner.ParseFlags(cmd, &cfg.Pruner) + default: + panic(fmt.Sprintf("invalid node type: %v", nodeType)) + } + // set config ctx = WithNodeConfig(ctx, &cfg) cmd.SetContext(ctx) diff --git a/header/headertest/testing.go b/header/headertest/testing.go index 7b0ae64262..1285e2cdd0 100644 --- a/header/headertest/testing.go +++ b/header/headertest/testing.go @@ -46,6 +46,14 @@ func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] { return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3, 0), 10) } +func NewCustomStore( + t *testing.T, + generator headertest.Generator[*header.ExtendedHeader], + numHeaders int, +) libhead.Store[*header.ExtendedHeader] { + return headertest.NewStore[*header.ExtendedHeader](t, generator, numHeaders) +} + // NewTestSuite setups a new test suite with a given number of validators. 
func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *TestSuite { valSet, vals := RandValidatorSet(numValidators, 10) @@ -82,8 +90,10 @@ func (s *TestSuite) genesis() *header.ExtendedHeader { return eh } -func MakeCommit(blockID types.BlockID, height int64, round int32, - voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time) (*types.Commit, error) { +func MakeCommit( + blockID types.BlockID, height int64, round int32, + voteSet *types.VoteSet, validators []types.PrivValidator, now time.Time, +) (*types.Commit, error) { // all sign for i := 0; i < len(validators); i++ { @@ -157,7 +167,8 @@ func (s *TestSuite) NextHeader() *header.ExtendedHeader { } func (s *TestSuite) GenRawHeader( - height uint64, lastHeader, lastCommit, dataHash libhead.Hash) *header.RawHeader { + height uint64, lastHeader, lastCommit, dataHash libhead.Hash, +) *header.RawHeader { rh := RandRawHeader(s.t) rh.Height = int64(height) rh.LastBlockID = types.BlockID{Hash: bytes.HexBytes(lastHeader)} @@ -167,9 +178,9 @@ func (s *TestSuite) GenRawHeader( rh.NextValidatorsHash = s.valSet.Hash() rh.ProposerAddress = s.nextProposer().Address - rh.Time = time.Now() + rh.Time = time.Now().UTC() if s.blockTime > 0 { - rh.Time = s.Head().Time().Add(s.blockTime) + rh.Time = s.Head().Time().UTC().Add(s.blockTime) } return rh @@ -189,7 +200,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit { ValidatorIndex: int32(i), Height: h.Height, Round: round, - Timestamp: tmtime.Now(), + Timestamp: tmtime.Now().UTC(), Type: tmproto.PrecommitType, BlockID: bid, } @@ -214,6 +225,11 @@ func (s *TestSuite) nextProposer() *types.Validator { // RandExtendedHeader provides an ExtendedHeader fixture. func RandExtendedHeader(t testing.TB) *header.ExtendedHeader { + timestamp := time.Now().UTC() + return RandExtendedHeaderAtTimestamp(t, timestamp) +} + +func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader { dah := share.EmptyRoot() rh := RandRawHeader(t) @@ -224,7 +240,7 @@ func RandExtendedHeader(t testing.TB) *header.ExtendedHeader { voteSet := types.NewVoteSet(rh.ChainID, rh.Height, 0, tmproto.PrecommitType, valSet) blockID := RandBlockID(t) blockID.Hash = rh.Hash() - commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, time.Now()) + commit, err := MakeCommit(blockID, rh.Height, 0, voteSet, vals, timestamp) require.NoError(t, err) return &header.ExtendedHeader{ @@ -279,7 +295,7 @@ func RandRawHeader(t testing.TB) *header.RawHeader { Version: version.Consensus{Block: 11, App: 1}, ChainID: "test", Height: mrand.Int63(), //nolint:gosec - Time: time.Now(), + Time: time.Now().UTC(), LastBlockID: RandBlockID(t), LastCommitHash: tmrand.Bytes(32), DataHash: tmrand.Bytes(32), @@ -320,7 +336,7 @@ func ExtendedHeaderFromEDS(t testing.TB, height uint64, eds *rsmt2d.ExtendedData blockID := RandBlockID(t) blockID.Hash = gen.Hash() voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, valSet) - commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, vals, time.Now()) + commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, vals, time.Now().UTC()) require.NoError(t, err) eh := &header.ExtendedHeader{ diff --git a/nodebuilder/config.go b/nodebuilder/config.go index d323f401d7..bf9b1a5bfe 100644 --- a/nodebuilder/config.go +++ b/nodebuilder/config.go @@ -15,6 +15,7 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" 
"github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/nodebuilder/pruner" "github.com/celestiaorg/celestia-node/nodebuilder/rpc" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" @@ -35,6 +36,7 @@ type Config struct { Share share.Config Header header.Config DASer das.Config `toml:",omitempty"` + Pruner pruner.Config } // DefaultConfig provides a default Config for a given Node Type 'tp'. @@ -49,6 +51,7 @@ func DefaultConfig(tp node.Type) *Config { Gateway: gateway.DefaultConfig(), Share: share.DefaultConfig(tp), Header: header.DefaultConfig(tp), + Pruner: pruner.DefaultConfig(), } switch tp { diff --git a/nodebuilder/module.go b/nodebuilder/module.go index ad287b1ac8..e3370eb083 100644 --- a/nodebuilder/module.go +++ b/nodebuilder/module.go @@ -16,7 +16,7 @@ import ( modhead "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" - "github.com/celestiaorg/celestia-node/nodebuilder/prune" + "github.com/celestiaorg/celestia-node/nodebuilder/pruner" "github.com/celestiaorg/celestia-node/nodebuilder/rpc" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/nodebuilder/state" @@ -58,7 +58,7 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store blob.ConstructModule(), da.ConstructModule(), node.ConstructModule(tp), - prune.ConstructModule(tp), + pruner.ConstructModule(tp, &cfg.Pruner), rpc.ConstructModule(tp, &cfg.RPC), ) diff --git a/nodebuilder/prune/module.go b/nodebuilder/prune/module.go deleted file mode 100644 index 2141b74bf1..0000000000 --- a/nodebuilder/prune/module.go +++ /dev/null @@ -1,47 +0,0 @@ -package prune - -import ( - "context" - - "go.uber.org/fx" - - "github.com/celestiaorg/celestia-node/nodebuilder/node" - "github.com/celestiaorg/celestia-node/pruner" - "github.com/celestiaorg/celestia-node/pruner/archival" - "github.com/celestiaorg/celestia-node/pruner/light" -) - -func ConstructModule(tp node.Type) fx.Option { - baseComponents := fx.Options( - fx.Provide(fx.Annotate( - pruner.NewService, - fx.OnStart(func(ctx context.Context, p *pruner.Service) error { - return p.Start(ctx) - }), - fx.OnStop(func(ctx context.Context, p *pruner.Service) error { - return p.Stop(ctx) - }), - )), - ) - - switch tp { - case node.Full, node.Bridge: - return fx.Module("prune", - baseComponents, - fx.Provide(func() pruner.Pruner { - return archival.NewPruner() - }), - fx.Supply(archival.Window), - ) - case node.Light: - return fx.Module("prune", - baseComponents, - fx.Provide(func() pruner.Pruner { - return light.NewPruner() - }), - fx.Supply(light.Window), - ) - default: - panic("unknown node type") - } -} diff --git a/nodebuilder/pruner/config.go b/nodebuilder/pruner/config.go new file mode 100644 index 0000000000..1aa8c6ad6f --- /dev/null +++ b/nodebuilder/pruner/config.go @@ -0,0 +1,13 @@ +package pruner + +var MetricsEnabled bool + +type Config struct { + EnableService bool +} + +func DefaultConfig() Config { + return Config{ + EnableService: false, + } +} diff --git a/nodebuilder/pruner/constructors.go b/nodebuilder/pruner/constructors.go new file mode 100644 index 0000000000..1b84d19d0d --- /dev/null +++ b/nodebuilder/pruner/constructors.go @@ -0,0 +1,33 @@ +package pruner + +import ( + "github.com/ipfs/go-datastore" + + hdr "github.com/celestiaorg/go-header" + + 
"github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/pruner" +) + +func newPrunerService( + p pruner.Pruner, + window pruner.AvailabilityWindow, + getter hdr.Store[*header.ExtendedHeader], + ds datastore.Batching, + opts ...pruner.Option, +) (*pruner.Service, error) { + serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...) + if err != nil { + return nil, err + } + + if MetricsEnabled { + err := pruner.WithPrunerMetrics(serv) + if err != nil { + return nil, err + } + } + + return serv, nil +} diff --git a/nodebuilder/pruner/flags.go b/nodebuilder/pruner/flags.go new file mode 100644 index 0000000000..7734c49e46 --- /dev/null +++ b/nodebuilder/pruner/flags.go @@ -0,0 +1,20 @@ +package pruner + +import ( + "github.com/spf13/cobra" + flag "github.com/spf13/pflag" +) + +const pruningFlag = "experimental-pruning" + +func Flags() *flag.FlagSet { + flags := &flag.FlagSet{} + + flags.Bool(pruningFlag, false, "EXPERIMENTAL: Enables pruning of blocks outside the pruning window.") + + return flags +} + +func ParseFlags(cmd *cobra.Command, cfg *Config) { + cfg.EnableService = cmd.Flag(pruningFlag).Changed +} diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go new file mode 100644 index 0000000000..248798c3a4 --- /dev/null +++ b/nodebuilder/pruner/module.go @@ -0,0 +1,71 @@ +package pruner + +import ( + "context" + + "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/pruner/archival" + "github.com/celestiaorg/celestia-node/pruner/full" + "github.com/celestiaorg/celestia-node/pruner/light" + "github.com/celestiaorg/celestia-node/share/eds" +) + +func ConstructModule(tp node.Type, cfg *Config) fx.Option { + if !cfg.EnableService { + switch tp { + case node.Light: + // light nodes are still subject to sampling within window + // even if pruning is not enabled. + return fx.Supply(light.Window) + case node.Full, node.Bridge: + return fx.Supply(archival.Window) + default: + panic("unknown node type") + } + } + + baseComponents := fx.Options( + fx.Provide(fx.Annotate( + newPrunerService, + fx.OnStart(func(ctx context.Context, p *pruner.Service) error { + return p.Start(ctx) + }), + fx.OnStop(func(ctx context.Context, p *pruner.Service) error { + return p.Stop(ctx) + }), + )), + // This is necessary to invoke the pruner service as independent thanks to a + // quirk in FX. + fx.Invoke(func(_ *pruner.Service) {}), + ) + + switch tp { + case node.Full: + return fx.Module("prune", + baseComponents, + fx.Provide(func(store *eds.Store) pruner.Pruner { + return full.NewPruner(store) + }), + fx.Supply(full.Window), + ) + case node.Bridge: + return fx.Module("prune", + baseComponents, + fx.Provide(func(store *eds.Store) pruner.Pruner { + return full.NewPruner(store) + }), + fx.Supply(full.Window), + ) + // TODO: Eventually, light nodes will be capable of pruning samples + // in which case, this can be enabled. 
+ case node.Light: + return fx.Module("prune", + fx.Supply(light.Window), + ) + default: + panic("unknown node type") + } +} diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 7830f0e8f6..72a0b7c960 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -29,6 +29,7 @@ import ( modhead "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + modprune "github.com/celestiaorg/celestia-node/nodebuilder/pruner" "github.com/celestiaorg/celestia-node/nodebuilder/share" "github.com/celestiaorg/celestia-node/state" ) @@ -80,6 +81,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti // control over which module to enable metrics for modhead.MetricsEnabled = true modcore.MetricsEnabled = true + modprune.MetricsEnabled = true baseComponents := fx.Options( fx.Supply(metricOpts), diff --git a/pruner/archival/pruner.go b/pruner/archival/pruner.go index 7b1cb935f3..a1a55db0da 100644 --- a/pruner/archival/pruner.go +++ b/pruner/archival/pruner.go @@ -15,6 +15,6 @@ func NewPruner() *Pruner { return &Pruner{} } -func (p *Pruner) Prune(context.Context, ...*header.ExtendedHeader) error { +func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error { return nil } diff --git a/pruner/checkpoint.go b/pruner/checkpoint.go new file mode 100644 index 0000000000..10db918cb5 --- /dev/null +++ b/pruner/checkpoint.go @@ -0,0 +1,73 @@ +package pruner + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/ipfs/go-datastore" + + "github.com/celestiaorg/celestia-node/header" +) + +var ( + storePrefix = datastore.NewKey("pruner") + checkpointKey = datastore.NewKey("checkpoint") +) + +// checkpoint contains information related to the state of the +// pruner service that is periodically persisted to disk. +type checkpoint struct { + LastPrunedHeight uint64 `json:"last_pruned_height"` + FailedHeaders map[uint64]struct{} `json:"failed"` +} + +// initializeCheckpoint initializes the checkpoint, storing the earliest header in the chain. +func (s *Service) initializeCheckpoint(ctx context.Context) error { + return s.updateCheckpoint(ctx, uint64(1), nil) +} + +// loadCheckpoint loads the last checkpoint from disk, initializing it if it does not already exist. +func (s *Service) loadCheckpoint(ctx context.Context) error { + bin, err := s.ds.Get(ctx, checkpointKey) + if err != nil { + if err == datastore.ErrNotFound { + return s.initializeCheckpoint(ctx) + } + return fmt.Errorf("failed to load checkpoint: %w", err) + } + + var cp *checkpoint + err = json.Unmarshal(bin, &cp) + if err != nil { + return fmt.Errorf("failed to unmarshal checkpoint: %w", err) + } + + s.checkpoint = cp + return nil +} + +// updateCheckpoint updates the checkpoint with the last pruned header height +// and persists it to disk. 
+func (s *Service) updateCheckpoint( + ctx context.Context, + lastPrunedHeight uint64, + failedHeights map[uint64]struct{}, +) error { + for height := range failedHeights { + s.checkpoint.FailedHeaders[height] = struct{}{} + } + + s.checkpoint.LastPrunedHeight = lastPrunedHeight + + bin, err := json.Marshal(s.checkpoint) + if err != nil { + return err + } + + return s.ds.Put(ctx, checkpointKey, bin) +} + +func (s *Service) lastPruned(ctx context.Context) (*header.ExtendedHeader, error) { + return s.getter.GetByHeight(ctx, s.checkpoint.LastPrunedHeight) +} diff --git a/pruner/find.go b/pruner/find.go new file mode 100644 index 0000000000..5091c168a0 --- /dev/null +++ b/pruner/find.go @@ -0,0 +1,114 @@ +package pruner + +import ( + "context" + "time" + + "github.com/celestiaorg/celestia-node/header" +) + +// maxHeadersPerLoop is the maximum number of headers to fetch +// for a prune loop (prevents fetching too many headers at a +// time for nodes that have a large number of pruneable headers). +var maxHeadersPerLoop = uint64(512) + +// findPruneableHeaders returns all headers that are eligible for pruning +// (outside the sampling window). +func (s *Service) findPruneableHeaders( + ctx context.Context, + lastPruned *header.ExtendedHeader, +) ([]*header.ExtendedHeader, error) { + pruneCutoff := time.Now().UTC().Add(time.Duration(-s.window)) + + if !lastPruned.Time().UTC().Before(pruneCutoff) { + // this can happen when the network is young and all blocks + // are still within the AvailabilityWindow + return nil, nil + } + + estimatedCutoffHeight, err := s.calculateEstimatedCutoff(ctx, lastPruned, pruneCutoff) + if err != nil { + return nil, err + } + + if lastPruned.Height() == estimatedCutoffHeight { + // nothing left to prune + return nil, nil + } + + log.Debugw("finder: fetching header range", "last pruned", lastPruned.Height(), + "target height", estimatedCutoffHeight) + + headers, err := s.getter.GetRangeByHeight(ctx, lastPruned, estimatedCutoffHeight) + if err != nil { + log.Errorw("failed to get range from header store", "from", lastPruned.Height(), + "to", estimatedCutoffHeight, "error", err) + return nil, err + } + // ensures genesis block gets pruned + if lastPruned.Height() == 1 { + headers = append([]*header.ExtendedHeader{lastPruned}, headers...) + } + + // if our estimated range didn't cover enough headers, we need to fetch more + // TODO: This is really inefficient in the case that lastPruned is the default value, or if the + // node has been offline for a long time. 
Instead of increasing the boundary by one in the for + // loop we could increase by a range every iteration + headerCount := len(headers) + for { + if headerCount > int(maxHeadersPerLoop) { + headers = headers[:maxHeadersPerLoop] + break + } + lastHeader := headers[len(headers)-1] + if lastHeader.Time().After(pruneCutoff) { + break + } + + nextHeader, err := s.getter.GetByHeight(ctx, lastHeader.Height()+1) + if err != nil { + log.Errorw("failed to get header by height", "height", lastHeader.Height()+1, "error", err) + return nil, err + } + headers = append(headers, nextHeader) + headerCount++ + } + + for i, h := range headers { + if h.Time().After(pruneCutoff) { + if i == 0 { + // we can't prune anything + return nil, nil + } + + // we can ignore the rest of the headers since they are all newer than the cutoff + return headers[:i], nil + } + } + return headers, nil +} + +func (s *Service) calculateEstimatedCutoff( + ctx context.Context, + lastPruned *header.ExtendedHeader, + pruneCutoff time.Time, +) (uint64, error) { + estimatedRange := uint64(pruneCutoff.UTC().Sub(lastPruned.Time().UTC()) / s.blockTime) + estimatedCutoffHeight := lastPruned.Height() + estimatedRange + + head, err := s.getter.Head(ctx) + if err != nil { + log.Errorw("failed to get Head from header store", "error", err) + return 0, err + } + + if head.Height() < estimatedCutoffHeight { + estimatedCutoffHeight = head.Height() + } + + if estimatedCutoffHeight-lastPruned.Height() > maxHeadersPerLoop { + estimatedCutoffHeight = lastPruned.Height() + maxHeadersPerLoop + } + + return estimatedCutoffHeight, nil +} diff --git a/pruner/full/pruner.go b/pruner/full/pruner.go new file mode 100644 index 0000000000..49967b5050 --- /dev/null +++ b/pruner/full/pruner.go @@ -0,0 +1,40 @@ +package full + +import ( + "context" + "errors" + + "github.com/filecoin-project/dagstore" + logging "github.com/ipfs/go-log/v2" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds" +) + +var log = logging.Logger("pruner/full") + +type Pruner struct { + store *eds.Store +} + +func NewPruner(store *eds.Store) *Pruner { + return &Pruner{ + store: store, + } +} + +func (p *Pruner) Prune(ctx context.Context, eh *header.ExtendedHeader) error { + // short circuit on empty roots + if eh.DAH.Equals(share.EmptyRoot()) { + return nil + } + + log.Debugf("pruning header %s", eh.DAH.Hash()) + + err := p.store.Remove(ctx, eh.DAH.Hash()) + if err != nil && !errors.Is(err, dagstore.ErrShardUnknown) { + return err + } + return nil +} diff --git a/pruner/full/window.go b/pruner/full/window.go new file mode 100644 index 0000000000..4ad69234e2 --- /dev/null +++ b/pruner/full/window.go @@ -0,0 +1,12 @@ +package full + +import ( + "time" + + "github.com/celestiaorg/celestia-node/pruner" + "github.com/celestiaorg/celestia-node/pruner/light" +) + +// Window is the availability window for full nodes in the Celestia +// network (30 days + 1 hour).
+const Window = pruner.AvailabilityWindow(time.Duration(light.Window) + time.Hour) diff --git a/pruner/light/pruner.go b/pruner/light/pruner.go index 513bfa2b66..61401bae74 100644 --- a/pruner/light/pruner.go +++ b/pruner/light/pruner.go @@ -12,6 +12,6 @@ func NewPruner() *Pruner { return &Pruner{} } -func (p *Pruner) Prune(context.Context, ...*header.ExtendedHeader) error { +func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error { return nil } diff --git a/pruner/light/window.go b/pruner/light/window.go index dc1a9e4444..2241ecb063 100644 --- a/pruner/light/window.go +++ b/pruner/light/window.go @@ -1,11 +1,11 @@ package light import ( "time" "github.com/celestiaorg/celestia-node/pruner" ) // Window is the availability window for light nodes in the Celestia // network (30 days). -const Window = pruner.AvailabilityWindow(time.Second * 86400 * 30) +const Window = pruner.AvailabilityWindow(30 * 24 * time.Hour) diff --git a/pruner/metrics.go b/pruner/metrics.go new file mode 100644 index 0000000000..c43217dc3d --- /dev/null +++ b/pruner/metrics.go @@ -0,0 +1,80 @@ +package pruner + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var ( + meter = otel.Meter("storage_pruner") +) + +type metrics struct { + prunedCounter metric.Int64Counter + + lastPruned metric.Int64ObservableGauge + failedPrunes metric.Int64ObservableGauge + + clientReg metric.Registration +} + +func (s *Service) WithMetrics() error { + prunedCounter, err := meter.Int64Counter("prnr_pruned_counter", + metric.WithDescription("pruner pruned header counter")) + if err != nil { + return err + } + + failedPrunes, err := meter.Int64ObservableGauge("prnr_failed_counter", + metric.WithDescription("pruner failed prunes counter")) + if err != nil { + return err + } + + lastPruned, err := meter.Int64ObservableGauge("prnr_last_pruned", + metric.WithDescription("pruner highest pruned height")) + if err != nil { + return err + } + + callback := func(_ context.Context, observer metric.Observer) error { + observer.ObserveInt64(lastPruned, int64(s.checkpoint.LastPrunedHeight)) + observer.ObserveInt64(failedPrunes, int64(len(s.checkpoint.FailedHeaders))) + return nil + } + + clientReg, err := meter.RegisterCallback(callback, lastPruned, failedPrunes) + if err != nil { + return err + } + + s.metrics = &metrics{ + prunedCounter: prunedCounter, + lastPruned: lastPruned, + failedPrunes: failedPrunes, + clientReg: clientReg, + } + return nil +} + +func (m *metrics) close() error { + if m == nil { + return nil + } + + return m.clientReg.Unregister() +} + +func (m *metrics) observePrune(ctx context.Context, failed bool) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + m.prunedCounter.Add(ctx, 1, metric.WithAttributes( + attribute.Bool("failed", failed))) +} diff --git a/pruner/params.go b/pruner/params.go new file mode 100644 index 0000000000..253ea5e1a9 --- /dev/null +++ b/pruner/params.go @@ -0,0 +1,41 @@ +package pruner + +import ( + "fmt" + "time" +) + +type Option func(*Params) + +type Params struct { + // pruneCycle is the frequency at which the pruning Service + // runs the ticker. If set to 0, the Service will not run.
+ pruneCycle time.Duration +} + +func (p *Params) Validate() error { + if p.pruneCycle == time.Duration(0) { + return fmt.Errorf("invalid GC cycle given, value should be positive and non-zero") + } + return nil +} + +func DefaultParams() Params { + return Params{ + pruneCycle: time.Minute * 5, + } +} + +// WithPruneCycle configures how often the pruning Service +// triggers a pruning cycle. +func WithPruneCycle(cycle time.Duration) Option { + return func(p *Params) { + p.pruneCycle = cycle + } +} + +// WithPrunerMetrics is a utility function to turn on pruner metrics and is +// expected to be "invoked" by the fx lifecycle. +func WithPrunerMetrics(s *Service) error { + return s.WithMetrics() +} diff --git a/pruner/pruner.go b/pruner/pruner.go index fae60e483c..a591a65392 100644 --- a/pruner/pruner.go +++ b/pruner/pruner.go @@ -9,5 +9,5 @@ import ( // Pruner contains methods necessary to prune data // from the node's datastore. type Pruner interface { - Prune(context.Context, ...*header.ExtendedHeader) error + Prune(context.Context, *header.ExtendedHeader) error } diff --git a/pruner/service.go b/pruner/service.go index f67265977a..65935e75d8 100644 --- a/pruner/service.go +++ b/pruner/service.go @@ -2,24 +2,191 @@ package pruner import ( "context" + "fmt" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + logging "github.com/ipfs/go-log/v2" + + hdr "github.com/celestiaorg/go-header" + + "github.com/celestiaorg/celestia-node/header" ) -// Service handles the pruning routine for the node using the -// prune Pruner. +var log = logging.Logger("pruner/service") + +// Service handles running the pruning cycle for the node. type Service struct { pruner Pruner + window AvailabilityWindow + + getter hdr.Getter[*header.ExtendedHeader] + + ds datastore.Datastore + checkpoint *checkpoint + + blockTime time.Duration + + ctx context.Context + cancel context.CancelFunc + doneCh chan struct{} + + params Params + metrics *metrics } -func NewService(p Pruner) *Service { - return &Service{ - pruner: p, +func NewService( + p Pruner, + window AvailabilityWindow, + getter hdr.Getter[*header.ExtendedHeader], + ds datastore.Datastore, + blockTime time.Duration, + opts ...Option, +) (*Service, error) { + params := DefaultParams() + for _, opt := range opts { + opt(&params) + } + + if err := params.Validate(); err != nil { + return nil, err } + + return &Service{ + pruner: p, + window: window, + getter: getter, + checkpoint: &checkpoint{FailedHeaders: map[uint64]struct{}{}}, + ds: namespace.Wrap(ds, storePrefix), + blockTime: blockTime, + doneCh: make(chan struct{}), + params: params, + }, nil } +// Start loads the pruner's last pruned height (1 if pruner is freshly +// initialized) and runs the prune loop, pruning any blocks older than +// the given availability window. func (s *Service) Start(context.Context) error { + s.ctx, s.cancel = context.WithCancel(context.Background()) + + err := s.loadCheckpoint(s.ctx) + if err != nil { + return err + } + log.Debugw("loaded checkpoint", "lastPruned", s.checkpoint.LastPrunedHeight) + + go s.run() return nil } -func (s *Service) Stop(context.Context) error { - return nil +func (s *Service) Stop(ctx context.Context) error { + s.cancel() + + s.metrics.close() + + select { + case <-s.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("pruner unable to exit within context deadline") + } +} + +// run prunes blocks older than the availability window periodically until the +// pruner service is stopped.
+func (s *Service) run() { + defer close(s.doneCh) + + ticker := time.NewTicker(s.params.pruneCycle) + defer ticker.Stop() + + lastPrunedHeader, err := s.lastPruned(s.ctx) + if err != nil { + log.Errorw("failed to get last pruned header", "height", s.checkpoint.LastPrunedHeight, + "err", err) + log.Warn("exiting pruner service!") + + s.cancel() + } + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + lastPrunedHeader = s.prune(s.ctx, lastPrunedHeader) + } + } +} + +func (s *Service) prune( + ctx context.Context, + lastPrunedHeader *header.ExtendedHeader, +) *header.ExtendedHeader { + // prioritize retrying previously-failed headers + s.retryFailed(s.ctx) + + for { + select { + case <-s.ctx.Done(): + return lastPrunedHeader + default: + } + + headers, err := s.findPruneableHeaders(ctx, lastPrunedHeader) + if err != nil || len(headers) == 0 { + return lastPrunedHeader + } + + failed := make(map[uint64]struct{}) + + log.Debugw("pruning headers", "from", headers[0].Height(), "to", + headers[len(headers)-1].Height()) + + for _, eh := range headers { + pruneCtx, cancel := context.WithTimeout(ctx, time.Second*5) + + err = s.pruner.Prune(pruneCtx, eh) + if err != nil { + log.Errorw("failed to prune block", "height", eh.Height(), "err", err) + failed[eh.Height()] = struct{}{} + } else { + lastPrunedHeader = eh + } + + s.metrics.observePrune(pruneCtx, err != nil) + cancel() + } + + err = s.updateCheckpoint(s.ctx, lastPrunedHeader.Height(), failed) + if err != nil { + log.Errorw("failed to update checkpoint", "err", err) + return lastPrunedHeader + } + + if uint64(len(headers)) < maxHeadersPerLoop { + // we've pruned all the blocks we can + return lastPrunedHeader + } + } +} + +func (s *Service) retryFailed(ctx context.Context) { + log.Debugw("retrying failed headers", "amount", len(s.checkpoint.FailedHeaders)) + + for failed := range s.checkpoint.FailedHeaders { + h, err := s.getter.GetByHeight(ctx, failed) + if err != nil { + log.Errorw("failed to load header from failed map", "height", failed, "err", err) + continue + } + err = s.pruner.Prune(ctx, h) + if err != nil { + log.Errorw("failed to prune block from failed map", "height", failed, "err", err) + continue + } + delete(s.checkpoint.FailedHeaders, failed) + } } diff --git a/pruner/service_test.go b/pruner/service_test.go new file mode 100644 index 0000000000..01932abaf2 --- /dev/null +++ b/pruner/service_test.go @@ -0,0 +1,349 @@ +package pruner + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/header/headertest" +) + +/* + | toPrune | availability window | +*/ + +// TestService tests the pruner service to check whether the expected +// amount of blocks are pruned within a given AvailabilityWindow. +// This test runs a pruning cycle once which should prune at least +// 2 blocks (as the AvailabilityWindow is ~2 blocks). Since the +// prune-able header determination is time-based, it cannot be +// exact. 
+func TestService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + blockTime := time.Millisecond + + // all headers generated in suite are timestamped to time.Now(), so + // they will all be considered "pruneable" within the availability window ( + suite := headertest.NewTestSuite(t, 1, blockTime) + store := headertest.NewCustomStore(t, suite, 20) + + mp := &mockPruner{} + + serv, err := NewService( + mp, + AvailabilityWindow(time.Millisecond*2), + store, + sync.MutexWrap(datastore.NewMapDatastore()), + blockTime, + ) + require.NoError(t, err) + + serv.ctx, serv.cancel = ctx, cancel + + err = serv.loadCheckpoint(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 2) + + lastPruned, err := serv.lastPruned(ctx) + require.NoError(t, err) + lastPruned = serv.prune(ctx, lastPruned) + + assert.Greater(t, lastPruned.Height(), uint64(2)) + assert.Greater(t, serv.checkpoint.LastPrunedHeight, uint64(2)) +} + +// TestService_FailedAreRecorded checks whether the pruner service +// can accurately detect blocks to be pruned and store them +// to checkpoint. +func TestService_FailedAreRecorded(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + blockTime := time.Millisecond + + // all headers generated in suite are timestamped to time.Now(), so + // they will all be considered "pruneable" within the availability window + suite := headertest.NewTestSuite(t, 1, blockTime) + store := headertest.NewCustomStore(t, suite, 100) + + mp := &mockPruner{ + failHeight: map[uint64]int{4: 0, 5: 0, 13: 0}, + } + + serv, err := NewService( + mp, + AvailabilityWindow(time.Millisecond*20), + store, + sync.MutexWrap(datastore.NewMapDatastore()), + blockTime, + ) + require.NoError(t, err) + + serv.ctx = ctx + + err = serv.loadCheckpoint(ctx) + require.NoError(t, err) + + // ensures at least 13 blocks are prune-able + time.Sleep(time.Millisecond * 50) + + // trigger a prune job + lastPruned, err := serv.lastPruned(ctx) + require.NoError(t, err) + _ = serv.prune(ctx, lastPruned) + + assert.Len(t, serv.checkpoint.FailedHeaders, 3) + for expectedFail := range mp.failHeight { + _, exists := serv.checkpoint.FailedHeaders[expectedFail] + assert.True(t, exists) + } + + // trigger another prune job, which will prioritize retrying + // failed blocks + lastPruned, err = serv.lastPruned(ctx) + require.NoError(t, err) + _ = serv.prune(ctx, lastPruned) + + assert.Len(t, serv.checkpoint.FailedHeaders, 0) +} + +func TestServiceCheckpointing(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := headertest.NewStore(t) + + mp := &mockPruner{} + + serv, err := NewService( + mp, + AvailabilityWindow(time.Second), + store, + sync.MutexWrap(datastore.NewMapDatastore()), + time.Millisecond, + ) + require.NoError(t, err) + + err = serv.loadCheckpoint(ctx) + require.NoError(t, err) + + // ensure checkpoint was initialized correctly + assert.Equal(t, uint64(1), serv.checkpoint.LastPrunedHeight) + assert.Empty(t, serv.checkpoint.FailedHeaders) + + // update checkpoint + err = serv.updateCheckpoint(ctx, uint64(3), map[uint64]struct{}{2: {}}) + require.NoError(t, err) + + // ensure checkpoint was updated correctly in datastore + err = serv.loadCheckpoint(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(3), serv.checkpoint.LastPrunedHeight) + assert.Len(t, serv.checkpoint.FailedHeaders, 1) +} + +// TestPrune_LargeNumberOfBlocks tests that the pruner service with a large +// number of 
blocks to prune (an archival node turning into a pruned node) is + able to prune the blocks in one prune cycle. +func TestPrune_LargeNumberOfBlocks(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + maxHeadersPerLoop = 10 + t.Cleanup(func() { + maxHeadersPerLoop = 512 + }) + + blockTime := time.Nanosecond + availabilityWindow := AvailabilityWindow(blockTime * 10) + + // all headers generated in suite are timestamped to time.Now(), so + // they will all be considered "pruneable" within the availability window + suite := headertest.NewTestSuite(t, 1, blockTime) + store := headertest.NewCustomStore(t, suite, int(maxHeadersPerLoop*6)) // add small buffer + + mp := &mockPruner{failHeight: make(map[uint64]int, 0)} + + serv, err := NewService( + mp, + availabilityWindow, + store, + sync.MutexWrap(datastore.NewMapDatastore()), + blockTime, + ) + require.NoError(t, err) + serv.ctx = ctx + + err = serv.loadCheckpoint(ctx) + require.NoError(t, err) + + // ensures availability window has passed + time.Sleep(time.Duration(availabilityWindow) + time.Millisecond*100) + + // trigger a prune job + lastPruned, err := serv.lastPruned(ctx) + require.NoError(t, err) + _ = serv.prune(ctx, lastPruned) + + // ensure all headers have been pruned + assert.Equal(t, maxHeadersPerLoop*5, serv.checkpoint.LastPrunedHeight) + assert.Len(t, serv.checkpoint.FailedHeaders, 0) +} + +func TestFindPruneableHeaders(t *testing.T) { + testCases := []struct { + name string + availWindow AvailabilityWindow + blockTime time.Duration + startTime time.Time + headerAmount int + expectedLength int + }{ + { + name: "Estimated range matches expected", + // Availability window is one week + availWindow: AvailabilityWindow(time.Hour * 24 * 7), + blockTime: time.Hour, + // Make two weeks of headers + headerAmount: 2 * (24 * 7), + startTime: time.Now().Add(-2 * time.Hour * 24 * 7), + // One week of headers are pruneable + expectedLength: (24 * 7) + 1, + }, + { + name: "Estimated range not sufficient but finds the correct tail", + // Availability window is one week + availWindow: AvailabilityWindow(time.Hour * 24 * 7), + blockTime: time.Hour, + // Make three weeks of headers + headerAmount: 3 * (24 * 7), + startTime: time.Now().Add(-3 * time.Hour * 24 * 7), + // Two weeks of headers are pruneable + expectedLength: (2 * 24 * 7) + 1, + }, + { + name: "No pruneable headers", + // Availability window is two weeks + availWindow: AvailabilityWindow(2 * time.Hour * 24 * 7), + blockTime: time.Hour, + // Make one week of headers + headerAmount: 24 * 7, + startTime: time.Now().Add(-time.Hour * 24 * 7), + // No headers are pruneable + expectedLength: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime) + store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount) + + mp := &mockPruner{} + + serv, err := NewService( + mp, + tc.availWindow, + store, + sync.MutexWrap(datastore.NewMapDatastore()), + tc.blockTime, + ) + require.NoError(t, err) + + err = serv.Start(ctx) + require.NoError(t, err) + + lastPruned, err := serv.lastPruned(ctx) + require.NoError(t, err) + + pruneable, err := serv.findPruneableHeaders(ctx, lastPruned) + require.NoError(t, err) + require.Len(t, pruneable, tc.expectedLength) + + pruneableCutoff := time.Now().Add(-time.Duration(tc.availWindow)) + // All returned headers are older than the
availability window + for _, h := range pruneable { + require.WithinRange(t, h.Time(), tc.startTime, pruneableCutoff) + } + + // The next header after the last pruneable header is too new to prune + if len(pruneable) != 0 { + lastPruneable := pruneable[len(pruneable)-1] + if lastPruneable.Height() != store.Height() { + firstUnpruneable, err := store.GetByHeight(ctx, lastPruneable.Height()+1) + require.NoError(t, err) + require.WithinRange(t, firstUnpruneable.Time(), pruneableCutoff, time.Now()) + } + } + }) + } +} + +type mockPruner struct { + deletedHeaderHashes []pruned + + // tells the mockPruner on which heights to fail + failHeight map[uint64]int +} + +type pruned struct { + hash string + height uint64 +} + +func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error { + for fail := range mp.failHeight { + if h.Height() == fail { + // if retried, return successful + if mp.failHeight[fail] > 0 { + return nil + } + mp.failHeight[fail]++ + return fmt.Errorf("failed to prune") + } + } + mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, pruned{hash: h.Hash().String(), height: h.Height()}) + return nil +} + +// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility. +// https://github.com/celestiaorg/celestia-node/issues/3278. +type SpacedHeaderGenerator struct { + t *testing.T + TimeBetweenHeaders time.Duration + currentTime time.Time + currentHeight int64 +} + +func NewSpacedHeaderGenerator( + t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration, +) *SpacedHeaderGenerator { + return &SpacedHeaderGenerator{ + t: t, + TimeBetweenHeaders: timeBetweenHeaders, + currentTime: startTime, + currentHeight: 1, + } +} + +func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader { + h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime) + h.RawHeader.Height = shg.currentHeight + h.RawHeader.Time = shg.currentTime + shg.currentHeight++ + shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders) + return h +}
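
Reviewer note: the cutoff estimation in findPruneableHeaders/calculateEstimatedCutoff is easiest to sanity-check in isolation. Below is a minimal, standalone sketch of that arithmetic with simplified inputs (plain heights and timestamps instead of ExtendedHeader; the 12s block time, head height, and ages are illustrative assumptions, not values from this patch).

package main

import (
	"fmt"
	"time"
)

// availabilityWindow mirrors pruner.AvailabilityWindow: a duration-typed
// window outside of which blocks become pruneable.
type availabilityWindow time.Duration

// estimateCutoffHeight mirrors the arithmetic in calculateEstimatedCutoff:
// estimate how many blocks fit between the last pruned header's timestamp
// and the pruning cutoff, then clamp to the chain head and to the
// per-loop batch limit.
func estimateCutoffHeight(
	lastPrunedHeight uint64,
	lastPrunedTime time.Time,
	pruneCutoff time.Time,
	blockTime time.Duration,
	headHeight uint64,
	maxHeadersPerLoop uint64,
) uint64 {
	estimatedRange := uint64(pruneCutoff.Sub(lastPrunedTime) / blockTime)
	cutoff := lastPrunedHeight + estimatedRange

	if headHeight < cutoff {
		cutoff = headHeight // never estimate past the current head
	}
	if cutoff-lastPrunedHeight > maxHeadersPerLoop {
		cutoff = lastPrunedHeight + maxHeadersPerLoop // bounded batch size
	}
	return cutoff
}

func main() {
	window := availabilityWindow(30 * 24 * time.Hour) // same order of magnitude as the light/full windows
	blockTime := 12 * time.Second                     // illustrative block time; the patch wires in p2p.BlockTime

	now := time.Now().UTC()
	pruneCutoff := now.Add(-time.Duration(window))

	// The last pruned block is 35 days old, so roughly 5 days' worth of
	// blocks sit outside the availability window and are eligible.
	lastPrunedTime := now.Add(-35 * 24 * time.Hour)

	cutoff := estimateCutoffHeight(1, lastPrunedTime, pruneCutoff, blockTime, 1_000_000, 512)
	fmt.Println("estimated cutoff height:", cutoff) // 513: clamped to lastPruned+maxHeadersPerLoop
}

The clamping order matters: the estimate is first capped at the current head so the range request never exceeds the store, then capped at maxHeadersPerLoop so a node with a deep backlog (e.g. an archival node switching to pruning) prunes in bounded batches, as exercised by TestPrune_LargeNumberOfBlocks.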
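Reviewer note: the checkpoint persistence in checkpoint.go can also be exercised on its own. A minimal sketch, assuming the same go-datastore calls the patch already uses (namespace.Wrap, Put/Get with a context); the concrete heights here are made up for illustration.

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	dssync "github.com/ipfs/go-datastore/sync"
)

// checkpoint mirrors the struct persisted by the pruner service.
type checkpoint struct {
	LastPrunedHeight uint64              `json:"last_pruned_height"`
	FailedHeaders    map[uint64]struct{} `json:"failed"`
}

func main() {
	ctx := context.Background()

	// mirror the patch: a map datastore wrapped under the "pruner" prefix
	ds := namespace.Wrap(dssync.MutexWrap(datastore.NewMapDatastore()), datastore.NewKey("pruner"))
	key := datastore.NewKey("checkpoint")

	cp := checkpoint{LastPrunedHeight: 42, FailedHeaders: map[uint64]struct{}{7: {}}}

	// persist, as updateCheckpoint does after each batch
	bin, err := json.Marshal(cp)
	if err != nil {
		panic(err)
	}
	if err := ds.Put(ctx, key, bin); err != nil {
		panic(err)
	}

	// reload, as loadCheckpoint does on Start
	bin, err = ds.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	var loaded checkpoint
	if err := json.Unmarshal(bin, &loaded); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", loaded) // {LastPrunedHeight:42 FailedHeaders:map[7:{}]}
}

Because FailedHeaders is serialized as part of the JSON checkpoint, failed heights survive restarts and are retried by retryFailed at the start of the next prune pass.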