core/state: parallelise parts of state commit #29681

Merged: 4 commits, May 2, 2024
Changes from 1 commit
3 changes: 3 additions & 0 deletions core/state/state_object.go
@@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() {
 // commit obtains a set of dirty storage trie nodes and updates the account data.
 // The returned set can be nil if nothing to commit. This function assumes all
 // storage mutations have already been flushed into trie by updateRoot.
+//
+// Note, commit may run concurrently across all the state objects. Do not assume
+// thread-safe access to the statedb.
 func (s *stateObject) commit() (*trienode.NodeSet, error) {
 	// Short circuit if trie is not even loaded, don't bother with committing anything
 	if s.trie == nil {
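That added note is the contract the Commit changes below lean on: commit only touches object-local state, so distinct objects may commit in parallel, while anything shared must be guarded by the caller. A minimal self-contained sketch of that discipline (the item type and merged slice are hypothetical stand-ins, not geth types):

package main

import (
	"fmt"
	"sync"
)

// item stands in for a stateObject: commit only reads and writes item-local
// state, so concurrent commits of distinct items are data-race free.
type item struct{ id int }

func (it *item) commit() int { return it.id * it.id }

func main() {
	items := []*item{{1}, {2}, {3}}

	var (
		wg     sync.WaitGroup
		lock   sync.Mutex
		merged []int // shared accumulator: only touch while holding lock
	)
	for _, it := range items {
		wg.Add(1)
		go func(it *item) {
			defer wg.Done()

			res := it.commit() // object-local, no lock needed
			lock.Lock()
			merged = append(merged, res) // shared, guarded
			lock.Unlock()
		}(it)
	}
	wg.Wait()
	fmt.Println(merged) // order is nondeterministic
}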
84 changes: 59 additions & 25 deletions core/state/statedb.go
@@ -21,8 +21,10 @@ import (
 	"fmt"
 	"maps"
 	"math/big"
+	"runtime"
 	"slices"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -31,6 +33,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/tracing"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/internal/workerpool"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/trie"
@@ -1146,47 +1149,78 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, error) {
 		storageTrieNodesUpdated int
 		storageTrieNodesDeleted int
 		nodes                   = trienode.NewMergedNodeSet()
-		codeWriter              = s.db.DiskDB().NewBatch()
 	)
 	// Handle all state deletions first
 	if err := s.handleDestruction(nodes); err != nil {
 		return common.Hash{}, err
 	}
-	// Handle all state updates afterwards
+	// Handle all state updates afterwards, concurrently to one another to shave
+	// off some milliseconds from the commit operation. Also accumulate the code
+	// writes to run in parallel with the computations.
 	start := time.Now()
+	var (
+		code = s.db.DiskDB().NewBatch()
+		lock sync.Mutex
+	)
+	workers := workerpool.New[*stateObject, error](len(s.mutations), min(len(s.mutations), runtime.NumCPU()),
+		func(obj *stateObject) error {
+			// Write any storage changes in the state object to its storage trie
+			set, err := obj.commit()
+			if err != nil {
+				return err
+			}
+			// Merge the dirty nodes of storage trie into global set. It is possible
+			// that the account was destructed and then resurrected in the same block.
+			// In this case, the node set is shared by both accounts.
+			if set != nil {
+				lock.Lock()
+				defer lock.Unlock()
+
+				if err = nodes.Merge(set); err != nil {
+					return err
+				}
+				updates, deleted := set.Size()
+				storageTrieNodesUpdated += updates
+				storageTrieNodesDeleted += deleted
+			}
+			return nil
+		})
+
 	for addr, op := range s.mutations {
 		if op.isDelete() {
 			continue
 		}
-		obj := s.stateObjects[addr]
-
 		// Write any contract code associated with the state object
+		obj := s.stateObjects[addr]
 		if obj.code != nil && obj.dirtyCode {
-			rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
+			rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code)
 			obj.dirtyCode = false
 		}
-		// Write any storage changes in the state object to its storage trie
-		set, err := obj.commit()
-		if err != nil {
-			return common.Hash{}, err
-		}
-		// Merge the dirty nodes of storage trie into global set. It is possible
-		// that the account was destructed and then resurrected in the same block.
-		// In this case, the node set is shared by both accounts.
-		if set != nil {
-			if err := nodes.Merge(set); err != nil {
-				return common.Hash{}, err
-			}
-			updates, deleted := set.Size()
-			storageTrieNodesUpdated += updates
-			storageTrieNodesDeleted += deleted
-		}
+		// Run the storage updates concurrently to one another
+		workers.Schedule(obj)
 	}
-	s.StorageCommits += time.Since(start)
+	workers.Close()
 
-	if codeWriter.ValueSize() > 0 {
-		if err := codeWriter.Write(); err != nil {
-			log.Crit("Failed to commit dirty codes", "error", err)
-		}
-	}
+	// Updates running concurrently, wait for them to complete; running the code
+	// writes in the meantime.
+	done := make(chan struct{})
+	go func() {
+		// This goroutine is only needed to accurately measure the storage commit
+		// and not have the concurrent code write dirty the stats.
+		defer close(done)
+
+		workers.Wait()
+		s.StorageCommits += time.Since(start)
+	}()
+	if code.ValueSize() > 0 {
+		if err := code.Write(); err != nil {
+			log.Crit("Failed to commit dirty codes", "error", err)
+		}
+	}
+	<-done
+	for err := range workers.Results() {
+		if err != nil {
+			return common.Hash{}, err
+		}
+	}
 	// Write the account trie changes, measuring the amount of wasted time
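The done-channel choreography at the end of this hunk is purely about metering: StorageCommits should record only the storage trie commits, even though the contract code batch is being written on the calling goroutine at the same time. A self-contained sketch of that pattern (slow and other are hypothetical stand-ins for the trie commits and the code write):

package main

import (
	"fmt"
	"time"
)

// measureWhileOverlapping times slow() in isolation while other() runs on the
// current goroutine in parallel, mirroring the done-channel pattern above.
func measureWhileOverlapping(slow, other func()) time.Duration {
	var elapsed time.Duration
	start := time.Now()

	done := make(chan struct{})
	go func() {
		defer close(done)
		slow()
		elapsed = time.Since(start) // recorded as slow() ends, so other() cannot dirty the stat
	}()
	other() // runs concurrently with slow()
	<-done  // wait for slow() to finish and publish its timing
	return elapsed
}

func main() {
	d := measureWhileOverlapping(
		func() { time.Sleep(50 * time.Millisecond) }, // stands in for the trie commits
		func() { time.Sleep(10 * time.Millisecond) }, // stands in for the code batch write
	)
	fmt.Println("slow stage:", d)
}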
95 changes: 95 additions & 0 deletions internal/workerpool/workerpool.go
@@ -0,0 +1,95 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package workerpool implements a concurrent task processor.
package workerpool

import (
	"runtime"
	"sync"
)

// WorkerPool is a concurrent task processor, scheduling and running tasks from
// a source channel, feeding any errors into a sink.
type WorkerPool[T any, R any] struct {
	tasks   chan T         // Input channel waiting to consume tasks
	results chan R         // Result channel for consumers to wait on
	working sync.WaitGroup // Waitgroup blocking on worker liveness
}

// New creates a worker pool with the given maximum task capacity and an
// optional goroutine count to execute on. If 0 threads are requested, the pool
// will default to the number of (logical) CPUs.
func New[T any, R any](tasks int, threads int, f func(T) R) *WorkerPool[T, R] {
	// Create the worker pool
	pool := &WorkerPool[T, R]{
		tasks:   make(chan T, tasks),
		results: make(chan R, tasks),
	}
	// Start all the data processor routines
	if threads == 0 {
		threads = runtime.NumCPU()
	}
	pool.working.Add(threads)
	for i := 0; i < threads; i++ {
		go pool.work(f)
	}
	return pool
}

// Close signals the end of the task stream. It does not block execution; rather,
// it returns immediately, and users have to explicitly call Wait to block until
// the pool actually spins down. Alternatively, consumers can read the results
// channel, which will be closed after the last result is delivered.
//
// Calling Close multiple times will panic. Not particularly hard to avoid, but
// it's really a programming error.
func (pool *WorkerPool[T, R]) Close() {
	close(pool.tasks)
	go func() {
		pool.working.Wait()
		close(pool.results)
	}()
}

// Wait blocks until all the scheduled tasks have been processed.
func (pool *WorkerPool[T, R]) Wait() {
	pool.working.Wait()
}

// Schedule adds a task to the work queue.
func (pool *WorkerPool[T, R]) Schedule(task T) {
	pool.tasks <- task
}

// Results retrieves the result channel to consume the output of the individual
// work tasks. The channel will be closed after all tasks are done.
//
// Note, as long as the number of actually scheduled tasks is smaller than or
// equal to the number requested in the constructor, it's fine not to consume
// this channel.
func (pool *WorkerPool[T, R]) Results() chan R {
	return pool.results
}

// work is one of the (possibly many) goroutines consuming input tasks and
// executing them to compute the results.
func (pool *WorkerPool[T, R]) work(f func(T) R) {
	defer pool.working.Done()
	for task := range pool.tasks {
		pool.results <- f(task)
	}
}
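For reference, a minimal usage sketch of this API, instantiated with an error result the same way Commit does. Capacity equals the task count, so Schedule never blocks; the Results channel is drained for the first failure. Note that as an internal/ package this is only importable from inside go-ethereum, and the task names here are made up for illustration:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/internal/workerpool"
)

func main() {
	tasks := []string{"alpha", "beta", "gamma"}

	// Size the pool to the task count; threads == 0 defaults to NumCPU.
	pool := workerpool.New[string, error](len(tasks), 0, func(name string) error {
		if name == "beta" {
			return fmt.Errorf("refusing to process %s", name) // simulated failure
		}
		return nil
	})
	for _, t := range tasks {
		pool.Schedule(t) // buffered, never blocks at this capacity
	}
	pool.Close() // no more tasks; returns immediately

	// Surface the first failure, mirroring how Commit drains the pool. The
	// channel is closed once the last result is delivered, so the loop ends.
	for err := range pool.Results() {
		if err != nil {
			fmt.Println("task failed:", err)
			return
		}
	}
}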