diff --git a/core/blockchain.go b/core/blockchain.go index 52d8d636fa..089bea95f4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1895,7 +1895,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er start := i * len(accountsSlice) / runtime.NumCPU() end := (i + 1) * len(accountsSlice) / runtime.NumCPU() if i+1 == runtime.NumCPU() { - end = len(accounts) + end = len(accountsSlice) } preloadWg.Add(1) gopool.Submit(func() { diff --git a/core/state/state_object.go b/core/state/state_object.go index f93f47d5f5..7f71a0afb8 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -335,7 +335,11 @@ func (s *stateObject) updateTrie(db Database) Trie { } // Track the amount of time wasted on updating the storage trie if metrics.EnabledExpensive { - defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now()) + defer func(start time.Time) { + s.db.MetricsMux.Lock() + s.db.StorageUpdates += time.Since(start) + s.db.MetricsMux.Unlock() + }(time.Now()) } // The snapshot storage map for the object var storage map[common.Hash][]byte @@ -361,6 +365,7 @@ func (s *stateObject) updateTrie(db Database) Trie { } // If state snapshotting is active, cache the data til commit if s.db.snap != nil { + s.db.snapMux.Lock() if storage == nil { // Retrieve the old storage map, if available, create a new one otherwise if storage = s.db.snapStorage[s.addrHash]; storage == nil { @@ -369,6 +374,7 @@ func (s *stateObject) updateTrie(db Database) Trie { } } storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00 + s.db.snapMux.Unlock() } usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure } @@ -389,7 +395,11 @@ func (s *stateObject) updateRoot(db Database) { } // Track the amount of time wasted on hashing the storage trie if metrics.EnabledExpensive { - defer func(start time.Time) { s.db.StorageHashes += time.Since(start) }(time.Now()) + defer func(start time.Time) { + s.db.MetricsMux.Lock() + s.db.StorageHashes += time.Since(start) + s.db.MetricsMux.Unlock() + }(time.Now()) } s.data.Root = s.trie.Hash() } diff --git a/core/state/statedb.go b/core/state/statedb.go index f899da1c00..5d67514b40 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,10 +21,13 @@ import ( "errors" "fmt" "math/big" + "runtime" "sort" "sync" "time" + "github.com/ethereum/go-ethereum/common/gopool" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -69,6 +72,7 @@ type StateDB struct { trie Trie hasher crypto.KeccakState + snapMux sync.Mutex snaps *snapshot.Tree snap snapshot.Snapshot snapDestructs map[common.Hash]struct{} @@ -108,6 +112,7 @@ type StateDB struct { nextRevisionId int // Measurements gathered during execution for debugging purposes + MetricsMux sync.Mutex AccountReads time.Duration AccountHashes time.Duration AccountUpdates time.Duration @@ -895,6 +900,22 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { s.prefetcher = nil }() } + + tasks := make(chan func()) + finishCh := make(chan struct{}) + wg := sync.WaitGroup{} + for i := 0; i < runtime.NumCPU(); i++ { + gopool.Submit(func() { + for { + select { + case task := <-tasks: + task() + case <-finishCh: + return + } + } + }) + } // Although naively it makes sense to retrieve the account trie and then do // the contract storage and account updates sequentially, that short circuits // the account prefetcher. Instead, let's process all the storage updates @@ -902,9 +923,15 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // to pull useful data from disk. for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { - obj.updateRoot(s.db) + wg.Add(1) + tasks <- func() { + obj.updateRoot(s.db) + wg.Done() + } } } + wg.Wait() + close(finishCh) // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. @@ -958,72 +985,97 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } // Finalize any pending changes and merge everything into the tries - s.IntermediateRoot(deleteEmptyObjects) - - // Commit objects to the trie, measuring the elapsed time - codeWriter := s.db.TrieDB().DiskDB().NewBatch() - for addr := range s.stateObjectsDirty { - if obj := s.stateObjects[addr]; !obj.deleted { - // Write any contract code associated with the state object - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false + root := s.IntermediateRoot(deleteEmptyObjects) + + commitFuncs := []func() error{ + func() error { + // Commit objects to the trie, measuring the elapsed time + codeWriter := s.db.TrieDB().DiskDB().NewBatch() + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + // Write any contract code associated with the state object + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + } + // Write any storage changes in the state object to its storage trie + if err := obj.CommitTrie(s.db); err != nil { + return err + } + } } - // Write any storage changes in the state object to its storage trie - if err := obj.CommitTrie(s.db); err != nil { - return common.Hash{}, err + if len(s.stateObjectsDirty) > 0 { + s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2) } - } - } - if len(s.stateObjectsDirty) > 0 { - s.stateObjectsDirty = make(map[common.Address]struct{}) - } - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) - } - } - // Write the account trie changes, measuing the amount of wasted time - var start time.Time - if metrics.EnabledExpensive { - start = time.Now() - } - // The onleaf func is called _serially_, so we can reuse the same account - // for unmarshalling every time. - var account Account - root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { - if err := rlp.DecodeBytes(leaf, &account); err != nil { - return nil - } - if account.Root != emptyRoot { - s.db.TrieDB().Reference(account.Root, parent) - } - return nil - }) - if metrics.EnabledExpensive { - s.AccountCommits += time.Since(start) - } - // If snapshotting is enabled, update the snapshot tree with this new version - if s.snap != nil { - if metrics.EnabledExpensive { - defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) - } - // Only update if there's a state transition (skip empty Clique blocks) - if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { - log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + } + } + // Write the account trie changes, measuing the amount of wasted time + var start time.Time + if metrics.EnabledExpensive { + start = time.Now() } - // Keep 128 diff layers in the memory, persistent layer is 129th. - // - head layer is paired with HEAD state - // - head-1 layer is paired with HEAD-1 state - // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state - if err := s.snaps.Cap(root, 128); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err) + // The onleaf func is called _serially_, so we can reuse the same account + // for unmarshalling every time. + var account Account + _, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + if err := rlp.DecodeBytes(leaf, &account); err != nil { + return nil + } + if account.Root != emptyRoot { + s.db.TrieDB().Reference(account.Root, parent) + } + return nil + }) + if err != nil { + return err } + if metrics.EnabledExpensive { + s.AccountCommits += time.Since(start) + } + return nil + }, + func() error { + // If snapshotting is enabled, update the snapshot tree with this new version + if s.snap != nil { + if metrics.EnabledExpensive { + defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) + } + // Only update if there's a state transition (skip empty Clique blocks) + if parent := s.snap.Root(); parent != root { + if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { + log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + } + // Keep 128 diff layers in the memory, persistent layer is 129th. + // - head layer is paired with HEAD state + // - head-1 layer is paired with HEAD-1 state + // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state + if err := s.snaps.Cap(root, 128); err != nil { + log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err) + } + } + s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil + } + return nil + }, + } + commitRes := make(chan error, len(commitFuncs)) + for _, f := range commitFuncs { + tmpFunc := f + gopool.Submit(func() { + commitRes <- tmpFunc() + }) + } + for i := 0; i < len(commitFuncs); i++ { + r := <-commitRes + if r != nil { + return common.Hash{}, r } - s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil } - return root, err + + return root, nil } // PrepareAccessList handles the preparatory steps for executing a state transition with