Skip to content

Commit

Permalink
flags: refactor state expiry config;
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbundler committed Oct 9, 2023
1 parent 6ef398f commit 4c3b6cd
Show file tree
Hide file tree
Showing 21 changed files with 356 additions and 132 deletions.
10 changes: 0 additions & 10 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
cfg.Ethstats.URL = ctx.String(utils.EthStatsURLFlag.Name)
}
applyMetricConfig(ctx, &cfg)
applyStateExpiryConfig(ctx, &cfg)

return stack, cfg
}
Expand Down Expand Up @@ -271,15 +270,6 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) {
}
}

func applyStateExpiryConfig(ctx *cli.Context, cfg *gethConfig) {
if ctx.IsSet(utils.StateExpiryEnableFlag.Name) {
cfg.Eth.StateExpiryEnable = ctx.Bool(utils.StateExpiryEnableFlag.Name)
}
if ctx.IsSet(utils.StateExpiryFullStateEndpointFlag.Name) {
cfg.Eth.StateExpiryFullStateEndpoint = ctx.String(utils.StateExpiryFullStateEndpointFlag.Name)
}
}

func deprecated(field string) bool {
switch field {
case "ethconfig.Config.EVMInterpreter":
Expand Down
4 changes: 4 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ var (
stateExpiryFlags = []cli.Flag{
utils.StateExpiryEnableFlag,
utils.StateExpiryFullStateEndpointFlag,
utils.StateExpiryStateEpoch1BlockFlag,
utils.StateExpiryStateEpoch2BlockFlag,
utils.StateExpiryStateEpochPeriodFlag,
utils.StateExpiryEnableLocalReviveFlag,
}
)

Expand Down
5 changes: 2 additions & 3 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,12 @@ func pruneState(ctx *cli.Context) error {
Preimages: cfg.Eth.Preimages,
StateHistory: cfg.Eth.StateHistory,
StateScheme: cfg.Eth.StateScheme,
EnableStateExpiry: cfg.Eth.StateExpiryEnable,
RemoteEndPoint: cfg.Eth.StateExpiryFullStateEndpoint,
StateExpiryCfg: cfg.Eth.StateExpiryCfg,
}
prunerconfig := pruner.Config{
Datadir: stack.ResolvePath(""),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
EnableStateExpiry: cfg.Eth.StateExpiryEnable,
EnableStateExpiry: cfg.Eth.StateExpiryCfg.EnableExpiry(),
ChainConfig: chainConfig,
CacheConfig: cacheConfig,
}
Expand Down
104 changes: 102 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"math"
"math/big"
"net"
Expand Down Expand Up @@ -1125,6 +1127,26 @@ var (
Usage: "set state expiry remote full state rpc endpoint, every expired state will fetch from remote",
Category: flags.StateExpiryCategory,
}
StateExpiryStateEpoch1BlockFlag = &cli.Uint64Flag{
Name: "state-expiry.epoch1",
Usage: "set state expiry epoch1 block number",
Category: flags.StateExpiryCategory,
}
StateExpiryStateEpoch2BlockFlag = &cli.Uint64Flag{
Name: "state-expiry.epoch2",
Usage: "set state expiry epoch2 block number",
Category: flags.StateExpiryCategory,
}
StateExpiryStateEpochPeriodFlag = &cli.Uint64Flag{
Name: "state-expiry.period",
Usage: "set state expiry epoch period after epoch2",
Category: flags.StateExpiryCategory,
}
StateExpiryEnableLocalReviveFlag = &cli.BoolFlag{
Name: "state-expiry.localrevive",
Usage: "if enable local revive",
Category: flags.StateExpiryCategory,
}
)

func init() {
Expand Down Expand Up @@ -1940,13 +1962,18 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.StateHistory = ctx.Uint64(StateHistoryFlag.Name)
}
// Parse state scheme, abort the process if it's not compatible.
chaindb := tryMakeReadOnlyDatabase(ctx, stack)
chaindb := MakeChainDatabase(ctx, stack, false, false)
scheme, err := ParseStateScheme(ctx, chaindb)
chaindb.Close()
if err != nil {
Fatalf("%v", err)
}
cfg.StateScheme = scheme
seCfg, err := ParseStateExpiryConfig(ctx, chaindb, scheme)
if err != nil {
Fatalf("%v", err)
}
cfg.StateExpiryCfg = seCfg
chaindb.Close()

// Parse transaction history flag, if user is still using legacy config
// file with 'TxLookupLimit' configured, copy the value to 'TransactionHistory'.
Expand Down Expand Up @@ -2521,6 +2548,79 @@ func ParseStateScheme(ctx *cli.Context, disk ethdb.Database) (string, error) {
return "", fmt.Errorf("incompatible state scheme, stored: %s, provided: %s", stored, scheme)
}

func ParseStateExpiryConfig(ctx *cli.Context, disk ethdb.Database, scheme string) (*types.StateExpiryConfig, error) {
enc := rawdb.ReadStateExpiryCfg(disk)
var stored *types.StateExpiryConfig
if len(enc) > 0 {
var cfg types.StateExpiryConfig
if err := rlp.DecodeBytes(enc, &cfg); err != nil {
return nil, err
}
stored = &cfg
}
newCfg := &types.StateExpiryConfig{StateScheme: scheme}
if ctx.IsSet(StateExpiryEnableFlag.Name) {
newCfg.Enable = ctx.Bool(StateExpiryEnableFlag.Name)
}
if ctx.IsSet(StateExpiryFullStateEndpointFlag.Name) {
newCfg.FullStateEndpoint = ctx.String(StateExpiryFullStateEndpointFlag.Name)
}

// some config will use stored default
if ctx.IsSet(StateExpiryStateEpoch1BlockFlag.Name) {
newCfg.StateEpoch1Block = ctx.Uint64(StateExpiryStateEpoch1BlockFlag.Name)
} else if stored != nil {
newCfg.StateEpoch1Block = stored.StateEpoch1Block
}
if ctx.IsSet(StateExpiryStateEpoch2BlockFlag.Name) {
newCfg.StateEpoch2Block = ctx.Uint64(StateExpiryStateEpoch2BlockFlag.Name)
} else if stored != nil {
newCfg.StateEpoch2Block = stored.StateEpoch2Block
}
if ctx.IsSet(StateExpiryStateEpochPeriodFlag.Name) {
newCfg.StateEpochPeriod = ctx.Uint64(StateExpiryStateEpochPeriodFlag.Name)
} else if stored != nil {
newCfg.StateEpochPeriod = stored.StateEpochPeriod
}
if ctx.IsSet(StateExpiryEnableLocalReviveFlag.Name) {
newCfg.EnableLocalRevive = ctx.Bool(StateExpiryEnableLocalReviveFlag.Name)
}

// override prune level
newCfg.PruneLevel = types.StateExpiryPruneLevel1
switch newCfg.StateScheme {
case rawdb.HashScheme:
// TODO(0xbundler): will stop support HBSS later.
newCfg.PruneLevel = types.StateExpiryPruneLevel0
case rawdb.PathScheme:
newCfg.PruneLevel = types.StateExpiryPruneLevel1
default:
return nil, fmt.Errorf("not support the state scheme: %v", newCfg.StateScheme)
}

if err := newCfg.Validation(); err != nil {
return nil, err
}
if err := stored.CheckCompatible(newCfg); err != nil {
return nil, err
}

log.Info("Apply State Expiry", "cfg", newCfg)
if !newCfg.Enable {
return newCfg, nil
}

// save it into db
enc, err := rlp.EncodeToBytes(newCfg)
if err != nil {
return nil, err
}
if err = rawdb.WriteStateExpiryCfg(disk, enc); err != nil {
return nil, err
}
return newCfg, nil
}

// MakeTrieDatabase constructs a trie database based on the configured scheme.
func MakeTrieDatabase(ctx *cli.Context, disk ethdb.Database, preimage bool, readOnly bool) *trie.Database {
config := &trie.Config{
Expand Down
41 changes: 24 additions & 17 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ type CacheConfig struct {
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

// state expiry feature
EnableStateExpiry bool
RemoteEndPoint string
StateExpiryCfg *types.StateExpiryConfig
}

// TriedbConfig derives the configures for trie database.
Expand All @@ -170,7 +169,7 @@ func (c *CacheConfig) TriedbConfig() *trie.Config {
Cache: c.TrieCleanLimit,
Preimages: c.Preimages,
NoTries: c.NoTries,
EnableStateExpiry: c.EnableStateExpiry,
EnableStateExpiry: c.StateExpiryCfg.EnableExpiry(),
}
if c.StateScheme == rawdb.HashScheme {
config.HashDB = &hashdb.Config{
Expand Down Expand Up @@ -300,8 +299,8 @@ type BlockChain struct {
doubleSignMonitor *monitor.DoubleSignMonitor

// state expiry feature
enableStateExpiry bool
fullStateDB ethdb.FullStateDB
stateExpiryCfg *types.StateExpiryConfig
fullStateDB ethdb.FullStateDB
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -374,10 +373,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
if cacheConfig.EnableStateExpiry {
log.Info("enable state expiry feature", "RemoteEndPoint", cacheConfig.RemoteEndPoint)
bc.enableStateExpiry = true
bc.fullStateDB, err = ethdb.NewFullStateRPCServer(cacheConfig.RemoteEndPoint)
if cacheConfig.StateExpiryCfg.EnableExpiry() {
log.Info("enable state expiry feature", "RemoteEndPoint", cacheConfig.StateExpiryCfg.FullStateEndpoint)
bc.stateExpiryCfg = cacheConfig.StateExpiryCfg
bc.fullStateDB, err = ethdb.NewFullStateRPCServer(cacheConfig.StateExpiryCfg.FullStateEndpoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -624,7 +623,15 @@ func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
}

func (bc *BlockChain) EnableStateExpiry() bool {
return bc.enableStateExpiry
return bc.stateExpiryCfg.EnableExpiry()
}

func (bc *BlockChain) EnableStateExpiryLocalRevive() bool {
if bc.EnableStateExpiry() {
return bc.stateExpiryCfg.EnableLocalRevive
}

return false
}

func (bc *BlockChain) FullStateDB() ethdb.FullStateDB {
Expand Down Expand Up @@ -1052,8 +1059,8 @@ func (bc *BlockChain) StateAtWithSharedPool(root, startAtBlockHash common.Hash,
if err != nil {
return nil, err
}
if bc.enableStateExpiry {
stateDB.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, startAtBlockHash, height)
if bc.EnableStateExpiry() {
stateDB.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, startAtBlockHash, height)
}
return stateDB, err
}
Expand Down Expand Up @@ -2059,8 +2066,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}
bc.updateHighestVerifiedHeader(block.Header())
if bc.enableStateExpiry {
statedb.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, parent.Hash(), block.Number())
if bc.EnableStateExpiry() {
statedb.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, parent.Hash(), block.Number())
}

// Enable prefetching to pull in trie node paths while processing transactions
Expand All @@ -2071,8 +2078,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// do Prefetch in a separate goroutine to avoid blocking the critical path
// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
if throwaway != nil && bc.enableStateExpiry {
throwaway.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, parent.Hash(), block.Number())
if throwaway != nil && bc.EnableStateExpiry() {
throwaway.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, parent.Hash(), block.Number())
}
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

Expand Down Expand Up @@ -2156,7 +2163,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.usedGas += usedGas

dirty, _ := bc.triedb.Size()
stats.report(chain, it.index, dirty, setHead, bc.chainConfig)
stats.report(chain, it.index, dirty, setHead, bc.stateExpiryCfg)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
3 changes: 1 addition & 2 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package core

import (
"github.com/ethereum/go-ethereum/params"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -40,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool, config *params.ChainConfig) {
func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool, config *types.StateExpiryConfig) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand Down
6 changes: 4 additions & 2 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,15 +356,17 @@ func (bc *BlockChain) StateAt(startAtRoot common.Hash, startAtBlockHash common.H
if err != nil {
return nil, err
}
if bc.enableStateExpiry {
sdb.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, startAtBlockHash, expectHeight)
if bc.EnableStateExpiry() {
sdb.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, startAtBlockHash, expectHeight)
}
return sdb, err
}

// Config retrieves the chain's fork configuration.
func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }

func (bc *BlockChain) StateExpiryConfig() *types.StateExpiryConfig { return bc.stateExpiryCfg }

// Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }

Expand Down
9 changes: 9 additions & 0 deletions core/rawdb/accessors_epoch_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ func DeleteEpochMetaPlainState(db ethdb.KeyValueWriter, addr common.Hash, path s
return db.Delete(epochMetaPlainStateKey(addr, path))
}

func ReadStateExpiryCfg(db ethdb.Reader) []byte {
val, _ := db.Get(stateExpiryCfgKey)
return val
}

func WriteStateExpiryCfg(db ethdb.KeyValueWriter, val []byte) error {
return db.Put(stateExpiryCfgKey, val)
}

func epochMetaPlainStateKey(addr common.Hash, path string) []byte {
key := make([]byte, len(EpochMetaPlainStatePrefix)+len(addr)+len(path))
copy(key[:], EpochMetaPlainStatePrefix)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ var (
// epochMetaPlainStateMeta save disk layer meta data
epochMetaPlainStateMeta = []byte("epochMetaPlainStateMeta")

// stateExpiryCfgKey save state expiry persistence config
stateExpiryCfgKey = []byte("stateExpiryCfgKey")

// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
Expand Down
4 changes: 2 additions & 2 deletions core/state/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (it *nodeIterator) step() error {
if err != nil {
return err
}
if it.state.enableStateExpiry {
dataTrie.SetEpoch(it.state.epoch)
if it.state.EnableExpire() {
dataTrie.SetEpoch(it.state.Epoch())
}
it.dataIt, err = dataTrie.NodeIterator(nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error {
var (
pruneExpiredTrieCh = make(chan *snapshot.ContractItem, 100000)
pruneExpiredInDiskCh = make(chan *trie.NodeInfo, 100000)
epoch = types.GetStateEpoch(p.config.ChainConfig, height)
epoch = types.GetStateEpoch(p.config.CacheConfig.StateExpiryCfg, height)
rets = make([]error, 3)
tasksWG sync.WaitGroup
)
Expand Down
Loading

0 comments on commit 4c3b6cd

Please sign in to comment.