Skip to content

Commit

Permalink
perf: concurrency and memory improvements for execution layer
Browse files Browse the repository at this point in the history
Description

This PR refers to the code optimization done by `BSC`, which mainly includes the following parts:
1. Do BlockBody verification concurrently.
2. Do the calculation of intermediate root concurrently.
3. Commit the MPTs concurrently.
4. Preload accounts before processing blocks.
5. Make the snapshot layers configurable.
6. Reuse some objects to reduce GC.
7. Add shared_pool for `stateDB` to improve cache usage.

References
- bnb-chain/bsc#257
- bnb-chain/bsc#792

---------

Co-authored-by: j75689 <j75689@gmail.com>
Co-authored-by: realowen <131384228+realowen@users.noreply.github.com>
  • Loading branch information
3 people committed May 31, 2023
1 parent 547f15b commit 1709fea
Show file tree
Hide file tree
Showing 73 changed files with 1,298 additions and 447 deletions.
2 changes: 2 additions & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ func setupGeth(stack *node.Node) error {
TrieCleanCacheRejournal: 60 * time.Minute,
TrieDirtyCache: 16,
TrieTimeout: 60 * time.Minute,
TriesInMemory: 128,
SnapshotCache: 10,
Preimages: true,
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func dump(ctx *cli.Context) error {
config := &trie.Config{
Preimages: true, // always enable preimage lookup
}
state, err := state.New(root, state.NewDatabaseWithConfig(db, config), nil)
state, err := state.New(root, state.NewDatabaseWithConfigAndCache(db, config), nil)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var (
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,
utils.GCModeFlag,
utils.TriesInMemoryFlag,
utils.SnapshotFlag,
utils.TxLookupLimitFlag,
utils.LightServeFlag,
Expand Down
13 changes: 9 additions & 4 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
Flags: flags.Merge([]cli.Flag{
utils.CacheTrieJournalFlag,
utils.BloomFilterSizeFlag,
utils.TriesInMemoryFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags),
Description: `
geth snapshot prune-state <state-root>
Expand All @@ -73,7 +74,7 @@ the trie clean cache with default directory will be deleted.
Usage: "Recalculate state hash based on the snapshot for verification",
ArgsUsage: "<root>",
Action: verifyState,
Flags: flags.Merge(utils.NetworkFlags, utils.DatabasePathFlags),
Flags: flags.Merge(utils.NetworkFlags, utils.DatabasePathFlags, []cli.Flag{utils.TriesInMemoryFlag}),
Description: `
geth snapshot verify-state <state-root>
will traverse the whole accounts and storages set based on the specified
Expand Down Expand Up @@ -144,6 +145,7 @@ It's also usable without snapshot enabled.
utils.ExcludeStorageFlag,
utils.StartKeyFlag,
utils.DumpLimitFlag,
utils.TriesInMemoryFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags),
Description: `
This command is semantically equivalent to 'geth dump', but uses the snapshots
Expand Down Expand Up @@ -171,7 +173,8 @@ func pruneState(ctx *cli.Context) error {
Cachedir: stack.ResolvePath(config.Eth.TrieCleanCacheJournal),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
}
pruner, err := pruner.NewPruner(chaindb, prunerconfig)
pruner, err := pruner.NewPruner(chaindb, prunerconfig,
pruner.WithTriesInMemory(ctx.Uint64(utils.TriesInMemoryFlag.Name)))
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
Expand Down Expand Up @@ -213,7 +216,8 @@ func verifyState(ctx *cli.Context) error {
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root())
snaptree, err := snapshot.New(snapconfig, chaindb, trie.NewDatabase(chaindb), headBlock.Root(),
snapshot.SetCapLimit(int(ctx.Uint64(utils.TriesInMemoryFlag.Name))))
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
Expand Down Expand Up @@ -496,7 +500,8 @@ func dumpState(ctx *cli.Context) error {
NoBuild: true,
AsyncBuild: false,
}
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root)
snaptree, err := snapshot.New(snapConfig, db, trie.NewDatabase(db), root,
snapshot.SetCapLimit(int(ctx.Uint64(utils.TriesInMemoryFlag.Name))))
if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ var (
Value: true,
Category: flags.EthCategory,
}
TriesInMemoryFlag = &cli.Uint64Flag{
Name: "triesInMemory",
Usage: "The layer of tries trees that keep in memory",
Value: 128,
}
TxLookupLimitFlag = &cli.Uint64Flag{
Name: "txlookuplimit",
Usage: "Number of recent blocks to maintain transactions index for (default = about one year, 0 = entire chain)",
Expand Down Expand Up @@ -465,7 +470,7 @@ var (
CacheDatabaseFlag = &cli.IntFlag{
Name: "cache.database",
Usage: "Percentage of cache memory allowance to use for database io",
Value: 50,
Value: 40,
Category: flags.PerfCategory,
}
CacheTrieFlag = &cli.IntFlag{
Expand Down Expand Up @@ -1852,6 +1857,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheGCFlag.Name) {
cfg.TrieDirtyCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheGCFlag.Name) / 100
}
if ctx.IsSet(TriesInMemoryFlag.Name) {
cfg.TriesInMemory = ctx.Uint64(TriesInMemoryFlag.Name)
}
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
}
Expand Down Expand Up @@ -2296,6 +2304,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
TriesInMemory: ethconfig.Defaults.TriesInMemory,
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
Preimages: ctx.Bool(CachePreimagesFlag.Name),
}
Expand Down
60 changes: 60 additions & 0 deletions common/gopool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package gopool

import (
"time"

"github.com/panjf2000/ants/v2"
)

const (
// DefaultAntsPoolSize is the default expire time of ants pool.
defaultGoroutineExpireDuration = 10 * time.Second
)

var (
// defaultPool is the default ants pool for gopool package.
defaultPool *ants.Pool
)

func init() {
// Init a instance pool when importing ants.
pool, err := ants.NewPool(
ants.DefaultAntsPoolSize,
ants.WithExpiryDuration(defaultGoroutineExpireDuration),
)
if err != nil {
panic(err)
}

defaultPool = pool
}

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultPool.Submit(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultPool.Running()
}

// Cap returns the capacity of this default pool.
func Cap() int {
return defaultPool.Cap()
}

// Free returns the available goroutines to work.
func Free() int {
return defaultPool.Free()
}

// Release Closes the default pool.
func Release() {
defaultPool.Release()
}

// Reboot reboots the default pool.
func Reboot() {
defaultPool.Reboot()
}
8 changes: 8 additions & 0 deletions common/lru/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,11 @@ func (c *Cache[K, V]) Keys() []K {

return c.cache.Keys()
}

// GetOldest returns the oldest entry in the cache.
func (c *Cache[K, V]) GetOldest() (key K, value V, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

return c.cache.GetOldest()
}
6 changes: 4 additions & 2 deletions core/asm/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"unicode"
"unicode/utf8"

"github.com/ethereum/go-ethereum/common/gopool"
)

// stateFn is used through the lifetime of the
Expand Down Expand Up @@ -103,14 +105,14 @@ func Lex(source []byte, debug bool) <-chan token {
state: lexLine,
debug: debug,
}
go func() {
gopool.Submit(func() {
l.emit(lineStart)
for l.state != nil {
l.state = l.state(l)
}
l.emit(eof)
close(l.tokens)
}()
})

return ch
}
Expand Down
64 changes: 45 additions & 19 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package core
import (
"fmt"

"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -64,29 +65,54 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash {
return fmt.Errorf("uncle root hash mismatch (header value %x, calculated %x)", header.UncleHash, hash)
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)

validateFuns := []func() error{
func() error {
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}
return nil
},
func() error {
// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return fmt.Errorf("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to shanghai fork
return fmt.Errorf("withdrawals present in block body")
}
return nil
},
func() error {
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
return nil
},
}
// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return fmt.Errorf("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to shanghai fork
return fmt.Errorf("withdrawals present in block body")
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
gopool.Submit(func() {
validateRes <- tmpFunc()
})
}

if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
for i := 0; i < len(validateFuns); i++ {
err := <-validateRes
if err != nil {
return err
}
return consensus.ErrPrunedAncestor
}

return nil
}

Expand Down
Loading

0 comments on commit 1709fea

Please sign in to comment.