diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 946f0c52c8..0489c57c91 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -28,14 +28,17 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" + "github.com/prometheus/tsdb/fileutil" ) const ( @@ -335,6 +338,216 @@ func (p *Pruner) Prune(root common.Hash) error { return prune(p.snaptree, root, p.db, p.stateBloom, filterName, middleRoots, start) } +type BlockPruner struct { + db ethdb.Database + oldAncientPath string + newAncientPath string + node *node.Node + BlockAmountReserved uint64 +} + +func NewBlockPruner(db ethdb.Database, n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner { + return &BlockPruner{ + db: db, + oldAncientPath: oldAncientPath, + newAncientPath: newAncientPath, + node: n, + BlockAmountReserved: BlockAmountReserved, + } +} + +func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + // Open old db wrapper. + chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt) + if err != nil { + log.Error("Failed to open ancient database", "err=", err) + return err + } + defer chainDb.Close() + log.Info("chainDB opened successfully") + + // Get the number of items in old ancient db. + itemsOfAncient, err := chainDb.ItemAmountInAncient() + log.Info("the number of items in ancientDB is ", "itemsOfAncient", itemsOfAncient) + + // If we can't access the freezer or it's empty, abort. + if err != nil || itemsOfAncient == 0 { + log.Error("can't access the freezer or it's empty, abort") + return errors.New("can't access the freezer or it's empty, abort") + } + + // If the items in freezer is less than the block amount that we want to reserve, it is not enough, should stop. + if itemsOfAncient < p.BlockAmountReserved { + log.Error("the number of old blocks is not enough to reserve,", "ancient items", itemsOfAncient, "the amount specified", p.BlockAmountReserved) + return errors.New("the number of old blocks is not enough to reserve") + } + + var oldOffSet uint64 + if interrupt { + // The interrupt scecario within this function is specific for old and new ancientDB exsisted concurrently, + // should use last version of offset for oldAncientDB, because current offset is + // actually of the new ancientDB_Backup, but what we want is the offset of ancientDB being backup. + oldOffSet = rawdb.ReadOffSetOfLastAncientFreezer(chainDb) + } else { + // Using current version of ancientDB for oldOffSet because the db for backup is current version. + oldOffSet = rawdb.ReadOffSetOfCurrentAncientFreezer(chainDb) + } + log.Info("the oldOffSet is ", "oldOffSet", oldOffSet) + + // Get the start BlockNumber for pruning. + startBlockNumber := oldOffSet + itemsOfAncient - p.BlockAmountReserved + log.Info("new offset/new startBlockNumber is ", "new offset", startBlockNumber) + + // Create new ancientdb backup and record the new and last version of offset in kvDB as well. + // For every round, newoffset actually equals to the startBlockNumber in ancient backup db. + frdbBack, err := rawdb.NewFreezerDb(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber) + if err != nil { + log.Error("Failed to create ancient freezer backup", "err=", err) + return err + } + defer frdbBack.Close() + + offsetBatch := chainDb.NewBatch() + rawdb.WriteOffSetOfCurrentAncientFreezer(offsetBatch, startBlockNumber) + rawdb.WriteOffSetOfLastAncientFreezer(offsetBatch, oldOffSet) + if err := offsetBatch.Write(); err != nil { + log.Crit("Failed to write offset into disk", "err", err) + } + + // It's guaranteed that the old/new offsets are updated as well as the new ancientDB are created if this flock exist. + lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + log.Error("file lock error", "err", err) + return err + } + + log.Info("prune info", "old offset", oldOffSet, "number of items in ancientDB", itemsOfAncient, "amount to reserve", p.BlockAmountReserved) + log.Info("new offset/new startBlockNumber recorded successfully ", "new offset", startBlockNumber) + + start := time.Now() + // All ancient data after and including startBlockNumber should write into new ancientDB ancient_back. + for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffSet; blockNumber++ { + blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber) + block := rawdb.ReadBlock(chainDb, blockHash, blockNumber) + receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber) + borReceipts := []*types.Receipt{rawdb.ReadBorReceipt(chainDb, blockHash, blockNumber)} + + // Calculate the total difficulty of the block + td := rawdb.ReadTd(chainDb, blockHash, blockNumber) + if td == nil { + return consensus.ErrUnknownAncestor + } + // Write into new ancient_back db. + if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, []types.Receipts{borReceipts}, td); err != nil { + log.Error("failed to write new ancient", "error", err) + return err + } + // Print the log every 5s for better trace. + if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) { + log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber) + start = time.Now() + } + } + lock.Release() + log.Info("block back up done", "current start blockNumber in ancientDB", startBlockNumber) + return nil +} + +// Backup the ancient data for the old ancient db, i.e. the most recent 128 blocks in ancient db. +func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + start := time.Now() + + if err := p.backUpOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil { + return err + } + + log.Info("Block pruning BackUp successfully", "time duration since start is", common.PrettyDuration(time.Since(start))) + return nil +} + +func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error { + log.Info("RecoverInterruption for block prune") + newExist, err := CheckFileExist(p.newAncientPath) + if err != nil { + log.Error("newAncientDb path error") + return err + } + + if newExist { + log.Info("New ancientDB_backup existed in interruption scenario") + flockOfAncientBack, err := CheckFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + log.Error("Failed to check flock of ancientDB_Back %v", err) + return err + } + + // Indicating both old and new ancientDB existed concurrently. + // Delete directly for the new ancientdb to prune from start, e.g.: path ../chaindb/ancient_backup + if err := os.RemoveAll(p.newAncientPath); err != nil { + log.Error("Failed to remove old ancient directory %v", err) + return err + } + if flockOfAncientBack { + // Indicating the oldOffset/newOffset have already been updated. + if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, true); err != nil { + log.Error("Failed to prune") + return err + } + } else { + // Indicating the flock did not exist and the new offset did not be updated, so just handle this case as usual. + if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil { + log.Error("Failed to prune") + return err + } + } + + if err := p.AncientDbReplacer(); err != nil { + log.Error("Failed to replace ancientDB") + return err + } + } else { + log.Info("New ancientDB_backup did not exist in interruption scenario") + // Indicating new ancientDB even did not be created, just prune starting at backup from startBlockNumber as usual, + // in this case, the new offset have not been written into kvDB. + if err := p.BlockPruneBackUp(name, cache, handles, namespace, readonly, false); err != nil { + log.Error("Failed to prune") + return err + } + if err := p.AncientDbReplacer(); err != nil { + log.Error("Failed to replace ancientDB") + return err + } + } + + return nil +} + +func CheckFileExist(path string) (bool, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + // Indicating the file didn't exist. + return false, nil + } + return true, err + } + return true, nil +} + +func (p *BlockPruner) AncientDbReplacer() error { + // Delete directly for the old ancientdb, e.g.: path ../chaindb/ancient + if err := os.RemoveAll(p.oldAncientPath); err != nil { + log.Error("Failed to remove old ancient directory %v", err) + return err + } + + // Rename the new ancientdb path same to the old + if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil { + log.Error("Failed to rename new ancient directory") + return err + } + return nil +} + // RecoverPruning will resume the pruning procedure during the system restart. // This function is used in this case: user tries to prune state data, but the // system was interrupted midway because of crash or manual-kill. In this case diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 135defc0b9..462cd8b818 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -560,7 +560,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } else { d.ancientLimit = 0 } - frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. + frozen, _ := d.stateDB.ItemAmountInAncient() // Ignore the error here since light client can also hit here. // If a part of blockchain data has already been written into active store, // disable the ancient style insertion explicitly. diff --git a/ethdb/database.go b/ethdb/database.go index 88c8de6a16..017d4288aa 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -93,6 +93,12 @@ type AncientReader interface { // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) + + // ItemAmountInAncient returns the actual length of current ancientDB. + ItemAmountInAncient() (uint64, error) + + // AncientOffSet returns the offset of current ancientDB. + AncientOffSet() uint64 } // AncientBatchReader is the interface for 'batched' or 'atomic' reading. @@ -164,6 +170,7 @@ type AncientStore interface { // Database contains all the methods required by the high level database to not // only access the key-value data store but also the chain freezer. +// //go:generate mockgen -destination=../eth/filters/IDatabase.go -package=filters . Database type Database interface { Reader diff --git a/node/node.go b/node/node.go index 5cf233d17a..f36afe54b8 100644 --- a/node/node.go +++ b/node/node.go @@ -726,7 +726,7 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r // also attaching a chain freezer to it that moves ancient chain data from the // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. -func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly bool) (ethdb.Database, error) { +func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() if n.state == closedState { @@ -745,7 +745,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, case !filepath.IsAbs(freezer): freezer = n.ResolvePath(freezer) } - db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly) + db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace, readonly, disableFreeze, isLastOffset) } if err == nil {