From d95c05b98a37b2bff153698b1522a23c3a0b2612 Mon Sep 17 00:00:00 2001 From: Manav Darji Date: Thu, 9 May 2024 11:19:34 +0530 Subject: [PATCH] Enable ancient block pruning (#1216) * core/state: typo Signed-off-by: Delweng * core/rawdb: backport from https://github.com/bnb-chain/bsc/pull/543 Signed-off-by: Delweng * eth,ethdb,node,core/state: backport from https://github.com/bnb-chain/bsc/pull/543 Signed-off-by: Delweng * eth,core: backport from https://github.com/bnb-chain/bsc/pull/543 Signed-off-by: Delweng * cmd: open db with freeze disabled Signed-off-by: Delweng * cli: snapshot prune-block Signed-off-by: Delweng * fix typo Signed-off-by: Delweng * cli/snapshot: fix the issue of dup open db error Signed-off-by: Delweng * cli/snapshot: resolve datadir and ancient before backup Signed-off-by: Delweng * core: more prune-block log Signed-off-by: Delweng * core: truncatetail missing f.offset Signed-off-by: Delweng * core/rawdb: indextx adjust offset of pruned block Signed-off-by: Delweng * core/rawdb: freezer batch should implement the offset commit, ref https://github.com/bnb-chain/bsc/pull/1005 Signed-off-by: Delweng * core: check of ancientdb, backport https://github.com/bnb-chain/bsc/pull/817 Signed-off-by: Delweng * core/state: read raw borReceipt to backup Signed-off-by: Delweng * core/rawdb: bor receipt maybe in []Receipt or Receipt RLP format Signed-off-by: Delweng * core/state: typo and error msg Signed-off-by: Delweng * core/rawdb: offSet -> offset Signed-off-by: Delweng * cli/snapshot: comment Signed-off-by: Delweng * cli/snapshot: add prune-block doc Signed-off-by: Delweng * docs: add prune-block document Signed-off-by: Delweng * core/rawdb: print wrong bor-receipt length Signed-off-by: Delweng * internal/cli: add snapshot prune block tests (referenced from bsc's PR) * improve errors * cmd, core, eth, internal: fix lint * internal/cli: refactor snapshot prune block test * fix linters in tests * internal/cli: add inspect-ancient-db command, update docs * pruner: use a generic function for simplification * internal/cli: fixes for inspect-db command * internal/cli: improve pruning tests * core/rawdb: update end block calculation logic in inspect command * core/rawdb: improve checks db initialisation * core/rawdb: remove offset check * update mocks for span, ethdb and add command in makefile * docs/cli: update docs with inspect command * go mod tidy * refactor and resolve conflicts * resolve more conflicts * refactor * explicitly read node for hash scheme * add check for hash scheme, fix tests * fix typo * update docs and add warning * raise error if pbss is enabled * revert read raw bor receipt change * consensus/bor: handle nil header case in get root hash * address comments * core/rawdb: check chain continuity by matching parent hash * core/rawdb: account for pruned ancient blocks * go mod tidy * fix tests * fix tests --------- Signed-off-by: Delweng Co-authored-by: Delweng --- Makefile | 1 + cmd/geth/chaincmd.go | 7 +- cmd/geth/dbcmd.go | 25 +- cmd/geth/snapshot.go | 15 +- cmd/geth/verkle.go | 4 +- cmd/utils/flags.go | 10 +- consensus/bor/api.go | 4 + core/blockchain.go | 7 +- core/blockchain_repair_test.go | 8 + core/blockchain_sethead_test.go | 2 + core/blockchain_snapshot_test.go | 4 + core/blockchain_test.go | 26 +- core/headerchain.go | 3 + core/rawdb/accessors_chain.go | 17 + core/rawdb/accessors_chain_test.go | 6 +- core/rawdb/bor_receipt.go | 2 +- core/rawdb/chain_freezer.go | 4 +- core/rawdb/chain_iterator.go | 19 +- core/rawdb/database.go | 222 ++++++++++-- 
core/rawdb/freezer.go | 36 +- core/rawdb/freezer_batch.go | 14 +- core/rawdb/freezer_resettable.go | 12 +- core/rawdb/freezer_table.go | 2 +- core/rawdb/freezer_table_test.go | 26 +- core/rawdb/freezer_test.go | 18 +- core/rawdb/schema.go | 6 + core/rawdb/table.go | 10 + core/state/pruner/pruner.go | 237 +++++++++++++ docs/cli/README.md | 4 + docs/cli/inspect-ancient-db.md | 19 + docs/cli/server.md | 6 +- docs/cli/snapshot.md | 6 +- docs/cli/snapshot_inspect-ancient-db.md | 19 + docs/cli/snapshot_prune-block.md | 32 ++ docs/cli/snapshot_prune-state.md | 5 +- eth/backend.go | 2 +- eth/downloader/downloader.go | 2 +- eth/downloader/downloader_test.go | 2 +- eth/filters/IDatabase.go | 29 ++ eth/state_accessor.go | 4 + ethdb/database.go | 6 + ethdb/remotedb/remotedb.go | 8 + go.mod | 1 + go.sum | 1 + internal/cli/chain.go | 4 +- internal/cli/command.go | 10 + internal/cli/snapshot.go | 446 +++++++++++++++++++++++- internal/cli/snapshot_test.go | 246 +++++++++++++ node/node.go | 4 +- trie/triedb/pathdb/database_test.go | 2 +- 50 files changed, 1461 insertions(+), 144 deletions(-) create mode 100644 docs/cli/inspect-ancient-db.md create mode 100644 docs/cli/snapshot_inspect-ancient-db.md create mode 100644 docs/cli/snapshot_prune-block.md create mode 100644 internal/cli/snapshot_test.go diff --git a/Makefile b/Makefile index f074cddc32..e23161adf8 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,7 @@ protoc: generate-mocks: go generate mockgen -destination=./tests/bor/mocks/IHeimdallClient.go -package=mocks ./consensus/bor IHeimdallClient + go generate mockgen -destination=./eth/filters/IDatabase.go -package=filters ./ethdb Database go generate mockgen -destination=./eth/filters/IBackend.go -package=filters ./eth/filters Backend go generate mockgen -destination=../eth/filters/IDatabase.go -package=filters ./ethdb Database diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index a2d5f93281..307fe59f7f 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -197,7 +197,7 @@ func initGenesis(ctx *cli.Context) error { overrides.OverrideVerkle = new(big.Int).SetInt64(v) } for _, name := range []string{"chaindata", "lightchaindata"} { - chaindb, err := stack.OpenDatabaseWithFreezer(name, 0, 0, ctx.String(utils.AncientFlag.Name), "", false) + chaindb, err := stack.OpenDatabaseWithFreezer(name, 0, 0, ctx.String(utils.AncientFlag.Name), "", false, false, false) if err != nil { utils.Fatalf("Failed to open database: %v", err) } @@ -413,7 +413,7 @@ func importPreimages(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() start := time.Now() @@ -427,9 +427,8 @@ func importPreimages(ctx *cli.Context) error { } func parseDumpConfig(ctx *cli.Context, stack *node.Node) (*state.DumpConfig, ethdb.Database, common.Hash, error) { - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() - var header *types.Header if ctx.NArg() > 1 { diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index b1ecc8d08e..2485f87b63 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -290,7 +290,7 @@ func inspect(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() return rawdb.InspectDatabase(db, prefix, start) @@ -317,7 +317,7 @@ func 
checkStateContent(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() var ( @@ -381,7 +381,7 @@ func dbStats(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() showLeveldbStats(db) @@ -393,7 +393,7 @@ func dbCompact(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() log.Info("Stats before compaction") @@ -421,7 +421,7 @@ func dbGet(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() key, err := common.ParseHexOrString(ctx.Args().Get(0)) @@ -450,7 +450,7 @@ func dbDelete(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() key, err := common.ParseHexOrString(ctx.Args().Get(0)) @@ -481,7 +481,7 @@ func dbPut(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() var ( @@ -520,7 +520,7 @@ func dbDumpTrie(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() triedb := utils.MakeTrieDatabase(ctx, db, false, true, false) @@ -658,7 +658,7 @@ func importLDBdata(ctx *cli.Context) error { close(stop) }() - db := utils.MakeChainDatabase(ctx, stack, false) + db := utils.MakeChainDatabase(ctx, stack, false, false) defer db.Close() return utils.ImportLDBData(db, fName, int64(start), stop) } @@ -767,7 +767,7 @@ func exportChaindata(ctx *cli.Context) error { close(stop) }() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() return utils.ExportChaindata(ctx.Args().Get(1), kind, exporter(db), stop) } @@ -775,9 +775,7 @@ func exportChaindata(ctx *cli.Context) error { func showMetaData(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) - defer db.Close() - + db := utils.MakeChainDatabase(ctx, stack, true, false) ancients, err := db.Ancients() if err != nil { fmt.Fprintf(os.Stderr, "Error accessing ancients: %v", err) @@ -803,6 +801,5 @@ func showMetaData(ctx *cli.Context) error { table.SetHeader([]string{"Field", "Value"}) table.AppendBulk(data) table.Render() - return nil } diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 313676a85c..05345120da 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -172,7 +172,7 @@ func pruneState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, false) + chaindb := utils.MakeChainDatabase(ctx, stack, false, false) defer chaindb.Close() if rawdb.ReadStateScheme(chaindb) != rawdb.HashScheme { @@ -215,9 +215,8 @@ func verifyState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer 
stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() - headBlock := rawdb.ReadHeadBlock(chaindb) if headBlock == nil { log.Error("Failed to load head block") @@ -268,7 +267,7 @@ func checkDanglingStorage(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) + db := utils.MakeChainDatabase(ctx, stack, true, false) defer db.Close() return snapshot.CheckDanglingStorage(db) } @@ -280,7 +279,7 @@ func traverseState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() triedb := utils.MakeTrieDatabase(ctx, chaindb, false, true, false) @@ -404,7 +403,7 @@ func traverseRawState(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() triedb := utils.MakeTrieDatabase(ctx, chaindb, false, true, false) @@ -684,7 +683,7 @@ func snapshotExportPreimages(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() triedb := utils.MakeTrieDatabase(ctx, chaindb, false, true, false) @@ -743,7 +742,7 @@ func checkAccount(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() start := time.Now() diff --git a/cmd/geth/verkle.go b/cmd/geth/verkle.go index f617cf4a57..8eb20a53c4 100644 --- a/cmd/geth/verkle.go +++ b/cmd/geth/verkle.go @@ -117,7 +117,7 @@ func verifyVerkle(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() headBlock := rawdb.ReadHeadBlock(chaindb) @@ -172,7 +172,7 @@ func expandVerkle(ctx *cli.Context) error { stack, _ := makeConfigNode(ctx) defer stack.Close() - chaindb := utils.MakeChainDatabase(ctx, stack, true) + chaindb := utils.MakeChainDatabase(ctx, stack, true, false) defer chaindb.Close() var ( rootC common.Hash diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 78cc23d7eb..f65940f297 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2112,8 +2112,8 @@ func SplitTagsFlag(tagsFlag string) map[string]string { return tagsMap } -// MakeChainDatabase opens a database using the flags passed to the client and will hard crash if it fails. -func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb.Database { +// MakeChainDatabase opens a LevelDB using the flags passed to the client and will hard crash if it fails. +func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFreeze bool) ethdb.Database { var ( cache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheDatabaseFlag.Name) / 100 handles = MakeDatabaseHandles(ctx.Int(FDLimitFlag.Name)) @@ -2134,7 +2134,7 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly bool) ethdb. 
case ctx.String(SyncModeFlag.Name) == "light": chainDb, err = stack.OpenDatabase("lightchaindata", cache, handles, "", readonly) default: - chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", cache, handles, ctx.String(AncientFlag.Name), "", readonly) + chainDb, err = stack.OpenDatabaseWithFreezer("chaindata", cache, handles, ctx.String(AncientFlag.Name), "", readonly, disableFreeze, false) } if err != nil { @@ -2153,7 +2153,7 @@ func tryMakeReadOnlyDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database if rawdb.PreexistingDatabase(stack.ResolvePath("chaindata")) == "" { readonly = false } - return MakeChainDatabase(ctx, stack, readonly) + return MakeChainDatabase(ctx, stack, readonly, false) } func IsNetworkPreset(ctx *cli.Context) bool { @@ -2218,7 +2218,7 @@ func MakeGenesis(ctx *cli.Context) *core.Genesis { func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockChain, ethdb.Database) { var ( gspec = MakeGenesis(ctx) - chainDb = MakeChainDatabase(ctx, stack, readonly) + chainDb = MakeChainDatabase(ctx, stack, readonly, false) ) config, err := core.LoadChainConfig(chainDb, gspec) diff --git a/consensus/bor/api.go b/consensus/bor/api.go index 6d72e309e3..0de1ed9cab 100644 --- a/consensus/bor/api.go +++ b/consensus/bor/api.go @@ -317,6 +317,10 @@ func (api *API) GetRootHash(start uint64, end uint64) (string, error) { for i := 0; i < len(blockHeaders); i++ { blockHeader := blockHeaders[i] + // Handle no header case, which is possible if ancient pruning was done + if blockHeader == nil { + return "", errUnknownBlock + } header := crypto.Keccak256(appendBytes32( blockHeader.Number.Bytes(), new(big.Int).SetUint64(blockHeader.Time).Bytes(), diff --git a/core/blockchain.go b/core/blockchain.go index cc6a980be9..b1fec32cf3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -411,7 +411,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis } } // Ensure that a previous crash in SetHead doesn't leave extra ancients - if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { + //nolint:nestif + if frozen, err := bc.db.ItemAmountInAncient(); err == nil && frozen > 0 { + frozen, err = bc.db.Ancients() + if err != nil { + return nil, err + } var ( needRewind bool low uint64 diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 19a0093a88..9a4916d343 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1768,6 +1768,8 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s Directory: datadir, AncientsDirectory: ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) @@ -1849,6 +1851,8 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s Directory: datadir, AncientsDirectory: ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to reopen persistent database: %v", err) @@ -1913,6 +1917,8 @@ func testIssue23496(t *testing.T, scheme string) { db, err := rawdb.Open(rawdb.OpenOptions{ Directory: datadir, AncientsDirectory: ancient, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) @@ -1971,6 +1977,8 @@ func testIssue23496(t *testing.T, scheme string) { Directory: datadir, AncientsDirectory: ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { 
t.Fatalf("Failed to reopen persistent database: %v", err) diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index ec129381e7..d1fefd243f 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -1972,6 +1972,8 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme Directory: datadir, AncientsDirectory: ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 2b3ecb7cf9..fb4384685e 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -70,6 +70,8 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo Directory: datadir, AncientsDirectory: ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to create persistent database: %v", err) @@ -262,6 +264,8 @@ func (snaptest *crashSnapshotTest) test(t *testing.T) { Directory: snaptest.datadir, AncientsDirectory: snaptest.ancient, Ephemeral: true, + IsLastOffset: false, + DisableFreeze: false, }) if err != nil { t.Fatalf("Failed to reopen persistent database: %v", err) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 6600bbfa2a..519c31fd28 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -946,7 +946,7 @@ func testFastVsFullChains(t *testing.T, scheme string) { t.Fatalf("failed to insert receipt %d: %v", n, err) } // Freezer style fast import the chain. - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -1051,7 +1051,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) { // makeDb creates a db instance for testing. 
makeDb := func() ethdb.Database { - db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2017,7 +2017,7 @@ func testLargeReorgTrieGC(t *testing.T, scheme string) { competitor, _ := GenerateChain(genesis.Config, shared[len(shared)-1], engine, genDb, 2*int(defaultCacheConfig.TriesInMemory)+1, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{3}) }) // Import the shared chain and the original canonical one - db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) defer db.Close() chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil, nil, nil) @@ -2090,7 +2090,7 @@ func testBlockchainRecovery(t *testing.T, scheme string) { _, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), int(height), nil) // Import the chain as an ancient-first node and ensure all pointers are updated - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2171,7 +2171,7 @@ func testInsertReceiptChainRollback(t *testing.T, scheme string) { } // Set up a BlockChain that uses the ancient store. - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2250,7 +2250,7 @@ func testLowDiffLongChain(t *testing.T, scheme string) { }) // Import the canonical chain - diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) defer diskdb.Close() chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil, nil, nil) @@ -2483,7 +2483,7 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) { b.OffsetTime(-9) // A higher difficulty }) // Import the shared chain and the original canonical one - chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -2668,7 +2668,7 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i } }) // Import the shared chain and the original canonical one - chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + chaindb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -3026,7 +3026,7 @@ func TestTransactionIndices(t *testing.T) { limit := []uint64{0, 32, 64, 128} for _, l := range limit { frdir := t.TempDir() - ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "",
false) + ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) _, _ = rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), append([]types.Receipts{{}}, borReceipts...), big.NewInt(0)) l := l @@ -3050,7 +3050,7 @@ func TestTransactionIndices(t *testing.T) { } // Reconstruct a block chain which only reserves HEAD-64 tx indices - ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) defer ancientDb.Close() _, _ = rawdb.WriteAncientBlocks(ancientDb, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), append([]types.Receipts{{}}, borReceipts...), big.NewInt(0)) @@ -3140,7 +3140,7 @@ func testSkipStaleTxIndicesInSnapSync(t *testing.T, scheme string) { } } - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) if err != nil { t.Fatalf("failed to create temp freezer db: %v", err) } @@ -4260,7 +4260,7 @@ func testSetCanonical(t *testing.T, scheme string) { gen.AddTx(tx) }) - diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + diskdb, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) defer diskdb.Close() chain, err := NewBlockChain(diskdb, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil, nil) @@ -4639,7 +4639,7 @@ func TestTxIndexer(t *testing.T) { for _, c := range cases { frdir := t.TempDir() - db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false) + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false, false, false) _, _ = rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), append([]types.Receipts{{}}, receipts...), append([]types.Receipts{{}}, borReceipts...), big.NewInt(0)) // Index the initial blocks from ancient store diff --git a/core/headerchain.go b/core/headerchain.go index b8ddc4ad9b..478a55bc08 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -174,6 +174,9 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error { ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { + if frozen, _ := hc.chainDb.Ancients(); frozen == headNumber { + break + } rawdb.WriteCanonicalHash(batch, headHash, headNumber) if headNumber == 0 { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index aeed8da55c..bfea75610b 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -165,6 +165,23 @@ func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { return &number } +// ReadHeaderFromKvStore retrieves the block header corresponding to the hash only +// from the underlying key-value store and doesn't use ancient freezer for backup. 
+func ReadHeaderFromKvStore(db ethdb.KeyValueReader, hash common.Hash, number uint64) *types.Header { + data, _ := db.Get(headerKey(number, hash)) + if len(data) == 0 { + return nil + } + + header := new(types.Header) + if err := rlp.DecodeBytes(data, header); err != nil { + log.Error("Invalid block header RLP", "hash", hash, "err", err) + return nil + } + + return header +} + // WriteHeaderNumber stores the hash->number mapping. func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { key := headerNumberKey(hash) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 984224b569..31f2169baf 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -470,7 +470,7 @@ func checkReceiptsRLP(have, want types.Receipts) error { func TestAncientStorage(t *testing.T) { // Freezer style fast import the chain. frdir := t.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create database with ancient backend") } @@ -630,7 +630,7 @@ func BenchmarkWriteAncientBlocks(b *testing.B) { // Open freezer database. frdir := b.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { b.Fatalf("failed to create database with ancient backend") } @@ -961,7 +961,7 @@ func TestHeadersRLPStorage(t *testing.T) { // Have N headers in the freezer frdir := t.TempDir() - db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false) if err != nil { t.Fatalf("failed to create database with ancient backend") } diff --git a/core/rawdb/bor_receipt.go b/core/rawdb/bor_receipt.go index 68a436a081..a27b8e3b9a 100644 --- a/core/rawdb/bor_receipt.go +++ b/core/rawdb/bor_receipt.go @@ -68,7 +68,7 @@ func ReadRawBorReceipt(db ethdb.Reader, hash common.Hash, number uint64) *types. // Convert the receipts from their storage form to their internal representation var storageReceipt types.ReceiptForStorage if err := rlp.DecodeBytes(data, &storageReceipt); err != nil { - log.Error("Invalid receipt array RLP", "hash", hash, "err", err) + log.Error("Invalid bor receipt RLP", "hash", hash, "err", err) return nil } diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index ebde2687a8..4114c8a9dd 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -52,8 +52,8 @@ type chainFreezer struct { } // newChainFreezer initializes the freezer for ancient chain data. -func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) { - freezer, err := NewChainFreezer(datadir, namespace, readonly) +func newChainFreezer(datadir string, namespace string, readonly bool, offset uint64) (*chainFreezer, error) { + freezer, err := NewChainFreezer(datadir, namespace, readonly, offset) if err != nil { return nil, err } diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index c59b2dae77..ecb801a3b8 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -34,7 +34,7 @@ import ( // injects into the database the block hash->number mappings. 
func InitDatabaseFromFreezer(db ethdb.Database) { // If we can't access the freezer or it's empty, abort - frozen, err := db.Ancients() + frozen, err := db.ItemAmountInAncient() if err != nil || frozen == 0 { return } @@ -44,9 +44,10 @@ func InitDatabaseFromFreezer(db ethdb.Database) { start = time.Now() logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log hash common.Hash + offset = db.AncientOffSet() ) - for i := uint64(0); i < frozen; { + for i := uint64(0) + offset; i < frozen+offset; { // We read 100K hashes at a time, for a total of 3.2M count := uint64(100_000) if i+count > frozen { @@ -107,7 +108,11 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool rlp rlp.RawValue } - if to == from { + if offset := db.AncientOffSet(); offset > from { + from = offset + } + + if to <= from { return nil } @@ -201,6 +206,10 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // There is a passed channel, the whole procedure will be interrupted if any // signal received. func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + // adjust range boundary for pruned block + if offset := db.AncientOffSet(); offset > from { + from = offset + } // short circuit for invalid range if from >= to { return @@ -300,6 +309,10 @@ func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, inte // There is a passed channel, the whole procedure will be interrupted if any // signal received. func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { + // adjust range boundary for pruned block + if offset := db.AncientOffSet(); offset > from { + from = offset + } // short circuit for invalid range if from >= to { return diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 3a6d292d82..3762a4366f 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "math/big" "os" "path" "path/filepath" @@ -137,6 +138,16 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) { return 0, errNotSupported } +// ItemAmountInAncient returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) ItemAmountInAncient() (uint64, error) { + return 0, errNotSupported +} + +// AncientOffSet returns 0 as we don't have a backing chain freezer. +func (db *nofreezedb) AncientOffSet() uint64 { + return 0 +} + // Sync returns an error as we don't have a backing chain freezer. 
func (db *nofreezedb) Sync() error { return errNotSupported @@ -175,6 +186,51 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { return &nofreezedb{KeyValueStore: db} } +// ReadOffsetOfCurrentAncientFreezer reads the offset of the current ancient freezer +func ReadOffsetOfCurrentAncientFreezer(db ethdb.KeyValueReader) uint64 { + offset, _ := db.Get(offsetOfCurrentAncientFreezer) + if offset == nil { + return 0 + } + + return new(big.Int).SetBytes(offset).Uint64() +} + +// ReadOffsetOfLastAncientFreezer reads the offset of the last pruned ancient freezer +func ReadOffsetOfLastAncientFreezer(db ethdb.KeyValueReader) uint64 { + offset, _ := db.Get(offsetOfLastAncientFreezer) + if offset == nil { + return 0 + } + + return new(big.Int).SetBytes(offset).Uint64() +} + +// WriteOffsetOfCurrentAncientFreezer writes the current offset of the ancient freezer into ethdb +func WriteOffsetOfCurrentAncientFreezer(db ethdb.KeyValueWriter, offset uint64) { + if err := db.Put(offsetOfCurrentAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil { + log.Crit("Failed to store offsetOfCurrentAncientFreezer", "err", err) + } +} + +// WriteOffsetOfLastAncientFreezer writes the last offset of the ancient freezer into ethdb +func WriteOffsetOfLastAncientFreezer(db ethdb.KeyValueWriter, offset uint64) { + if err := db.Put(offsetOfLastAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil { + log.Crit("Failed to store offsetOfLastAncientFreezer", "err", err) + } +} + +// NewDatabaseWithOnlyFreezer creates a freezer db without state +func NewDatabaseWithOnlyFreezer(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*Freezer, error) { + // Create the idle freezer instance; this operation should be atomic to avoid a mismatch between the offset and the ancientDB. + frdb, err := NewChainFreezer(frz, namespace, readonly, newOffSet) + if err != nil { + return nil, err + } + + return frdb, nil +} + // resolveChainFreezerDir is a helper function which resolves the absolute path // of chain freezer by considering backward compatibility. func resolveChainFreezerDir(ancient string) string { @@ -199,18 +255,31 @@ func resolveChainFreezerDir(ancient string) string { return freezer } +// resolveOffset is a helper function which resolves the offset value to use +// while opening a chain freezer. +func resolveOffset(db ethdb.KeyValueStore, isLastOffset bool) uint64 { + // The offset of the ancientDB should be handled differently in different scenarios. + if isLastOffset { + return ReadOffsetOfLastAncientFreezer(db) + } else { + return ReadOffsetOfCurrentAncientFreezer(db) + } +} + // NewDatabaseWithFreezer creates a high level database on top of a given key- // value data store with a freezer moving immutable chain segments into cold -// storage. The passed ancient indicates the path of root ancient directory -// where the chain freezer can be opened. -// nolint:gocognit -func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly bool) (ethdb.Database, error) { +// storage.
+// +//nolint:gocognit +func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { + offset := resolveOffset(db, isLastOffset) + // Create the idle freezer instance - frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly) + frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly, offset) if err != nil { - printChainMetadata(db) return nil, err } + // Since the freezer can be stored separately from the user's key-value database, // there's a fairly high probability that the user requests invalid combinations // of the freezer and database. Ensure that we don't shoot ourselves in the foot @@ -233,65 +302,103 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st // If the genesis hash is empty, we have a new key-value store, so nothing to // validate in this method. If, however, the genesis hash is not nil, compare // it to the freezer content. + //nolint:nestif if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 { - if frozen, _ := frdb.Ancients(); frozen > 0 { + if frozen, _ := frdb.ItemAmountInAncient(); frozen > 0 { // If the freezer already contains something, ensure that the genesis blocks // match, otherwise we might mix up freezers across chains and destroy both // the freezer and the key-value store. - frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0) - if err != nil { - printChainMetadata(db) - return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) - } else if !bytes.Equal(kvgenesis, frgenesis) { - printChainMetadata(db) - return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) + // Only validate genesis if we have `offset` set to 0, which means ancient db pruning + // hasn't happened yet. If the pruning would've happened, genesis would have been wiped + // from ancient db. + if offset == 0 { + frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0) + if err != nil { + printChainMetadata(db) + return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err) + } else if !bytes.Equal(kvgenesis, frgenesis) { + printChainMetadata(db) + return nil, fmt.Errorf("genesis mismatch: %#x (leveldb) != %#x (ancients)", kvgenesis, frgenesis) + } } + // Key-value store and freezer belong to the same network. Ensure that they // are contiguous, otherwise we might end up with a non-functional freezer. - if kvhash, _ := db.Get(headerHashKey(frozen)); len(kvhash) == 0 { + // + // If ancient db pruning has happened, the number of items in ancient db should + // be less and hence we need to calculate the first block of leveldb by adding + // the offset to it i.e. start block of leveldb = frozen + offset. + startBlock := frozen + offset + var kvhash []byte + if kvhash, _ = db.Get(headerHashKey(startBlock)); len(kvhash) == 0 { // Subsequent header after the freezer limit is missing from the database. // Reject startup if the database has a more recent head. - if head := *ReadHeaderNumber(db, ReadHeadHeaderHash(db)); head > frozen-1 { + if head := *ReadHeaderNumber(db, ReadHeadHeaderHash(db)); head > startBlock-1 { // Find the smallest block stored in the key-value store // in range of [frozen, head] var number uint64 - for number = frozen; number <= head; number++ { + for number = startBlock; number <= head; number++ { if present, _ := db.Has(headerHashKey(number)); present { break } } // We are about to exit on error. 
Print database metadata before exiting printChainMetadata(db) - - return nil, fmt.Errorf("gap in the chain between ancients [0 - #%d] and leveldb [#%d - #%d] ", - frozen-1, number, head) + return nil, fmt.Errorf("gap in the chain between ancients [0 - #%d] and leveldb [#%d - #%d]", + startBlock-1, number, head) } // Database contains only older data than the freezer, this happens if the // state was wiped and reinited from an existing freezer. } // Otherwise, key-value store continues where the freezer left off, all is fine. // We might have duplicate blocks (crash after freezer write but before key-value - // store deletion, but that's fine). + // store deletion, but that's fine). Still, check that the first block in the key-value + // store points to the last block in the freezer. + if head := ReadHeaderFromKvStore(db, common.BytesToHash(kvhash), startBlock); head != nil { + parentHash := head.ParentHash.Bytes() + ancientParentHash, _ := frdb.Ancient(ChainFreezerHashTable, startBlock-1) + if ancientParentHash == nil { + printChainMetadata(db) + return nil, fmt.Errorf("missing parent hash for block #%d in ancient", startBlock-1) + } + if !bytes.Equal(parentHash, ancientParentHash) { + printChainMetadata(db) + return nil, fmt.Errorf("broken chain due to parent hash mismatch: %#x (leveldb) != %#x (ancients) for block #%d, please set --datadir.ancient to the correct path", parentHash, ancientParentHash, startBlock-1) + } + // The first block of the key-value store points back to the correct parent in ancient + } } else { - // If the freezer is empty, ensure nothing was moved yet from the key-value - // store, otherwise we'll end up missing data. We check block #1 to decide - // if we froze anything previously or not, but do take care of databases with - // only the genesis block. - if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) { - // Key-value store contains more data than the genesis block, make sure we - // didn't freeze anything yet. - if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 { + // This case means the freezer is empty. Either nothing has been moved from + // the key-value store yet, or we've pruned all data. + + // No pruning case + if offset == 0 { + // If the freezer is empty, ensure nothing was moved yet from the key-value + // store, otherwise we'll end up missing data. We check block #1 to decide + // if we froze anything previously or not, but do take care of databases with + // only the genesis block. + if ReadHeadHeaderHash(db) != common.BytesToHash(kvgenesis) { + // Key-value store contains more data than the genesis block, make sure we + // didn't freeze anything yet. + if kvblob, _ := db.Get(headerHashKey(1)); len(kvblob) == 0 { + printChainMetadata(db) + return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") + } + // Block #1 is still in the database, we're allowed to init a new freezer + } + // Otherwise, the head header is still the genesis, we're allowed to init a new + // freezer. + } else { + // Full pruning case. Check that the key-value store isn't missing any block.
+ if kvhash, _ := db.Get(headerHashKey(offset)); len(kvhash) == 0 { printChainMetadata(db) - return nil, errors.New("ancient chain segments already extracted, please set --datadir.ancient to the correct path") + return nil, fmt.Errorf("missing blocks from leveldb post ancientdb pruning, block: %d", offset) } - // Block #1 is still in the database, we're allowed to init a new freezer } - // Otherwise, the head header is still the genesis, we're allowed to init a new - // freezer. } } // Freezer is consistent with the key-value database, permit combining the two - if !frdb.readonly { + if !disableFreeze && !frdb.readonly { frdb.wg.Add(1) go func() { @@ -377,6 +484,11 @@ type OpenOptions struct { Cache int // the capacity(in megabytes) of the data caching Handles int // number of files to be open simultaneously ReadOnly bool + + // Ancient pruner related fields + DisableFreeze bool + IsLastOffset bool + // Ephemeral means that filesystem sync operations should be avoided: data integrity in the face of // a crash is not important. This option should typically be used in tests. Ephemeral bool @@ -428,7 +540,7 @@ func Open(o OpenOptions) (ethdb.Database, error) { return kvdb, nil } - frdb, err := NewDatabaseWithFreezer(kvdb, o.AncientsDirectory, o.Namespace, o.ReadOnly) + frdb, err := NewDatabaseWithFreezer(kvdb, o.AncientsDirectory, o.Namespace, o.ReadOnly, o.DisableFreeze, o.IsLastOffset) if err != nil { kvdb.Close() return nil, err @@ -467,6 +579,46 @@ func (s *stat) Count() string { return s.count.String() } +// AncientInspect inspects the underlying freezer db and prints stats relevant +// for ancient data pruning. It prints the start and end blocks of freezer db. +func AncientInspect(db ethdb.Database) error { + offset := counter(ReadOffsetOfCurrentAncientFreezer(db)) + // Get number of ancient rows inside the freezer. + ancients := counter(0) + + if count, err := db.ItemAmountInAncient(); err != nil { + log.Error("failed to get the items amount in ancientDB", "err", err) + return err + } else { + ancients = counter(count) + } + + var endNumber counter + + if offset+ancients <= 0 { + endNumber = 0 + } else { + sum := counter(0) + if ancients != 0 { + sum = ancients - 1 + } + endNumber = offset + sum + } + + stats := [][]string{ + {"Start block number of ancientDB (offset)", offset.String()}, + {"End block number of ancientDB", endNumber.String()}, + {"Remaining items in ancientDB", ancients.String()}, + } + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"Field", "Items"}) + table.SetFooter([]string{"AncientStore information", ""}) + table.AppendBulk(stats) + table.Render() + + return nil +} + // InspectDatabase traverses the entire database and checks the size // of all different categories of data. func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index b7824ddc0d..eae6705b68 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -70,6 +70,9 @@ type Freezer struct { writeLock sync.RWMutex writeBatch *freezerBatch + // Used during ancient db pruning + offset atomic.Uint64 // Starting block number in current freezer + readonly bool tables map[string]*freezerTable // Data tables for storing everything instanceLock *flock.Flock // File-system lock to prevent double opens @@ -78,8 +81,8 @@ type Freezer struct { // NewChainFreezer is a small utility method around NewFreezer that sets the // default parameters for the chain storage. 
-func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, error) { - return NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy) +func NewChainFreezer(datadir string, namespace string, readonly bool, offset uint64) (*Freezer, error) { + return NewFreezer(datadir, namespace, readonly, offset, freezerTableSize, chainFreezerNoSnappy) } // NewFreezer creates a freezer instance for maintaining immutable ordered @@ -87,7 +90,7 @@ func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, // // The 'tables' argument defines the data tables. If the value of a map // entry is true, snappy compression is disabled for the table. -func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*Freezer, error) { +func NewFreezer(datadir string, namespace string, readonly bool, offset uint64, maxTableSize uint32, tables map[string]bool) (*Freezer, error) { // Create the initial freezer object var ( readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil) @@ -123,6 +126,7 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui tables: make(map[string]*freezerTable), instanceLock: lock, } + freezer.offset.Store(offset) // Create the tables. for name, disableSnappy := range tables { @@ -153,10 +157,14 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui return nil, err } + // Some blocks in the ancientDB may have already been frozen and pruned, so add the offset to + // represent the absolute number of blocks already frozen. + freezer.frozen.Add(offset) + // Create the write batch. freezer.writeBatch = newFreezerBatch(freezer) - log.Info("Opened ancient database", "database", datadir, "readonly", readonly) + log.Info("Opened ancient database", "database", datadir, "readonly", readonly, "frozen", freezer.frozen.Load(), "offset", freezer.offset.Load()) return freezer, nil } @@ -186,7 +194,7 @@ func (f *Freezer) Close() error { // in the freezer. func (f *Freezer) HasAncient(kind string, number uint64) (bool, error) { if table := f.tables[kind]; table != nil { - return table.has(number), nil + return table.has(number - f.offset.Load()), nil } return false, nil } @@ -194,7 +202,7 @@ func (f *Freezer) HasAncient(kind string, number uint64) (bool, error) { // Ancient retrieves an ancient binary blob from the append-only immutable files. func (f *Freezer) Ancient(kind string, number uint64) ([]byte, error) { if table := f.tables[kind]; table != nil { - return table.Retrieve(number) + return table.Retrieve(number - f.offset.Load()) } return nil, errUnknownTable } @@ -217,6 +225,16 @@ func (f *Freezer) Ancients() (uint64, error) { return f.frozen.Load(), nil } +// ItemAmountInAncient returns the actual length of the current ancientDB. +func (f *Freezer) ItemAmountInAncient() (uint64, error) { + return f.frozen.Load() - f.offset.Load(), nil +} + +// AncientOffSet returns the offset of the current ancientDB. +func (f *Freezer) AncientOffSet() uint64 { + return f.offset.Load() +} + // Tail returns the number of first stored item in the freezer.
func (f *Freezer) Tail() (uint64, error) { return f.tail.Load(), nil @@ -292,7 +310,7 @@ func (f *Freezer) TruncateHead(items uint64) (uint64, error) { return oitems, nil } for _, table := range f.tables { - if err := table.truncateHead(items); err != nil { + if err := table.truncateHead(items - f.offset.Load()); err != nil { return 0, err } } @@ -313,7 +331,7 @@ func (f *Freezer) TruncateTail(tail uint64) (uint64, error) { return old, nil } for _, table := range f.tables { - if err := table.truncateTail(tail); err != nil { + if err := table.truncateTail(tail - f.offset.Load()); err != nil { return 0, err } } @@ -453,7 +471,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error { return err } var ( - batch = newTable.newBatch() + batch = newTable.newBatch(f.offset.Load()) out []byte start = time.Now() logged = time.Now() diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 243d3cef85..fe7ae614af 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -18,6 +18,7 @@ package rawdb import ( "fmt" + "sync/atomic" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/rlp" @@ -36,7 +37,7 @@ type freezerBatch struct { func newFreezerBatch(f *Freezer) *freezerBatch { batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))} for kind, table := range f.tables { - batch.tables[kind] = table.newBatch() + batch.tables[kind] = table.newBatch(f.offset.Load()) } return batch @@ -94,11 +95,12 @@ type freezerTableBatch struct { indexBuffer []byte curItem uint64 // expected index of next append totalBytes int64 // counts written bytes since reset + offset uint64 } // newBatch creates a new batch for the freezer table. -func (t *freezerTable) newBatch() *freezerTableBatch { - batch := &freezerTableBatch{t: t} +func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch { + batch := &freezerTableBatch{t: t, offset: offset} if !t.noCompression { batch.sb = new(snappyBuffer) } @@ -112,7 +114,8 @@ func (t *freezerTable) newBatch() *freezerTableBatch { func (batch *freezerTableBatch) reset() { batch.dataBuffer = batch.dataBuffer[:0] batch.indexBuffer = batch.indexBuffer[:0] - batch.curItem = batch.t.items.Load() + curItem := batch.t.items.Load() + batch.offset + batch.curItem = atomic.LoadUint64(&curItem) batch.totalBytes = 0 } @@ -222,7 +225,8 @@ func (batch *freezerTableBatch) commit() error { // Update headBytes of table. batch.t.headBytes += dataSize - batch.t.items.Store(batch.curItem) + items := batch.curItem - batch.offset + batch.t.items.Store(items) // Update metrics. batch.t.sizeGauge.Inc(dataSize + indexSize) diff --git a/core/rawdb/freezer_resettable.go b/core/rawdb/freezer_resettable.go index 23b36a4658..c64100beea 100644 --- a/core/rawdb/freezer_resettable.go +++ b/core/rawdb/freezer_resettable.go @@ -54,7 +54,7 @@ func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTa } opener := func() (*Freezer, error) { - return NewFreezer(datadir, namespace, readonly, maxTableSize, tables) + return NewFreezer(datadir, namespace, readonly, 0, maxTableSize, tables) } freezer, err := opener() @@ -146,6 +146,16 @@ func (f *ResettableFreezer) Ancients() (uint64, error) { return f.freezer.Ancients() } +// AncientOffSet returns the offset of current ancientDB. +func (f *ResettableFreezer) AncientOffSet() uint64 { + return f.freezer.offset.Load() +} + +// ItemAmountInAncient returns the actual length of current ancientDB. 
+func (f *ResettableFreezer) ItemAmountInAncient() (uint64, error) { + return f.freezer.frozen.Load() - f.freezer.offset.Load(), nil +} + // Tail returns the number of first stored item in the freezer. func (f *ResettableFreezer) Tail() (uint64, error) { f.lock.RLock() diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 282ec95cd4..0e42e31017 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -1068,7 +1068,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { // Fill adds empty data till given number (convenience method for backward compatibility) func (t *freezerTable) Fill(number uint64) error { if t.items.Load() < number { - b := t.newBatch() + b := t.newBatch(0) log.Info("Filling all data into freezer for backward compatibility", "name", t.name, "items", &t.items, "number", number) for t.items.Load() < number { diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index c5f1e32c80..d54f29c56b 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -96,7 +96,7 @@ func TestFreezerBasicsClosing(t *testing.T) { // In-between writes, the table is closed and re-opened. for x := 0; x < 255; x++ { data := getChunk(15, x) - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(uint64(x), data)) require.NoError(t, batch.commit()) f.Close() @@ -234,7 +234,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 - batch := f.newBatch() + batch := f.newBatch(0) for x := 1; x < 0xff; x++ { require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } @@ -443,7 +443,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { t.Fatal(err) } // Write 80 bytes, splitting out into two files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE))) require.NoError(t, batch.commit()) @@ -485,7 +485,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { } // Write 40 bytes - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD))) require.NoError(t, batch.commit()) @@ -546,7 +546,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { f.truncateHead(0) // Write the data again - batch := f.newBatch() + batch := f.newBatch(0) for x := 0; x < 30; x++ { require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } @@ -569,7 +569,7 @@ func TestFreezerOffset(t *testing.T) { } // Write 6 x 20 bytes, splitting out into three files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) @@ -636,7 +636,7 @@ func TestFreezerOffset(t *testing.T) { t.Log(f.dumpIndexString(0, 100)) // It should allow writing item 6. 
- batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) require.NoError(t, batch.commit()) @@ -718,7 +718,7 @@ func TestTruncateTail(t *testing.T) { } // Write 7 x 20 bytes, splitting out into four files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) @@ -838,7 +838,7 @@ func TestTruncateHead(t *testing.T) { } // Write 7 x 20 bytes, splitting out into four files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) @@ -863,7 +863,7 @@ func TestTruncateHead(t *testing.T) { }) // Append new items - batch = f.newBatch() + batch = f.newBatch(0) require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) @@ -930,7 +930,7 @@ func getChunk(size int, b int) []byte { func writeChunks(t *testing.T, ft *freezerTable, n int, length int) { t.Helper() - batch := ft.newBatch() + batch := ft.newBatch(0) for i := 0; i < n; i++ { if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil { t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err) } @@ -1198,7 +1198,7 @@ func TestFreezerReadonly(t *testing.T) { // Case 5: Now write some data via a batch. // This should fail either during AppendRaw or Commit - batch := f.newBatch() + batch := f.newBatch(0) writeErr := batch.AppendRaw(32, make([]byte, 1)) if writeErr == nil { @@ -1366,7 +1366,7 @@ func runRandTest(rt randTest) bool { } case opAppend: - batch := f.newBatch() + batch := f.newBatch(0) for i := 0; i < len(step.items); i++ { batch.AppendRaw(step.items[i], step.blobs[i]) } diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index 1c7791aee6..f4b015d300 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -125,7 +125,7 @@ func TestFreezerModifyRollback(t *testing.T) { // Reopen and check that the rolled-back data doesn't reappear. tables := map[string]bool{"test": true} - f2, err := NewFreezer(dir, "", false, 2049, tables) + f2, err := NewFreezer(dir, "", false, 0, 2049, tables) if err != nil { t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err) } @@ -284,20 +284,20 @@ func TestFreezerReadonlyValidate(t *testing.T) { dir := t.TempDir() // Open non-readonly freezer and fill individual tables // with different amounts of data. - f, err := NewFreezer(dir, "", false, 2049, tables) + f, err := NewFreezer(dir, "", false, 0, 2049, tables) if err != nil { t.Fatal("can't open freezer", err) } var item = make([]byte, 1024) - aBatch := f.tables["a"].newBatch() + aBatch := f.tables["a"].newBatch(0) require.NoError(t, aBatch.AppendRaw(0, item)) require.NoError(t, aBatch.AppendRaw(1, item)) require.NoError(t, aBatch.AppendRaw(2, item)) require.NoError(t, aBatch.commit()) - bBatch := f.tables["b"].newBatch() + bBatch := f.tables["b"].newBatch(0) require.NoError(t, bBatch.AppendRaw(0, item)) require.NoError(t, bBatch.commit()) @@ -313,7 +313,7 @@ func TestFreezerReadonlyValidate(t *testing.T) { // Re-opening as readonly should fail when validating // table lengths.
- _, err = NewFreezer(dir, "", true, 2049, tables) + _, err = NewFreezer(dir, "", true, 0, 2049, tables) if err == nil { t.Fatal("readonly freezer should fail with differing table lengths") } @@ -325,12 +325,12 @@ func TestFreezerConcurrentReadonly(t *testing.T) { tables := map[string]bool{"a": true} dir := t.TempDir() - f, err := NewFreezer(dir, "", false, 2049, tables) + f, err := NewFreezer(dir, "", false, 0, 2049, tables) if err != nil { t.Fatal("can't open freezer", err) } var item = make([]byte, 1024) - batch := f.tables["a"].newBatch() + batch := f.tables["a"].newBatch(0) items := uint64(10) for i := uint64(0); i < items; i++ { require.NoError(t, batch.AppendRaw(i, item)) } @@ -351,7 +351,7 @@ go func(i int) { defer wg.Done() - f, err := NewFreezer(dir, "", true, 2049, tables) + f, err := NewFreezer(dir, "", true, 0, 2049, tables) if err == nil { fs[i] = f } else { @@ -376,7 +376,7 @@ func newFreezerForTesting(t *testing.T, tables map[string]bool) (*Freezer, strin dir := t.TempDir() // note: using low max table size here to ensure the tests actually // switch between multiple files. - f, err := NewFreezer(dir, "", false, 2049, tables) + f, err := NewFreezer(dir, "", false, 0, 2049, tables) if err != nil { t.Fatal("can't open freezer", err) }
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 89db51cc3d..a3ddd11d12 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -82,6 +82,12 @@ var ( // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") + // offsetOfCurrentAncientFreezer tracks the offset of the current, updated ancientDB. + offsetOfCurrentAncientFreezer = []byte("offsetOfCurrentAncientFreezer") + + // offsetOfLastAncientFreezer tracks the offset of the ancientDB before the latest update. + offsetOfLastAncientFreezer = []byte("offsetOfLastAncientFreezer") + // badBlockKey tracks the list of bad blocks seen by local badBlockKey = []byte("InvalidBlock")
diff --git a/core/rawdb/table.go b/core/rawdb/table.go index da2984457c..46661191cc 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -95,6 +95,16 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReaderOp) error) (err e return t.db.ReadAncients(fn) } +// ItemAmountInAncient returns the actual length of current ancientDB. +func (t *table) ItemAmountInAncient() (uint64, error) { + return t.db.ItemAmountInAncient() +} + +// AncientOffSet returns the offset of current ancientDB. +func (t *table) AncientOffSet() uint64 { + return t.db.AncientOffSet() +} + // TruncateHead is a noop passthrough that just forwards the request to the underlying // database.
func (t *table) TruncateHead(items uint64) (uint64, error) {
diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index b7398f2138..abbc0044a6 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -28,13 +28,17 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" + + "github.com/prometheus/tsdb/fileutil" ) const ( @@ -332,6 +336,239 @@ func (p *Pruner) Prune(root common.Hash) error { return prune(p.snaptree, root, p.db, p.stateBloom, filterName, middleRoots, start) } +type BlockPruner struct { + oldAncientPath string + newAncientPath string + node *node.Node + BlockAmountReserved uint64 +} + +func NewBlockPruner(n *node.Node, oldAncientPath, newAncientPath string, BlockAmountReserved uint64) *BlockPruner { + return &BlockPruner{ + oldAncientPath: oldAncientPath, + newAncientPath: newAncientPath, + node: n, + BlockAmountReserved: BlockAmountReserved, + } +} + +// backupOldDb takes a backup of the old ancient db to a new ancient db. It moves the contents based on +// the number of blocks to be reserved. +func (p *BlockPruner) backupOldDb(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + log.Info("Backup old ancientDB", "oldAncientPath", p.oldAncientPath, "newAncientPath", p.newAncientPath) + // Open old db wrapper. + chainDb, err := p.node.OpenDatabaseWithFreezer(name, cache, handles, p.oldAncientPath, namespace, readonly, true, interrupt) + if err != nil { + return fmt.Errorf("failed to open ancient database: %v", err) + } + defer chainDb.Close() + + // Get the number of items in the old ancient db. + itemsOfAncient, err := chainDb.ItemAmountInAncient() + log.Info("ChainDB opened successfully", "itemsOfAncient", itemsOfAncient) + + // Abort if we can't access the freezer + if err != nil { + return fmt.Errorf("failed to access the freezer: %v", err) + } + + // Also abort if it's empty + if itemsOfAncient == 0 { + return errors.New("freezer is empty, abort") + } + + // If the number of items in the freezer is less than the block amount that we want to reserve, it is not enough; stop. + if itemsOfAncient < p.BlockAmountReserved { + return fmt.Errorf("the number of old blocks is not enough to reserve, ancientItems=%d, specifiedReservedBlockAmount=%d", itemsOfAncient, p.BlockAmountReserved) + } else if itemsOfAncient == p.BlockAmountReserved { + return fmt.Errorf("the number of old blocks is the same as the reserved blocks, ancientItems=%d", itemsOfAncient) + } + + var oldOffset uint64 + if interrupt { + // The interrupt scenario within this function is specific to the case where the old and new ancientDB existed concurrently; + // use the last version of the offset for the old ancientDB, because the current offset + // actually belongs to the new ancientDB_Backup, while what we want is the offset of the ancientDB being backed up. + oldOffset = rawdb.ReadOffsetOfLastAncientFreezer(chainDb) + } else { + // Use the current version of the offset, because the db being backed up is the current version. + oldOffset = rawdb.ReadOffsetOfCurrentAncientFreezer(chainDb) + } + + // Get the start BlockNumber for pruning.
+ startBlockNumber := oldOffset + itemsOfAncient - p.BlockAmountReserved + log.Info("Prune info", "oldOffset", oldOffset, "newOffset", startBlockNumber) + + // Create the new ancientDB backup and record the new and last versions of the offset in kvDB as well. + // For every round, newOffset equals the startBlockNumber in the ancient backup db. + frdbBack, err := rawdb.NewDatabaseWithOnlyFreezer(chainDb, p.newAncientPath, namespace, readonly, startBlockNumber) + if err != nil { + return fmt.Errorf("failed to create ancient freezer backup: %v", err) + } + defer frdbBack.Close() + + offsetBatch := chainDb.NewBatch() + rawdb.WriteOffsetOfCurrentAncientFreezer(offsetBatch, startBlockNumber) + rawdb.WriteOffsetOfLastAncientFreezer(offsetBatch, oldOffset) + + if err := offsetBatch.Write(); err != nil { + log.Crit("Failed to write offset into disk", "err", err) + } + + // If this flock exists, it's guaranteed that the old/new offsets have been updated and the new ancientDB has been created. + lock, _, err := fileutil.Flock(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + return fmt.Errorf("file lock error: %v", err) + } + + log.Info("Prune info", "old offset", oldOffset, "number of items in ancientDB", itemsOfAncient, "number of blocks to reserve", p.BlockAmountReserved) + log.Info("Recorded newOffset/newStartBlockNumber successfully", "newOffset", startBlockNumber) + + writeBlock := func(blockNumber uint64) error { + // Read all block data + blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber) + block := rawdb.ReadBlock(chainDb, blockHash, blockNumber) + receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber) + borReceipts := []*types.Receipt{rawdb.ReadRawBorReceipt(chainDb, blockHash, blockNumber)} + + // Calculate the total difficulty of the block + td := rawdb.ReadTd(chainDb, blockHash, blockNumber) + if td == nil { + return consensus.ErrUnknownAncestor + } + + // Write into the new ancient_back db. + if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, []types.Receipts{borReceipts}, td); err != nil { + return fmt.Errorf("failed to write new ancient: %v", err) + } + + return nil + } + + start := time.Now() + // All ancient data after and including startBlockNumber should be written into the new ancientDB ancient_back. + for blockNumber := startBlockNumber; blockNumber < itemsOfAncient+oldOffset; blockNumber++ { + err := writeBlock(blockNumber) + if err != nil { + return err + } + + // Print a log every 5s for better tracing. + if time.Since(start) > 5*time.Second { + log.Info("Block backup process running successfully", "current blockNumber for backup", blockNumber) + + start = time.Now() + } + } + + if err = lock.Release(); err != nil { + return fmt.Errorf("failed to release file lock: %v", err) + } + + log.Info("Backup old ancientDB done", "current start blockNumber in ancientDB", startBlockNumber) + + return nil +} + +// BlockPruneBackup backs up the ancient data of the old ancient db, i.e. the most recent 128 blocks in the ancient db.
+func (p *BlockPruner) BlockPruneBackup(name string, cache, handles int, namespace string, readonly, interrupt bool) error { + start := time.Now() + + if err := p.backupOldDb(name, cache, handles, namespace, readonly, interrupt); err != nil { + log.Error("Backup old ancientDB error", "err", err) + return err + } + + log.Info("Block pruning backup succeeded", "elapsed", common.PrettyDuration(time.Since(start))) + + return nil +} + +// RecoverInterruption handles the case when the block prune process was interrupted. +func (p *BlockPruner) RecoverInterruption(name string, cache, handles int, namespace string, readonly bool) error { + log.Info("RecoverInterruption for block prune") + + newExist, err := checkFileExist(p.newAncientPath) + if err != nil { + return fmt.Errorf("newAncientDB path error: %v", err) + } + + //nolint:nestif + if newExist { + log.Info("New ancientDB_backup existed in interruption scenario") + + flockOfAncientBack, err := checkFileExist(filepath.Join(p.newAncientPath, "PRUNEFLOCKBACK")) + if err != nil { + return fmt.Errorf("failed to check flock of ancientDB_Back: %v", err) + } + + // Both the old and new ancientDB existed concurrently. + // Delete the new ancientDB directly to prune from the start, e.g. path ../chaindb/ancient_backup + if err := os.RemoveAll(p.newAncientPath); err != nil { + return fmt.Errorf("failed to remove new ancient directory: %v", err) + } + + if flockOfAncientBack { + // The oldOffset/newOffset have already been updated. + if err := p.BlockPruneBackup(name, cache, handles, namespace, readonly, true); err != nil { + return err + } + } else { + // The flock did not exist and the new offset was not updated, so just handle this case as usual. + if err := p.BlockPruneBackup(name, cache, handles, namespace, readonly, false); err != nil { + return err + } + } + + if err := p.AncientDbReplacer(); err != nil { + return err + } + } else { + log.Info("New ancientDB_backup did not exist in interruption scenario") + // The new ancientDB was not even created; just start the backup from startBlockNumber as usual. + // In this case, the new offset has not been written into kvDB. + if err := p.BlockPruneBackup(name, cache, handles, namespace, readonly, false); err != nil { + return err + } + if err := p.AncientDbReplacer(); err != nil { + return err + } + } + + return nil +} + +func checkFileExist(path string) (bool, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + // The file does not exist. + return false, nil + } + + return true, err + } + + return true, nil +} + +// AncientDbReplacer deletes the old ancient db and points the new db +// to the old path. +func (p *BlockPruner) AncientDbReplacer() error { + // Delete the old ancientDB directly, e.g. path ../chaindb/ancient + if err := os.RemoveAll(p.oldAncientPath); err != nil { + return fmt.Errorf("failed to remove old ancient directory: %v", err) + } + + // Rename the new ancientDB path to the old one + if err := os.Rename(p.newAncientPath, p.oldAncientPath); err != nil { + return fmt.Errorf("failed to rename new ancient directory: %v", err) + } + + log.Info("Replaced existing ancient db with pruned one") + + return nil +} + // RecoverPruning will resume the pruning procedure during the system restart. // This function is used in this case: user tries to prune state data, but the // system was interrupted midway because of crash or manual-kill.
In this case
diff --git a/docs/cli/README.md b/docs/cli/README.md index d52a4fd836..7b80c49420 100644 --- a/docs/cli/README.md +++ b/docs/cli/README.md @@ -46,6 +46,10 @@ - [```snapshot```](./snapshot.md) +- [```snapshot inspect-ancient-db```](./snapshot_inspect-ancient-db.md) + +- [```snapshot prune-block```](./snapshot_prune-block.md) + - [```snapshot prune-state```](./snapshot_prune-state.md) - [```status```](./status.md)
diff --git a/docs/cli/inspect-ancient-db.md b/docs/cli/inspect-ancient-db.md new file mode 100644 index 0000000000..0029aaa746 --- /dev/null +++ b/docs/cli/inspect-ancient-db.md @@ -0,0 +1,19 @@ +# Inspect ancient DB for block pruning + +The ```bor snapshot inspect-ancient-db``` command will inspect a few fields in the ancient datastore using the given datadir location. + + +This command prints the following information which is useful for block-pruning rounds: + +1. Offset / Start block number (from kvDB). +2. Number of items in the ancientdb. +3. Last block number written in ancientdb. + + +## Options + +- ```datadir```: Path of the data directory to store information + +- ```keystore```: Path of the data directory to store keys + +- ```datadir.ancient```: Path of the old ancient data directory \ No newline at end of file
diff --git a/docs/cli/server.md b/docs/cli/server.md index 18f72d5993..fbd4adad3f 100644 --- a/docs/cli/server.md +++ b/docs/cli/server.md @@ -20,7 +20,7 @@ The ```bor server``` command runs the Bor client. - ```bor.withoutheimdall```: Run without Heimdall service (for testing purpose) (default: false) -- ```chain```: Name of the chain to sync ('mumbai', 'mainnet', 'amoy') or path to a genesis file (default: mainnet) +- ```chain```: Name of the chain to sync ('amoy', 'mumbai', 'mainnet') or path to a genesis file (default: mainnet) - ```config```: Path to the TOML configuration file @@ -84,6 +84,8 @@ The ```bor server``` command runs the Bor client. - ```snapshot```: Enables the snapshot-database mode (default: true) +- ```state.scheme```: Scheme to use for storing ethereum state ('hash' or 'path') (default: hash) + - ```syncmode```: Blockchain sync mode (only "full" sync supported) (default: full) - ```verbosity```: Logging verbosity for the server (5=trace|4=debug|3=info|2=warn|1=error|0=crit) (default: 3) @@ -306,4 +308,4 @@ The ```bor server``` command runs the Bor client. - ```txpool.pricelimit```: Minimum gas price limit to enforce for acceptance into the pool (default: 1) -- ```txpool.rejournal```: Time interval to regenerate the local transaction journal (default: 1h0m0s) +- ```txpool.rejournal```: Time interval to regenerate the local transaction journal (default: 1h0m0s) \ No newline at end of file
diff --git a/docs/cli/snapshot.md b/docs/cli/snapshot.md index 376220749b..ac1bc1ce74 100644 --- a/docs/cli/snapshot.md +++ b/docs/cli/snapshot.md @@ -2,4 +2,8 @@ The ```snapshot``` command groups snapshot related actions: -- [```snapshot prune-state```](./snapshot_prune-state.md): Prune state databases at the given datadir location. \ No newline at end of file +- [```snapshot prune-state```](./snapshot_prune-state.md): Prune state databases at the given datadir location. + +- [```snapshot prune-block```](./snapshot_prune-block.md): Prune ancient chaindata at the given datadir location. + +- [```snapshot inspect-ancient-db```](./snapshot_inspect-ancient-db.md): Inspect a few fields in the ancient datastore.
\ No newline at end of file
diff --git a/docs/cli/snapshot_inspect-ancient-db.md b/docs/cli/snapshot_inspect-ancient-db.md new file mode 100644 index 0000000000..c5742e75c1 --- /dev/null +++ b/docs/cli/snapshot_inspect-ancient-db.md @@ -0,0 +1,19 @@ +# Inspect ancient DB for block pruning + +The ```bor snapshot inspect-ancient-db``` command will inspect a few fields in the ancient datastore using the given datadir location. + + +This command prints the following information which is useful for block-pruning rounds: + +1. Offset / Start block number (from kvDB). +2. Number of items in the ancientdb. +3. Last block number written in ancientdb. + + +## Options + +- ```datadir```: Path of the data directory to store information + +- ```datadir.ancient```: Path of the old ancient data directory + +- ```keystore```: Path of the data directory to store keys \ No newline at end of file
diff --git a/docs/cli/snapshot_prune-block.md b/docs/cli/snapshot_prune-block.md new file mode 100644 index 0000000000..b3ae3d9d4f --- /dev/null +++ b/docs/cli/snapshot_prune-block.md @@ -0,0 +1,32 @@ +# Prune ancient blockchain + +The ```bor snapshot prune-block``` command will prune historical blockchain data stored in the ancientdb. The number of blocks expected to remain after the prune can be specified via `block-amount-reserved` in this command; it will prune the rest and retain only the specified amount of old block data in the ancientdb. + + +The brief workflow is as follows: + +1. back up the specified number of most recent blocks from the original ancientdb into a new ancient_backup, +2. then delete the original ancientdb dir and rename ancient_backup to the original one for replacement, +3. finally assemble the statedb and the new ancientdb together. + +The reason for doing this is that block data is moved into the ancient store once it becomes old enough (exceeds the threshold of 90000 blocks), so the disk usage grows very large over time and is occupied mainly by the ancientdb; block data pruning is therefore necessary, and this feature handles it. + +Warning: This command only works with the hash based storage scheme and doesn't work with the path based storage scheme. + +## Options + +- ```block-amount-reserved```: Sets the expected reserved number of blocks for offline block prune (default: 1024) + +- ```cache.triesinmemory```: Number of block states (tries) to keep in memory (default = 128) (default: 128) + +- ```check-snapshot-with-mpt```: Enable checking between snapshot and MPT (default: false) + +- ```datadir```: Path of the data directory to store information + +- ```datadir.ancient```: Path of the old ancient data directory + +- ```keystore```: Path of the data directory to store keys + +### Cache Options + +- ```cache```: Megabytes of memory allocated to internal caching (default: 1024) \ No newline at end of file
diff --git a/docs/cli/snapshot_prune-state.md b/docs/cli/snapshot_prune-state.md index bd053324da..ef955e8af0 100644 --- a/docs/cli/snapshot_prune-state.md +++ b/docs/cli/snapshot_prune-state.md @@ -1,6 +1,9 @@ # Prune state -The ```bor snapshot prune-state``` command will prune historical state data with the help of the state snapshot. All trie nodes and contract codes that do not belong to the specified version state will be deleted from the database. After pruning, only two version states are available: genesis and the specific one. +The ```bor snapshot prune-state``` command will prune historical state data +with the help of the state snapshot.
All trie nodes and contract codes that do not belong to the +specified version state will be deleted from the database. After pruning, only two version states +are available: genesis and the specific one. ## Options diff --git a/eth/backend.go b/eth/backend.go index fb1aff5ef4..12170fa238 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -138,7 +138,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) // Assemble the Ethereum object - chainDb, err := stack.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "ethereum/db/chaindata/", false) + chainDb, err := stack.OpenDatabaseWithFreezer("chaindata", config.DatabaseCache, config.DatabaseHandles, config.DatabaseFreezer, "ethereum/db/chaindata/", false, false, false) if err != nil { return nil, err } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a342927f7a..3659fd488f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -646,7 +646,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } - frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. + frozen, _ := d.stateDB.ItemAmountInAncient() // Ignore the error here since light client can also hit here. // If a part of blockchain data has already been written into active store, // disable the ancient style insertion explicitly. diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e1ecd011e3..46d58f315b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -67,7 +67,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester { freezer := t.TempDir() - db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false) + db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), freezer, "", false, false, false) if err != nil { panic(err) } diff --git a/eth/filters/IDatabase.go b/eth/filters/IDatabase.go index 6f22bb755c..8b76b6ca00 100644 --- a/eth/filters/IDatabase.go +++ b/eth/filters/IDatabase.go @@ -49,6 +49,20 @@ func (mr *MockDatabaseMockRecorder) Ancient(arg0, arg1 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ancient", reflect.TypeOf((*MockDatabase)(nil).Ancient), arg0, arg1) } +// AncientOffSet mocks base method. +func (m *MockDatabase) AncientOffSet() uint64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AncientOffSet") + ret0, _ := ret[0].(uint64) + return ret0 +} + +// AncientOffSet indicates an expected call of AncientOffSet. +func (mr *MockDatabaseMockRecorder) AncientOffSet() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AncientOffSet", reflect.TypeOf((*MockDatabase)(nil).AncientOffSet)) +} + // AncientDatadir mocks base method. func (m *MockDatabase) AncientDatadir() (string, error) { m.ctrl.T.Helper() @@ -196,6 +210,21 @@ func (mr *MockDatabaseMockRecorder) HasAncient(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasAncient", reflect.TypeOf((*MockDatabase)(nil).HasAncient), arg0, arg1) } +// ItemAmountInAncient mocks base method. 
+func (m *MockDatabase) ItemAmountInAncient() (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ItemAmountInAncient") + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ItemAmountInAncient indicates an expected call of ItemAmountInAncient. +func (mr *MockDatabaseMockRecorder) ItemAmountInAncient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ItemAmountInAncient", reflect.TypeOf((*MockDatabase)(nil).ItemAmountInAncient)) +} + // MigrateTable mocks base method. func (m *MockDatabase) MigrateTable(arg0 string, arg1 func([]byte) ([]byte, error)) error { m.ctrl.T.Helper() diff --git a/eth/state_accessor.go b/eth/state_accessor.go index b0016e15cc..b4180ef8c3 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -76,6 +76,10 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u // The optional base statedb is given, mark the start point as parent block statedb, database, triedb, report = base, base.Database(), base.Database().TrieDB(), false current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) + + if current == nil { + return nil, nil, fmt.Errorf("missing parent block %v %d", block.ParentHash(), block.NumberU64()-1) + } } else { // Otherwise, try to reexec blocks until we find a state or reach our limit current = block diff --git a/ethdb/database.go b/ethdb/database.go index 095af6bf79..4add13e79f 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -94,6 +94,12 @@ type AncientReaderOp interface { // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) + + // ItemAmountInAncient returns the actual length of current ancientDB. + ItemAmountInAncient() (uint64, error) + + // AncientOffSet returns the offset of current ancientDB. + AncientOffSet() uint64 } // AncientReader is the extended ancient reader interface including 'batched' or 'atomic' reading. 
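To make the relationship between the two new accessors and the existing `Ancients()` concrete, here is a minimal sketch (illustrative only, not code from this change; `reportAncientWindow` is a hypothetical helper, assuming only the offset semantics introduced above, where `ItemAmountInAncient()` is the frozen counter minus the offset):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/ethdb"
)

// reportAncientWindow is a hypothetical helper (illustration only). Ancients()
// reports the absolute frozen head counter, which still counts the blocks that
// were pruned away; ItemAmountInAncient() reports only the items physically
// present; AncientOffSet() is the first stored block number, so the invariant
// maintained by the pruner is frozen == offset + items.
func reportAncientWindow(db ethdb.Database) error {
	frozen, err := db.Ancients() // absolute head counter, includes pruned blocks
	if err != nil {
		return err
	}

	items, err := db.ItemAmountInAncient() // items physically stored
	if err != nil {
		return err
	}

	offset := db.AncientOffSet() // first stored item, i.e. number of pruned-away blocks

	fmt.Printf("offset=%d items=%d last=%d frozen=%d\n", offset, items, offset+items-1, frozen)

	return nil
}
```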
diff --git a/ethdb/remotedb/remotedb.go b/ethdb/remotedb/remotedb.go index 3003dcc144..f33d2bb9c0 100644 --- a/ethdb/remotedb/remotedb.go +++ b/ethdb/remotedb/remotedb.go @@ -83,6 +83,14 @@ func (db *Database) Ancients() (uint64, error) { return resp, err } +func (db *Database) AncientOffSet() uint64 { + panic("not supported") +} + +func (db *Database) ItemAmountInAncient() (uint64, error) { + panic("not supported") +} + func (db *Database) Tail() (uint64, error) { panic("not supported") } diff --git a/go.mod b/go.mod index aef780904f..6c4ee2c49f 100644 --- a/go.mod +++ b/go.mod @@ -265,6 +265,7 @@ require ( github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae // indirect github.com/RichardKnop/machinery v1.7.4 // indirect github.com/RichardKnop/redsync v1.2.0 // indirect + github.com/prometheus/tsdb v0.10.0 github.com/zclconf/go-cty v1.13.0 // indirect github.com/zondax/hid v0.9.1 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect diff --git a/go.sum b/go.sum index 549b7e22e2..aee1527e94 100644 --- a/go.sum +++ b/go.sum @@ -1956,6 +1956,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/prometheus/tsdb v0.10.0 h1:If5rVCMTp6W2SiRAQFlbpJNgVlgMEd+U2GZckwK38ic= github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4= github.com/protolambda/bls12-381-util v0.0.0-20220416220906-d8552aa452c7/go.mod h1:IToEjHuttnUzwZI5KBSM/LOOW3qLbbrHOEfp3SbECGY= github.com/protolambda/bls12-381-util v0.1.0 h1:05DU2wJN7DTU7z28+Q+zejXkIsA/MF8JZQGhtBZZiWk= diff --git a/internal/cli/chain.go b/internal/cli/chain.go index 9a7e9e8537..2ed2c6d45b 100644 --- a/internal/cli/chain.go +++ b/internal/cli/chain.go @@ -28,9 +28,9 @@ func (c *ChainCommand) Help() string { return `Usage: bor chain This command groups actions to interact with the chain. 
- + Set the new head of the chain: - + $ bor chain sethead ` }
diff --git a/internal/cli/command.go b/internal/cli/command.go index 9039d7d222..66ad650d9f 100644 --- a/internal/cli/command.go +++ b/internal/cli/command.go @@ -200,6 +200,16 @@ func Commands() map[string]MarkDownCommandFactory { Meta: meta, }, nil }, + "snapshot prune-block": func() (MarkDownCommand, error) { + return &PruneBlockCommand{ + Meta: meta, + }, nil + }, + "snapshot inspect-ancient-db": func() (MarkDownCommand, error) { + return &InspectAncientDbCommand{ + Meta: meta, + }, nil + }, } }
diff --git a/internal/cli/snapshot.go b/internal/cli/snapshot.go index ec82614aa4..a575210bb5 100644 --- a/internal/cli/snapshot.go +++ b/internal/cli/snapshot.go @@ -3,18 +3,29 @@ package cli import ( + "errors" + "fmt" + "os" + "path/filepath" "strings" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/pruner" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/internal/cli/flagset" "github.com/ethereum/go-ethereum/internal/cli/server" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/trie" + + "github.com/prometheus/tsdb/fileutil" "github.com/mitchellh/cli" ) +var errPbssNotSupported = errors.New("ancient block pruning is not supported on path based storage scheme") + // SnapshotCommand is the command to group the snapshot commands type SnapshotCommand struct { UI cli.Ui @@ -26,6 +37,8 @@ func (a *SnapshotCommand) MarkDown() string { "# snapshot", "The ```snapshot``` command groups snapshot related actions:", "- [```snapshot prune-state```](./snapshot_prune-state.md): Prune state databases at the given datadir location.", + "- [```snapshot prune-block```](./snapshot_prune-block.md): Prune ancient chaindata at the given datadir location.", + "- [```snapshot inspect-ancient-db```](./snapshot_inspect-ancient-db.md): Inspect a few fields in the ancient datastore.", } return strings.Join(items, "\n\n") @@ -39,7 +52,15 @@ func (c *SnapshotCommand) Help() string { Prune the state trie: - $ bor snapshot prune-state` + $ bor snapshot prune-state + + Prune the ancient data: + + $ bor snapshot prune-block + + Inspect ancient DB pruning related fields: + + $ bor snapshot inspect-ancient-db` } // Synopsis implements the cli.Command interface @@ -65,8 +86,11 @@ type PruneStateCommand struct { // MarkDown implements cli.MarkDown interface func (c *PruneStateCommand) MarkDown() string { items := []string{ - "# Prune state", - "The ```bor snapshot prune-state``` command will prune historical state data with the help of the state snapshot. All trie nodes and contract codes that do not belong to the specified version state will be deleted from the database. After pruning, only two version states are available: genesis and the specific one.", + `# Prune state`, + `The ` + "```" + "bor snapshot prune-state" + "```" + ` command will prune historical state data +with the help of the state snapshot. All trie nodes and contract codes that do not belong to the
+specified version state will be deleted from the database.
After pruning, only two version states +are available: genesis and the specific one.`, c.Flags().MarkDown(), } @@ -161,7 +185,7 @@ func (c *PruneStateCommand) Run(args []string) int { return 1 } - chaindb, err := node.OpenDatabaseWithFreezer(chaindataPath, int(c.cache), dbHandles, c.datadirAncient, "", false) + chaindb, err := node.OpenDatabaseWithFreezer(chaindataPath, int(c.cache), dbHandles, c.datadirAncient, "", false, false, false) if err != nil { c.UI.Error(err.Error()) @@ -186,3 +210,417 @@ func (c *PruneStateCommand) Run(args []string) int { return 0 } + +type PruneBlockCommand struct { + *Meta + + datadirAncient string + cache int + blockAmountReserved uint64 + triesInMemory int + checkSnapshotWithMPT bool +} + +// MarkDown implements cli.MarkDown interface +func (c *PruneBlockCommand) MarkDown() string { + items := []string{ + "# Prune ancient blockchain", + "The ```bor snapshot prune-block``` command will prune historical blockchain data stored in the ancientdb. The number of blocks expected to remain after the prune can be specified via `block-amount-reserved` in this command; it will prune the rest and retain only the specified amount of old block data in the ancientdb.", + ` +The brief workflow is as follows: + +1. back up the specified number of most recent blocks from the original ancientdb into a new ancient_backup, +2. then delete the original ancientdb dir and rename ancient_backup to the original one for replacement, +3. finally assemble the statedb and the new ancientdb together. + +The reason for doing this is that block data is moved into the ancient store once it becomes old enough (exceeds the threshold of 90000 blocks), so the disk usage grows very large over time and is occupied mainly by the ancientdb; block data pruning is therefore necessary, and this feature handles it.
+ +Warning: This command only works with the hash based storage scheme and doesn't work with the path based storage scheme.`, + c.Flags().MarkDown(), + } + + return strings.Join(items, "\n\n") +} + +// Help implements the cli.Command interface +func (c *PruneBlockCommand) Help() string { + return `Usage: bor snapshot prune-block + + This command will prune ancient blockchain data at the given datadir location` + c.Flags().Help() +} + +// Synopsis implements the cli.Command interface +func (c *PruneBlockCommand) Synopsis() string { + return "Prune ancient blockchain data" } + +// Flags: datadir.ancient, cache, block-amount-reserved, cache.triesinmemory, check-snapshot-with-mpt +func (c *PruneBlockCommand) Flags() *flagset.Flagset { + flags := c.NewFlagSet("prune-block") + + flags.StringFlag(&flagset.StringFlag{ + Name: "datadir.ancient", + Value: &c.datadirAncient, + Usage: "Path of the old ancient data directory", + Default: "", + }) + + flags.IntFlag(&flagset.IntFlag{ + Name: "cache", + Usage: "Megabytes of memory allocated to internal caching", + Value: &c.cache, + Default: 1024, + Group: "Cache", + }) + flags.Uint64Flag(&flagset.Uint64Flag{ + Name: "block-amount-reserved", + Usage: "Sets the expected reserved number of blocks for offline block prune", + Value: &c.blockAmountReserved, + Default: 1024, + }) + + flags.IntFlag(&flagset.IntFlag{ + Name: "cache.triesinmemory", + Usage: "Number of block states (tries) to keep in memory (default = 128)", + Value: &c.triesInMemory, + Default: 128, + }) + + flags.BoolFlag(&flagset.BoolFlag{ + Name: "check-snapshot-with-mpt", + Value: &c.checkSnapshotWithMPT, + Usage: "Enable checking between snapshot and MPT", + }) + + return flags +} + +// Run implements the cli.Command interface +func (c *PruneBlockCommand) Run(args []string) int { + flags := c.Flags() + + if err := flags.Parse(args); err != nil { + c.UI.Error(err.Error()) + return 1 + } + + datadir := c.dataDir + if datadir == "" { + c.UI.Error("datadir is required") + return 1 + } + + // Create the node + node, err := node.New(&node.Config{ + DataDir: datadir, + }) + + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + defer node.Close() + + dbHandles, err := server.MakeDatabaseHandles(0) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + err = c.validateAgainstSnapshot(node, dbHandles) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + err = c.pruneBlock(node, dbHandles) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + return 0 +} + +// validateAgainstSnapshot checks whether the MPT data and the snapshot data match each other +func (c *PruneBlockCommand) validateAgainstSnapshot(stack *node.Node, dbHandles int) error { + chaindb, err := stack.OpenDatabaseWithFreezer(chaindataPath, c.cache, dbHandles, c.datadirAncient, "", false, true, false) + if err != nil { + return fmt.Errorf("failed to access db: %v", err) + } + defer chaindb.Close() + + // Check if we're using hash based scheme and not path based + if rawdb.ReadStateScheme(chaindb) != rawdb.HashScheme { + return errPbssNotSupported + } + + if !c.checkSnapshotWithMPT { + return nil + } + + headBlock := rawdb.ReadHeadBlock(chaindb) + if headBlock == nil { + return errors.New("failed to load head block") + } + headHeader := headBlock.Header() + + snapconfig := snapshot.Config{ + CacheSize: 256, + Recovery: false, + NoBuild: true, + AsyncBuild: false, + } + + // Make sure the MPT and snapshot match before pruning, otherwise the node cannot start.
+ snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb, trie.HashDefaults), headBlock.Root()) + if err != nil { + log.Error("Unable to load snapshot", "err", err) + return err // The relevant snapshot(s) might not exist + } + + // Use the HEAD-(n-1) as the target root. The reason for picking it is: + // - in most of the normal cases, the related state is available + // - the probability of this layer being reorged is very low + + // Note that here (n) refers to `c.triesInMemory` which is a + // configurable parameter. + // Retrieve all snapshot layers from the current HEAD. + // In theory there are n difflayers + 1 disk layer present, + // so n diff layers are expected to be returned. + layers := snaptree.Snapshots(headHeader.Root, c.triesInMemory, true) + if len(layers) != c.triesInMemory { + // Reject if the accumulated diff layers are less than n. It + // means that in most normal cases, there is no associated state + // with the bottom-most diff layer. + log.Error("snapshot layers != TriesInMemory", "layers", len(layers)) + return fmt.Errorf("snapshot not old enough yet: need %d more blocks", c.triesInMemory-len(layers)) + } + // Use the bottom-most diff layer as the target + targetRoot := layers[len(layers)-1].Root() + + // Ensure the root is really present. The weak assumption + // is the presence of root can indicate the presence of the + // entire trie. + if blob := rawdb.ReadTrieNode(chaindb, common.Hash{}, nil, targetRoot, rawdb.HashScheme); len(blob) == 0 { + // The special case is for clique based networks (rinkeby, goerli + // and some other private networks), it's possible that two + // consecutive blocks will have the same root. In this case snapshot + // difflayer won't be created. So HEAD-(n-1) may not be paired with + // head-(n-1) layer. Instead the paired layer is higher than the + // bottom-most diff layer. Try to find the bottom-most snapshot + // layer with state available. + // + // Note HEAD is ignored. Usually the associated + // state is available, but we don't want to use the topmost state + // as the pruning target. + for i := len(layers) - 2; i >= 1; i-- { + if blob := rawdb.ReadTrieNode(chaindb, common.Hash{}, nil, layers[i].Root(), rawdb.HashScheme); len(blob) != 0 { + targetRoot = layers[i].Root() + log.Info("Selecting middle-layer as the pruning target", "root", targetRoot, "depth", i) + return nil + } + } + + if blob := rawdb.ReadTrieNode(chaindb, common.Hash{}, nil, snaptree.DiskRoot(), rawdb.HashScheme); len(blob) != 0 { + targetRoot = snaptree.DiskRoot() + log.Info("Selecting disk-layer as the pruning target", "root", targetRoot) + return nil + } + + if len(layers) > 0 { + log.Error("no snapshot paired state") + return errors.New("no snapshot paired state") + } + + return fmt.Errorf("associated state[%x] is not present", targetRoot) + } else { + if len(layers) > 0 { + log.Info("Selecting bottom-most difflayer as the pruning target", "root", targetRoot, "height", headHeader.Number.Uint64()-uint64(len(layers)-1)) + } else { + log.Info("Selecting user-specified state as the pruning target", "root", targetRoot) + } + } + + return nil +} + +// pruneBlock is the entry point for the ancient pruning process. Based on the user specified +// params, it will prune the ancient data. It also handles the case where the pruning process +// was interrupted earlier.
+func (c *PruneBlockCommand) pruneBlock(stack *node.Node, fdHandles int) error { + name := "chaindata" + + oldAncientPath := c.datadirAncient + + switch { + case oldAncientPath == "": + oldAncientPath = filepath.Join(stack.ResolvePath(name), "ancient") + case !filepath.IsAbs(oldAncientPath): + oldAncientPath = stack.ResolvePath(oldAncientPath) + } + + path, _ := filepath.Split(oldAncientPath) + if path == "" { + return errors.New("prune failed, did not specify the AncientPath") + } + + newAncientPath := filepath.Join(path, "ancient_back") + blockpruner := pruner.NewBlockPruner(stack, oldAncientPath, newAncientPath, c.blockAmountReserved) + + lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK")) + if err != nil { + log.Error("file lock error", "err", err) + return err + } + + if exist { + defer func() { + _ = lock.Release() + }() + log.Info("File lock existed, recovering interrupted prune and continuing", "err", err) + + if err := blockpruner.RecoverInterruption("chaindata", c.cache, fdHandles, "", false); err != nil { + log.Error("Pruning failed", "err", err) + return err + } + + log.Info("Block pruned successfully") + + return nil + } + + if _, err := os.Stat(newAncientPath); err == nil { + // No file lock found for the old ancientDB but the new ancientDB existed, indicating that geth was interrupted + // after old ancientDB removal; this happens after a successful backup, so just rename the new ancientDB + if err := blockpruner.AncientDbReplacer(); err != nil { + return err + } + + log.Info("Block pruned successfully") + + return nil + } + + if err := blockpruner.BlockPruneBackup(name, c.cache, fdHandles, "", false, false); err != nil { + return err + } + + log.Info("Block backup succeeded") + + // After a successful backup, rename the new ancientdb to the original name and delete the old ancientdb + if err := blockpruner.AncientDbReplacer(); err != nil { + return err + } + + if err = lock.Release(); err != nil { + log.Error("Unable to release lock on file", "err", err) + + return err + } + + log.Info("Block pruned successfully") + + return nil +} + +type InspectAncientDbCommand struct { + *Meta + + datadirAncient string +} + +// MarkDown implements cli.MarkDown interface +func (c *InspectAncientDbCommand) MarkDown() string { + items := []string{ + "# Inspect ancient DB for block pruning", + "The ```bor snapshot inspect-ancient-db``` command will inspect a few fields in the ancient datastore using the given datadir location.", + ` +This command prints the following information which is useful for block-pruning rounds: + +1. Offset / Start block number (from kvDB). +2. Number of items in the ancientdb. +3. Last block number written in ancientdb.
+`, + c.Flags().MarkDown(), + } + + return strings.Join(items, "\n\n") +} + +// Help implements the cli.Command interface +func (c *InspectAncientDbCommand) Help() string { + return `Usage: bor snapshot inspect-ancient-db + + This command will inspect a few fields in the ancient datastore using the given datadir location` + c.Flags().Help() +} + +// Synopsis implements the cli.Command interface +func (c *InspectAncientDbCommand) Synopsis() string { + return "Inspect fields in the ancient blockchain data" } + +// Flags: datadir.ancient +func (c *InspectAncientDbCommand) Flags() *flagset.Flagset { + flags := c.NewFlagSet("inspect-ancient-db") + + flags.StringFlag(&flagset.StringFlag{ + Name: "datadir.ancient", + Value: &c.datadirAncient, + Usage: "Path of the old ancient data directory", + Default: "", + }) + + return flags +} + +// Run implements the cli.Command interface +func (c *InspectAncientDbCommand) Run(args []string) int { + flags := c.Flags() + + if err := flags.Parse(args); err != nil { + c.UI.Error(err.Error()) + return 1 + } + + datadir := c.dataDir + if datadir == "" { + c.UI.Error("datadir is required") + return 1 + } + + // Create the node + node, err := node.New(&node.Config{ + DataDir: datadir, + }) + + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + defer node.Close() + + dbHandles, err := server.MakeDatabaseHandles(0) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + err = c.inspectAncientDb(node, dbHandles) + if err != nil { + c.UI.Error(err.Error()) + return 1 + } + + return 0 +} + +func (c *InspectAncientDbCommand) inspectAncientDb(stack *node.Node, dbHandles int) error { + chaindb, err := stack.OpenDatabaseWithFreezer(chaindataPath, 1024, dbHandles, c.datadirAncient, "", false, true, false) + if err != nil { + return err + } + defer chaindb.Close() + + return rawdb.AncientInspect(chaindb) +}
diff --git a/internal/cli/snapshot_test.go b/internal/cli/snapshot_test.go new file mode 100644 index 0000000000..d17893ddda --- /dev/null +++ b/internal/cli/snapshot_test.go @@ -0,0 +1,246 @@ +package cli + +import ( + "bytes" + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/pruner" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/internal/cli/server" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + + "github.com/stretchr/testify/require" +) + +var ( + canonicalSeed = 1 + blockPruneBackUpBlockNumber = 128 + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + balance = big.NewInt(1_000000000000000000) + gspec = &core.Genesis{Config: params.TestChainConfig, Alloc: core.GenesisAlloc{address: {Balance: balance}}} + signer = types.LatestSigner(gspec.Config) + config = &core.CacheConfig{ + TrieCleanLimit: 256, + TrieDirtyLimit: 256, + TrieTimeLimit: 5 * time.Minute, + SnapshotLimit: 0, // Disable snapshot + TriesInMemory: 128, + } + engine = ethash.NewFullFaker() )
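Before the tests themselves, the window arithmetic they exercise in one place: backupOldDb keeps the last BlockAmountReserved items, so the surviving range is [offset + items - reserved, offset + items). A minimal sketch (`pruneWindow` is a hypothetical helper for illustration, mirroring the computation in backupOldDb above and in BlockchainCreator below; it is not code from this change):

```go
// pruneWindow mirrors the arithmetic used by backupOldDb: given the old
// offset, the number of items in the ancient store, and the number of blocks
// to reserve, it returns the half-open range [start, end) of block numbers
// that survive the prune. start also becomes the new offset recorded in kvDB.
func pruneWindow(oldOffset, itemsOfAncient, reserved uint64) (start, end uint64) {
	start = oldOffset + itemsOfAncient - reserved
	end = oldOffset + itemsOfAncient

	return start, end
}
```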
+ +func TestOfflineBlockPrune(t *testing.T) { + t.Parallel() + + // Corner case of 0 blocks remaining in the ancientStore. + testOfflineBlockPruneWithAmountReserved(t, 0) + + // General case. + testOfflineBlockPruneWithAmountReserved(t, 100) +} + +func testOfflineBlockPruneWithAmountReserved(t *testing.T, amountReserved uint64) { + t.Helper() + + datadir, err := os.MkdirTemp("", "") + require.NoError(t, err, "failed to create temporary datadir") + + os.RemoveAll(datadir) + + chaindbPath := filepath.Join(datadir, "chaindata") + oldAncientPath := filepath.Join(chaindbPath, "ancient") + newAncientPath := filepath.Join(chaindbPath, "ancient_back") + + _, _, blockList, receiptsList, externTdList, startBlockNumber, _ := BlockchainCreator(t, chaindbPath, oldAncientPath, amountReserved) + + node := startEthService(t, chaindbPath) + defer node.Close() + + // Initialize a block pruner for pruning, retaining only the most recent amountReserved blocks. + testBlockPruner := pruner.NewBlockPruner(node, oldAncientPath, newAncientPath, amountReserved) + dbHandles, err := server.MakeDatabaseHandles(0) + require.NoError(t, err, "failed to create database handles") + + err = testBlockPruner.BlockPruneBackup(chaindbPath, 512, dbHandles, "", false, false) + require.NoError(t, err, "failed to backup block") + + dbBack, err := rawdb.Open(rawdb.OpenOptions{ + Type: node.Config().DBEngine, + Directory: chaindbPath, + AncientsDirectory: newAncientPath, + Namespace: "", + Cache: 0, + Handles: 0, + ReadOnly: false, + DisableFreeze: true, + IsLastOffset: false, + }) + require.NoError(t, err, "failed to create db with ancient backend") + + defer dbBack.Close() + + // Check the absence of genesis + genesis, err := dbBack.Ancient("hashes", 0) + require.Equal(t, []byte(nil), genesis, "got genesis but should be absent") + require.NotNil(t, err, "non-nil error expected") + + // Check whether the backup data matches the original + for blockNumber := startBlockNumber; blockNumber < startBlockNumber+amountReserved; blockNumber++ { + // Fetch the data explicitly from ancient db instead of `ReadCanonicalHash` because it + // will pull data from leveldb if not found in ancient. + blockHash, err := dbBack.Ancient("hashes", blockNumber) + require.NoError(t, err, "error fetching block hash from ancient db") + + // We can proceed with fetching other things via generic functions because if + // the block hadn't been there in the ancient db, the function above to get + // the block hash itself would've thrown an error.
+ hash := common.BytesToHash(blockHash) + block := rawdb.ReadBlock(dbBack, hash, blockNumber) + + require.Equal(t, block.Hash(), hash, "block data mismatch between oldDb and backupDb") + require.Equal(t, blockList[blockNumber-startBlockNumber].Hash(), hash, "block data mismatch between oldDb and backupDb") + + receipts := rawdb.ReadRawReceipts(dbBack, hash, blockNumber) + checkReceiptsRLP(t, receipts, receiptsList[blockNumber-startBlockNumber]) + + // Calculate the total difficulty of the block + td := rawdb.ReadTd(dbBack, hash, blockNumber) + require.NotNil(t, td, "failed to read td", consensus.ErrUnknownAncestor) + + require.Equal(t, td.Cmp(externTdList[blockNumber-startBlockNumber]), 0, "Td mismatch between oldDb and backupDb") + } + + // Check if the ancientDb freezer was replaced successfully + err = testBlockPruner.AncientDbReplacer() + require.NoError(t, err, "error replacing ancient db") + + _, err = os.Stat(newAncientPath) + require.True(t, os.IsNotExist(err), "ancientDb replaced unsuccessfully") + + _, err = os.Stat(oldAncientPath) + require.NoError(t, err, "failed to replace ancientDb") +} + +func BlockchainCreator(t *testing.T, chaindbPath, AncientPath string, blockRemain uint64) (ethdb.Database, []*types.Block, []*types.Block, []types.Receipts, []*big.Int, uint64, *core.BlockChain) { + t.Helper() + + // Create a database with ancient freezer + db, err := rawdb.Open(rawdb.OpenOptions{ + Directory: chaindbPath, + AncientsDirectory: AncientPath, + Namespace: "", + Cache: 0, + Handles: 0, + ReadOnly: false, + DisableFreeze: false, + IsLastOffset: false, + }) + require.NoError(t, err, "failed to create db with ancient backend") + + defer db.Close() + + genesis := gspec.MustCommit(db, trie.NewDatabase(db, trie.HashDefaults)) + // Initialize a fresh chain with only a genesis block + blockchain, err := core.NewBlockChain(db, config, gspec, nil, engine, vm.Config{}, nil, nil, nil) + require.NoError(t, err, "failed to create chain") + + // Make chain starting from genesis + blocks, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 500, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{0: byte(canonicalSeed), 19: byte(i)}) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1), params.TxGas, big.NewInt(8750000000), nil), signer, key) + require.NoError(t, err, "failed to sign tx") + block.AddTx(tx) + block.SetDifficulty(big.NewInt(1000000)) + }) + + _, err = blockchain.InsertChain(blocks) + require.NoError(t, err, "failed to insert chain") + + // Force run a freeze cycle + type freezer interface { + Freeze(threshold uint64) error + Ancients() (uint64, error) + } + + err = db.(freezer).Freeze(10) + require.NoError(t, err, "failed to perform freeze operation") + + // Make sure there are frozen items + frozen, err := db.Ancients() + require.NoError(t, err, "failed to fetch ancients items from db") + require.NotEqual(t, uint64(0), frozen, "no elements in freezer db") + require.GreaterOrEqual(t, frozen, blockRemain, "block amount is not enough for pruning") + + oldOffSet := rawdb.ReadOffsetOfCurrentAncientFreezer(db) + // Get the actual start block number. + startBlockNumber := frozen - blockRemain + oldOffSet + // Initialize the slice to buffer the block data left.
+ blockList := make([]*types.Block, 0, blockPruneBackUpBlockNumber) + receiptsList := make([]types.Receipts, 0, blockPruneBackUpBlockNumber) + externTdList := make([]*big.Int, 0, blockPruneBackUpBlockNumber) + // All ancient data within the most recent 128 blocks is written into a memory buffer for later use against the new ancient_back directory. + for blockNumber := startBlockNumber; blockNumber < frozen+oldOffSet; blockNumber++ { + blockHash := rawdb.ReadCanonicalHash(db, blockNumber) + block := rawdb.ReadBlock(db, blockHash, blockNumber) + blockList = append(blockList, block) + receipts := rawdb.ReadRawReceipts(db, blockHash, blockNumber) + receiptsList = append(receiptsList, receipts) + // Calculate the total difficulty of the block + td := rawdb.ReadTd(db, blockHash, blockNumber) + require.NotNil(t, td, "failed to read td", consensus.ErrUnknownAncestor) + + externTdList = append(externTdList, td) + } + + return db, blocks, blockList, receiptsList, externTdList, startBlockNumber, blockchain +} + +func checkReceiptsRLP(t *testing.T, have, want types.Receipts) { + t.Helper() + + require.Equal(t, len(want), len(have), "receipts sizes mismatch") + + for i := 0; i < len(want); i++ { + rlpHave, err := rlp.EncodeToBytes(have[i]) + require.NoError(t, err, "error in rlp encoding") + + rlpWant, err := rlp.EncodeToBytes(want[i]) + require.NoError(t, err, "error in rlp encoding") + + require.True(t, bytes.Equal(rlpHave, rlpWant), "receipt rlp mismatch") + } +} + +// startEthService creates a full node instance for testing. +func startEthService(t *testing.T, chaindbPath string) *node.Node { + t.Helper() + + n, err := node.New(&node.Config{DataDir: chaindbPath}) + require.NoError(t, err, "failed to create node") + + err = n.Start() + require.NoError(t, err, "failed to start node") + + return n }
diff --git a/node/node.go b/node/node.go index 55564ce8b5..d63fe85d03 100644 --- a/node/node.go +++ b/node/node.go @@ -818,7 +818,7 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r // also attaching a chain freezer to it that moves ancient chain data from the // database to immutable append-only files. If the node is an ephemeral one, a // memory database is returned. -func (n *Node) OpenDatabaseWithFreezer(name string, cache int, handles int, ancient string, namespace string, readonly bool) (ethdb.Database, error) { +func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) { n.lock.Lock() defer n.lock.Unlock() @@ -841,6 +841,8 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache int, handles int, anci Cache: cache, Handles: handles, ReadOnly: readonly, + DisableFreeze: disableFreeze, + IsLastOffset: isLastOffset, }) }
diff --git a/trie/triedb/pathdb/database_test.go b/trie/triedb/pathdb/database_test.go index 10bd50e197..f2e68d3473 100644 --- a/trie/triedb/pathdb/database_test.go +++ b/trie/triedb/pathdb/database_test.go @@ -98,7 +98,7 @@ type tester struct { func newTester(t *testing.T, historyLimit uint64) *tester { var ( - disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) + disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false, false, false) db = New(disk, &Config{ StateHistory: historyLimit, CleanCacheSize: 256 * 1024,
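For reference, a typical offline pruning session with the new commands might look like the following (paths and values are illustrative only; the node must be stopped first, since this is an offline prune):

    $ bor snapshot inspect-ancient-db --datadir /var/lib/bor/data
    $ bor snapshot prune-block --datadir /var/lib/bor/data --block-amount-reserved 1024
    $ bor snapshot inspect-ancient-db --datadir /var/lib/bor/data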