Skip to content

Commit

Permalink
make commit concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Sep 6, 2021
1 parent 07f4746 commit 85c037d
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 45 deletions.
4 changes: 2 additions & 2 deletions core/blockchain_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ func rawDataToDiffLayer(data rlp.RawValue) (*types.DiffLayer, error) {
hasher.Write(data)
var diffHash common.Hash
hasher.Sum(diffHash[:0])
hasher.Reset()
diff.DiffHash = diffHash
hasher.Reset()
return &diff, nil
}

func TestProcessDiffLayer(t *testing.T) {
t.Parallel()

blockNum := maxDiffLimit - 1
blockNum := 128
fullBackend := newTestBackend(blockNum, false)
falseDiff := 5
defer fullBackend.close()
Expand Down
160 changes: 118 additions & 42 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
)

const (
preLoadLimit = 128
defaultNumOfSlots = 100
preLoadLimit = 128
defaultNumOfSlots = 100
minNumberOfAccountPerTask = 5
)

type revision struct {
Expand Down Expand Up @@ -558,7 +559,7 @@ func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) {
accounts[*tx.To()] = true
}
}
for account, _ := range accounts {
for account := range accounts {
accountsSlice = append(accountsSlice, account)
}
if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() {
Expand Down Expand Up @@ -1081,52 +1082,127 @@ func (s *StateDB) clearJournalAndRefund() {
s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires
}

func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) {
func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, error) {
codeWriter := s.db.TrieDB().DiskDB().NewBatch()

// Step1 write code
for codeHash, code := range s.diffCode {
rawdb.WriteCode(codeWriter, codeHash, code)
if codeWriter.ValueSize() >= ethdb.IdealBatchSize {
if err := codeWriter.Write(); err != nil {
return common.Hash{}, nil, err
commitFuncs := []func() error{
func() error {
for codeHash, code := range s.diffCode {
rawdb.WriteCode(codeWriter, codeHash, code)
if codeWriter.ValueSize() >= ethdb.IdealBatchSize {
if err := codeWriter.Write(); err != nil {
return err
}
codeWriter.Reset()
}
}
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
return err
}
}
return nil
},
func() error {
tasks := make(chan func())
taskResults := make(chan error, len(s.diffTries))
tasksNum := 0
finishCh := make(chan struct{})
defer close(finishCh)
threads := 1
if len(s.diffTries)/runtime.NumCPU() > minNumberOfAccountPerTask {
threads = runtime.NumCPU()
}
for i := 0; i < threads; i++ {
go func() {
for {
select {
case task := <-tasks:
task()
case <-finishCh:
return
}
}
}()
}
codeWriter.Reset()
}
}
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
return common.Hash{}, nil, err
}
}

// Step2 commit account storage
for account, diff := range s.diffTries {
root, err := diff.Commit(nil)
if err != nil {
return common.Hash{}, nil, err
}
s.db.CacheStorage(crypto.Keccak256Hash(account[:]), root, diff)
}
for account, diff := range s.diffTries {
tmpAccount := account
tmpDiff := diff
tasks <- func() {
root, err := tmpDiff.Commit(nil)
if err != nil {
taskResults <- err
return
}
s.db.CacheStorage(crypto.Keccak256Hash(tmpAccount[:]), root, tmpDiff)
taskResults <- nil
return
}
tasksNum++
}

for i := 0; i < tasksNum; i++ {
err := <-taskResults
if err != nil {
return err
}
}

// Step3 commit account trie
var account Account
root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
// commit account trie
var account Account
root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if err != nil {
return err
}
if root != emptyRoot {
s.db.CacheAccount(root, s.trie)
}
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if err != nil {
return common.Hash{}, nil, err
},
func() error {
if s.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != root {
if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
}
// Keep n diff layers in the memory
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state
if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err)
}
}
}
return nil
},
}
commitRes := make(chan error, len(commitFuncs))
for _, f := range commitFuncs {
tmpFunc := f
go func() {
commitRes <- tmpFunc()
}()
}
if root != emptyRoot {
s.db.CacheAccount(root, s.trie)
for i := 0; i < len(commitFuncs); i++ {
r := <-commitRes
if r != nil {
return common.Hash{}, nil, r
}
}

s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
s.diffTries, s.diffCode = nil, nil
return root, s.diffLayer, nil
Expand All @@ -1140,7 +1216,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
// Finalize any pending changes and merge everything into the tries
root := s.IntermediateRoot(deleteEmptyObjects)
if s.lightProcessed {
return s.LightCommit()
return s.LightCommit(root)
}
var diffLayer *types.DiffLayer
if s.snap != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (

const (
fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
minNumberOfAccountPerTask = 10
minNumberOfAccountPerTask = 5
)

// StateProcessor is a basic Processor, which takes care of transitioning
Expand Down

0 comments on commit 85c037d

Please sign in to comment.