diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index d44ba0465b..7df2612b35 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -117,15 +117,15 @@ func rawDataToDiffLayer(data rlp.RawValue) (*types.DiffLayer, error) { hasher.Write(data) var diffHash common.Hash hasher.Sum(diffHash[:0]) - hasher.Reset() diff.DiffHash = diffHash + hasher.Reset() return &diff, nil } func TestProcessDiffLayer(t *testing.T) { t.Parallel() - blockNum := maxDiffLimit - 1 + blockNum := 128 fullBackend := newTestBackend(blockNum, false) falseDiff := 5 defer fullBackend.close() diff --git a/core/state/statedb.go b/core/state/statedb.go index 382d73d81a..886f181bf5 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -39,8 +39,9 @@ import ( ) const ( - preLoadLimit = 128 - defaultNumOfSlots = 100 + preLoadLimit = 128 + defaultNumOfSlots = 100 + minNumberOfAccountPerTask = 5 ) type revision struct { @@ -558,7 +559,7 @@ func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) { accounts[*tx.To()] = true } } - for account, _ := range accounts { + for account := range accounts { accountsSlice = append(accountsSlice, account) } if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() { @@ -1081,52 +1082,127 @@ func (s *StateDB) clearJournalAndRefund() { s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires } -func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, error) { codeWriter := s.db.TrieDB().DiskDB().NewBatch() - // Step1 write code - for codeHash, code := range s.diffCode { - rawdb.WriteCode(codeWriter, codeHash, code) - if codeWriter.ValueSize() >= ethdb.IdealBatchSize { - if err := codeWriter.Write(); err != nil { - return common.Hash{}, nil, err + commitFuncs := []func() error{ + func() error { + for codeHash, code := range s.diffCode { + rawdb.WriteCode(codeWriter, codeHash, code) + if codeWriter.ValueSize() >= ethdb.IdealBatchSize { + if err := codeWriter.Write(); err != nil { + return err + } + codeWriter.Reset() + } + } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + return err + } + } + return nil + }, + func() error { + tasks := make(chan func()) + taskResults := make(chan error, len(s.diffTries)) + tasksNum := 0 + finishCh := make(chan struct{}) + defer close(finishCh) + threads := 1 + if len(s.diffTries)/runtime.NumCPU() > minNumberOfAccountPerTask { + threads = runtime.NumCPU() + } + for i := 0; i < threads; i++ { + go func() { + for { + select { + case task := <-tasks: + task() + case <-finishCh: + return + } + } + }() } - codeWriter.Reset() - } - } - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - return common.Hash{}, nil, err - } - } - // Step2 commit account storage - for account, diff := range s.diffTries { - root, err := diff.Commit(nil) - if err != nil { - return common.Hash{}, nil, err - } - s.db.CacheStorage(crypto.Keccak256Hash(account[:]), root, diff) - } + for account, diff := range s.diffTries { + tmpAccount := account + tmpDiff := diff + tasks <- func() { + root, err := tmpDiff.Commit(nil) + if err != nil { + taskResults <- err + return + } + s.db.CacheStorage(crypto.Keccak256Hash(tmpAccount[:]), root, tmpDiff) + taskResults <- nil + return + } + tasksNum++ + } + + for i := 0; i < tasksNum; i++ { + err := <-taskResults + if err != nil { + return err + } + } - // Step3 commit account trie - var account Account - root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { - if err := rlp.DecodeBytes(leaf, &account); err != nil { + // commit account trie + var account Account + root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + if err := rlp.DecodeBytes(leaf, &account); err != nil { + return nil + } + if account.Root != emptyRoot { + s.db.TrieDB().Reference(account.Root, parent) + } + return nil + }) + if err != nil { + return err + } + if root != emptyRoot { + s.db.CacheAccount(root, s.trie) + } return nil - } - if account.Root != emptyRoot { - s.db.TrieDB().Reference(account.Root, parent) - } - return nil - }) - if err != nil { - return common.Hash{}, nil, err + }, + func() error { + if s.snap != nil { + if metrics.EnabledExpensive { + defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) + } + // Only update if there's a state transition (skip empty Clique blocks) + if parent := s.snap.Root(); parent != root { + if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { + log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + } + // Keep n diff layers in the memory + // - head layer is paired with HEAD state + // - head-1 layer is paired with HEAD-1 state + // - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state + if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil { + log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err) + } + } + } + return nil + }, + } + commitRes := make(chan error, len(commitFuncs)) + for _, f := range commitFuncs { + tmpFunc := f + go func() { + commitRes <- tmpFunc() + }() } - if root != emptyRoot { - s.db.CacheAccount(root, s.trie) + for i := 0; i < len(commitFuncs); i++ { + r := <-commitRes + if r != nil { + return common.Hash{}, nil, r + } } - s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil s.diffTries, s.diffCode = nil, nil return root, s.diffLayer, nil @@ -1140,7 +1216,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer // Finalize any pending changes and merge everything into the tries root := s.IntermediateRoot(deleteEmptyObjects) if s.lightProcessed { - return s.LightCommit() + return s.LightCommit(root) } var diffLayer *types.DiffLayer if s.snap != nil { diff --git a/core/state_processor.go b/core/state_processor.go index c281263975..ecb28a23ea 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -43,7 +43,7 @@ import ( const ( fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly - minNumberOfAccountPerTask = 10 + minNumberOfAccountPerTask = 5 ) // StateProcessor is a basic Processor, which takes care of transitioning