Skip to content

Commit

Permalink
offline block prune
Browse files Browse the repository at this point in the history
  • Loading branch information
Mercybudda committed Dec 7, 2021
1 parent 74f6b61 commit ec56f6d
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 0 deletions.
91 changes: 91 additions & 0 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package main

import (
"bytes"
"encoding/json"
"errors"
"os"
"path/filepath"
"time"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/pruner"
Expand Down Expand Up @@ -78,6 +82,36 @@ WARNING: It's necessary to delete the trie clean cache after the pruning.
If you specify another directory for the trie clean cache via "--cache.trie.journal"
during the use of Geth, please also specify it here for correct deletion. Otherwise
the trie clean cache with default directory will be deleted.
`,
},
{
Name: "prune-block-pre-backup",
Usage: "Back up the ancient block data",
ArgsUsage: "<root>",
Action: utils.MigrateFlags(pruneBlockPreBackUp),
Category: "MISCELLANEOUS COMMANDS",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.AncientFlag,
utils.AncientBackUpFlag,
},
Description: `
Back up the ancient block data offline before prune block started.
`,
},
{
Name: "prune-block",
Usage: "Prune block data offline",
ArgsUsage: "<root>",
Action: utils.MigrateFlags(pruneBlock),
Category: "MISCELLANEOUS COMMANDS",
Flags: []cli.Flag{
utils.DataDirFlag,
utils.AncientFlag,
utils.AncientBackUpFlag,
},
Description: `
Offline prune for block data.
`,
},
{
Expand Down Expand Up @@ -149,6 +183,63 @@ It's also usable without snapshot enabled.
}
)

// pruneBlockPreBackUp backs up the ancient block data into a fresh
// "ancient_back_up" freezer directory before the offline block prune runs.
//
// The single positional argument is the path to the genesis JSON file,
// which is needed to initialize the backup database.
func pruneBlockPreBackUp(ctx *cli.Context) error {
	// Make sure we have a valid genesis JSON.
	genesisPath := ctx.Args().First()
	if len(genesisPath) == 0 {
		utils.Fatalf("Must supply path to genesis JSON file")
	}
	file, err := os.Open(genesisPath)
	if err != nil {
		utils.Fatalf("Failed to read genesis file: %v", err)
	}
	defer file.Close()

	genesis := new(core.Genesis)
	if err := json.NewDecoder(file).Decode(genesis); err != nil {
		utils.Fatalf("invalid genesis file: %v", err)
	}

	stack, config := makeConfigNode(ctx)
	defer stack.Close()

	freezer := config.Eth.DatabaseFreezer
	// MakeChainDatabase exits via Fatalf on failure, so no error check is
	// needed here. (The original code re-checked the stale os.Open error.)
	chaindb := utils.MakeChainDatabase(ctx, stack, false)

	for _, name := range []string{"chaindata"} {
		// Resolve the ancient directory relative to the instance directory,
		// e.g. <datadir>/geth/chaindata/ancient by default.
		root := stack.ResolvePath(name)
		switch {
		case freezer == "":
			freezer = filepath.Join(root, "ancient")
		case !filepath.IsAbs(freezer):
			freezer = stack.ResolvePath(freezer)
		}
		// Named blockPruner to avoid shadowing the pruner package.
		blockPruner, err := pruner.NewBlockPruner(chaindb, stack, stack.ResolvePath(""), freezer, genesis)
		if err != nil {
			utils.Fatalf("Failed to create block pruner: %v", err)
		}
		backFreezer := filepath.Join(root, "ancient_back_up")
		if err := blockPruner.BlockPruneBackUp(name, config.Eth.DatabaseCache, config.Eth.DatabaseHandles, backFreezer, "eth/db/chaindata/", false); err != nil {
			log.Error("Failed to back up block", "err", err)
			return err
		}
	}
	log.Info("geth block offline pruning backup successfully")
	return nil
}

// pruneBlock finishes an offline block prune: it removes the original
// ancient directory and moves the backup directory (created by the
// prune-block-pre-backup command) into its place.
func pruneBlock(ctx *cli.Context) error {
	oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name)
	newAncientPath := ctx.GlobalString(utils.AncientBackUpFlag.Name)
	if err := pruner.BlockPrune(oldAncientPath, newAncientPath); err != nil {
		// Fatalf terminates the process, so the original "return err"
		// after it was unreachable; the %v verb was also missing.
		utils.Fatalf("Failed to prune block: %v", err)
	}
	return nil
}

func pruneState(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
defer stack.Close()
Expand Down
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ var (
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
}
// AncientBackUpFlag points at the directory the ancient backup is written
// to. Its name must differ from AncientFlag ("datadir.ancient") — with the
// original duplicated name, both GlobalString reads in pruneBlock resolved
// to the same path, so the prune step would delete the backup it just made.
AncientBackUpFlag = DirectoryFlag{
	Name:  "datadir.backup",
	Usage: "Data directory for ancient directory backup (default = inside chaindata)",
}
DiffFlag = DirectoryFlag{
Name: "datadir.diff",
Usage: "Data directory for difflayer segments (default = inside chaindata)",
Expand Down
1 change: 1 addition & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ func WriteAncientBlock(db ethdb.AncientWriter, block *types.Block, receipts type
return len(headerBlob) + len(bodyBlob) + len(receiptBlob) + len(tdBlob) + common.HashLength
}


// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
DeleteReceipts(db, hash, number)
Expand Down
47 changes: 47 additions & 0 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,53 @@ func (f *freezer) AppendAncient(number uint64, hash, header, body, receipts, td
return nil
}

// AppendAncientNoBody injects all binary blobs except for the block body at
// the end of the append-only immutable table files. It is used by the offline
// block pruner, which keeps hash, header, receipts and total difficulty for
// old blocks while discarding their bodies.
//
// Notably, this function is lock free but kind of thread-safe. All out-of-order
// injection will be rejected. But if two injections with same number happen at
// the same time, we can get into the trouble.
func (f *freezer) AppendAncientNoBody(number uint64, hash, header, receipts, td []byte) (err error) {
	if f.readonly {
		return errReadOnly
	}
	// Ensure the binary blobs we are appending is continuous with freezer.
	if atomic.LoadUint64(&f.frozen) != number {
		return errOutOrderInsertion
	}
	// Rollback all inserted data if any insertion below failed to ensure
	// the tables won't out of sync.
	defer func() {
		if err != nil {
			rerr := f.repair()
			if rerr != nil {
				log.Crit("Failed to repair freezer", "err", rerr)
			}
			log.Info("Append ancient failed", "number", number, "err", err)
		}
	}()
	// Inject all the components into the relevant data tables.
	// Note: the body table is deliberately skipped.
	if err := f.tables[freezerHashTable].Append(f.frozen, hash[:]); err != nil {
		log.Error("Failed to append ancient hash", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerHeaderTable].Append(f.frozen, header); err != nil {
		log.Error("Failed to append ancient header", "number", f.frozen, "hash", hash, "err", err)
		return err
	}

	if err := f.tables[freezerReceiptTable].Append(f.frozen, receipts); err != nil {
		log.Error("Failed to append ancient receipts", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	if err := f.tables[freezerDifficultyTable].Append(f.frozen, td); err != nil {
		log.Error("Failed to append ancient difficulty", "number", f.frozen, "hash", hash, "err", err)
		return err
	}
	atomic.AddUint64(&f.frozen, 1) // Only modify atomically
	return nil
}

// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
if f.readonly {
Expand Down
129 changes: 129 additions & 0 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ import (
"errors"
"fmt"
"math"
"math/big"
"os"
"path/filepath"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -85,6 +89,15 @@ type Pruner struct {
triesInMemory uint64
}

// BlockPruner holds the state needed for an offline block-data prune:
// the open chain database, the node handle used to open the backup
// freezer, the chaindata/ancient directory locations and the genesis
// spec used to initialize the backup database.
type BlockPruner struct {
	db           ethdb.Database // chain database whose ancient data gets backed up
	chaindbDir   string         // path of the chaindata directory
	ancientdbDir string         // path of the original ancient (freezer) directory
	headHeader   *types.Header  // NOTE(review): never assigned in this change — confirm it is needed
	n            *node.Node     // node used to open the backup freezer database
	genesis      *core.Genesis  // genesis spec written into the backup database
}

// NewPruner creates the pruner instance.
func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, triesInMemory uint64) (*Pruner, error) {
headBlock := rawdb.ReadHeadBlock(db)
Expand Down Expand Up @@ -115,6 +128,17 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize, trie
}, nil
}

// NewBlockPruner creates a block pruner over the given database, wiring in
// the node handle, directory locations and genesis spec it needs later.
func NewBlockPruner(db ethdb.Database, n *node.Node, chaindbDir, ancientdbDir string, genesis *core.Genesis) (*BlockPruner, error) {
	bp := &BlockPruner{
		db:           db,
		chaindbDir:   chaindbDir,
		ancientdbDir: ancientdbDir,
		n:            n,
		genesis:      genesis,
	}
	return bp, nil
}

func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, stateBloom *stateBloom, bloomPath string, middleStateRoots map[common.Hash]struct{}, start time.Time) error {
// Delete all stale trie nodes in the disk. With the help of state bloom
// the trie nodes(and codes) belong to the active state will be filtered
Expand Down Expand Up @@ -233,6 +257,111 @@ func prune(snaptree *snapshot.Tree, root common.Hash, maindb ethdb.Database, sta
return nil
}

// BlockPruneBackUp backs up the block data that must survive an offline
// prune into a fresh freezer directory (backFreezer).
//
// Backup layout:
//   - the genesis block is written first;
//   - blocks [1, head-128] keep hash, header, receipts and total difficulty
//     but drop the body (AppendAncientNoBody);
//   - the most recent 128 blocks (head-127 .. head) are copied in full.
func (p *BlockPruner) BlockPruneBackUp(name string, cache, handles int, backFreezer, namespace string, readonly bool) error {
	start := time.Now()
	chainDb := p.db

	// Open a second database whose freezer lives in the backup directory.
	chainDbBack, err := p.n.OpenDatabaseWithFreezer(name, cache, handles, backFreezer, namespace, readonly)
	if err != nil {
		log.Error("Failed to open ancient database", "err", err)
		return err
	}

	// Write the genesis block first so the backup database is well formed.
	if _, _, err := core.SetupGenesisBlock(chainDbBack, p.genesis); err != nil {
		log.Error("Failed to write genesis block", "err", err)
		return err
	}

	headBlock := rawdb.ReadHeadBlock(chainDb)
	if headBlock == nil {
		return errors.New("Failed to load head block")
	}
	lastBlockNumber := headBlock.NumberU64()

	// Number of trailing blocks that are kept in full.
	const recentBlocks = 128

	// For blocks 1 .. head-128, only back up receipts, difficulties and
	// number->hash mappings — no body data anymore. The freezer rejects
	// out-of-order appends (number must equal the frozen counter), so this
	// MUST iterate in ascending order; the original descending loop could
	// never append more than the first block. The guard also avoids uint64
	// underflow of lastBlockNumber-recentBlocks on short chains.
	if lastBlockNumber > recentBlocks {
		for blockNumber := uint64(1); blockNumber <= lastBlockNumber-recentBlocks; blockNumber++ {
			blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber)
			block := rawdb.ReadBlock(chainDb, blockHash, blockNumber)
			receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber)
			// Calculate the total difficulty of the block.
			td := rawdb.ReadTd(chainDb, blockHash, blockNumber)
			if td == nil {
				return consensus.ErrUnknownAncestor
			}
			externTd := new(big.Int).Add(block.Difficulty(), td)
			// Encode all block components to RLP format.
			headerBlob, err := rlp.EncodeToBytes(block.Header())
			if err != nil {
				log.Crit("Failed to RLP encode block header", "err", err)
			}
			storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
			for i, receipt := range receipts {
				storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
			}
			receiptBlob, err := rlp.EncodeToBytes(storageReceipts)
			if err != nil {
				log.Crit("Failed to RLP encode block receipts", "err", err)
			}
			tdBlob, err := rlp.EncodeToBytes(externTd)
			if err != nil {
				log.Crit("Failed to RLP encode block total difficulty", "err", err)
			}
			// Write all blobs (sans body) to the backup freezer. The
			// original code had "return nil" inside this loop, exiting
			// after the first block and skipping the recent-block copy.
			if err := chainDbBack.AppendAncientNoBody(block.NumberU64(), block.Hash().Bytes(), headerBlob, receiptBlob, tdBlob); err != nil {
				log.Crit("Failed to write block data to ancient store", "err", err)
			}
		}
	}

	// Copy the most recent 128 blocks (or the whole chain, if shorter)
	// into the backup freezer in full, body included.
	firstRecent := uint64(1)
	if lastBlockNumber > recentBlocks {
		firstRecent = lastBlockNumber - recentBlocks + 1
	}
	for blockNumber := firstRecent; blockNumber <= lastBlockNumber; blockNumber++ {
		blockHash := rawdb.ReadCanonicalHash(chainDb, blockNumber)
		block := rawdb.ReadBlock(chainDb, blockHash, blockNumber)
		receipts := rawdb.ReadRawReceipts(chainDb, blockHash, blockNumber)
		// Calculate the total difficulty of the block.
		td := rawdb.ReadTd(chainDb, blockHash, blockNumber)
		if td == nil {
			return consensus.ErrUnknownAncestor
		}
		externTd := new(big.Int).Add(block.Difficulty(), td)
		rawdb.WriteAncientBlock(chainDbBack, block, receipts, externTd)
	}

	// NOTE(review): closing p.db here hands a closed database back to the
	// caller — confirm no caller touches it after this returns.
	chainDb.Close()
	chainDbBack.Close()

	log.Info("Block pruning BackUp successful", "elapsed", common.PrettyDuration(time.Since(start)))
	return nil
}

// BlockPrune finishes the offline block prune: it deletes the original
// ancient directory and renames the backup directory into its place.
// Both paths must live on the same filesystem for os.Rename to succeed.
func BlockPrune(oldAncientPath, newAncientPath string) error {
	// Delete the old ancientdb directly, e.g. path ../chaindb/ancient.
	// log.Error is a structured key/value logger, not Printf-style, so the
	// error is passed as a "err" attribute rather than a %v argument.
	if err := os.RemoveAll(oldAncientPath); err != nil {
		log.Error("Failed to remove old ancient directory", "err", err)
		return err
	}
	// Rename the new ancientdb path to the old one.
	if err := os.Rename(newAncientPath, oldAncientPath); err != nil {
		log.Error("Failed to rename new ancient directory", "err", err)
		return err
	}
	return nil
}

// Prune deletes all historical state nodes except the nodes belong to the
// specified state version. If user doesn't specify the state version, use
// the bottom-most snapshot diff layer as the target.
Expand Down
4 changes: 4 additions & 0 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type AncientWriter interface {

// Sync flushes all in-memory ancient store data to disk.
Sync() error

// AppendAncientNoBody injects all binary blobs except for the block body
// at the end of the append-only immutable table files.
AppendAncientNoBody(number uint64, hash, header, receipts, td []byte) error
}

// Reader contains the methods required to read data from both key-value as well as
Expand Down

0 comments on commit ec56f6d

Please sign in to comment.