Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]feat: Merge holding performance PRs after v1.1.12 is released. #1030

Merged
merged 4 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,10 +1889,18 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
interruptCh := make(chan struct{})
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
// do Prefetch in a separate goroutine to avoid blocking the critical path

// 1.do state prefetch for snapshot cache
throwaway := statedb.CopyDoPrefetch()
go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh)

// 2.do trie prefetch for MPT trie node cache
// it is for the big state trie tree, prefetch based on transaction's From/To address.
// trie prefetcher is thread safe now, ok to prefetch in a separate routine
go throwaway.TriePrefetchInAdvance(block, signer)
}

//Process block using the parent state as reference point
substart := time.Now()
if bc.pipeCommit {
Expand Down
13 changes: 2 additions & 11 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -294,14 +293,6 @@ func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
Expand Down
4 changes: 0 additions & 4 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func (dl *diskLayer) Verified() bool {
func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down
14 changes: 4 additions & 10 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ type Snapshot interface {
// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand All @@ -131,20 +128,17 @@ type Snapshot interface {
// Storage directly retrieves the storage data associated with a particular hash,
// within a particular account.
Storage(accountHash, storageHash common.Hash) ([]byte, error)

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
Parent() snapshot
}

// snapshot is the internal version of the snapshot data layer that supports some
// additional methods compared to the public API.
type snapshot interface {
Snapshot

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
//
// Note, the method is an internal helper to avoid type switching between the
// disk and diff layers. There is no locking involved.
Parent() snapshot

// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items.
//
Expand Down
12 changes: 1 addition & 11 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,8 @@ func (s *StateObject) finalise(prefetch bool) {
}
}

// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}

prefetcher := s.db.prefetcher
if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot {
if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
if len(s.dirtyStorage) > 0 {
Expand Down
84 changes: 63 additions & 21 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ var (
// emptyRoot is the known root hash of an empty trie.
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

// dummyRoot is the dummy account root before corrected in pipecommit sync mode,
// the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28
dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root"))

emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes())
)

Expand Down Expand Up @@ -218,7 +214,12 @@ func (s *StateDB) StartPrefetcher(namespace string) {
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
parent := s.snap.Parent()
if parent != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, parent.Root(), namespace)
} else {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, common.Hash{}, namespace)
}
}
}

Expand All @@ -236,6 +237,35 @@ func (s *StateDB) StopPrefetcher() {
}
}

func (s *StateDB) TriePrefetchInAdvance(block *types.Block, signer types.Signer) {
// s is a temporary throw away StateDB, s.prefetcher won't be resetted to nil
// so no need to add lock for s.prefetcher
prefetcher := s.prefetcher
if prefetcher == nil {
return
}
accounts := make(map[common.Address]struct{}, block.Transactions().Len()<<1)
for _, tx := range block.Transactions() {
from, err := types.Sender(signer, tx)
if err != nil {
// invalid block, skip prefetch
return
}
accounts[from] = struct{}{}
if tx.To() != nil {
accounts[*tx.To()] = struct{}{}
}
}
addressesToPrefetch := make([][]byte, 0, len(accounts))
for addr := range accounts {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}

if len(addressesToPrefetch) > 0 {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
}
}

// Mark that the block is processed by diff layer
func (s *StateDB) SetExpectedStateRoot(root common.Hash) {
s.expectedRoot = root
Expand Down Expand Up @@ -775,6 +805,17 @@ func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common
// Copy creates a deep, independent copy of the state.
// Snapshots of the copied state cannot be applied to the copy.
func (s *StateDB) Copy() *StateDB {
return s.copyInternal(false)
}

// It is mainly for state prefetcher to do trie prefetch right now.
func (s *StateDB) CopyDoPrefetch() *StateDB {
return s.copyInternal(true)
}

// If doPrefetch is true, it tries to reuse the prefetcher, the copied StateDB will do active trie prefetch.
// otherwise, just do inactive copy trie prefetcher.
func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
// Copy all the basic fields, initialize the memory ones
state := &StateDB{
db: s.db,
Expand Down Expand Up @@ -841,12 +882,12 @@ func (s *StateDB) Copy() *StateDB {
state.accessList = s.accessList.Copy()
}

// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
prefetcher := s.prefetcher
if prefetcher != nil {
state.prefetcher = prefetcher.copy()
state.prefetcher = s.prefetcher
if s.prefetcher != nil && !doPrefetch {
// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
state.prefetcher = state.prefetcher.copy()
}
if s.snaps != nil {
// In order for the miner to be able to use and make additions
Expand Down Expand Up @@ -960,7 +1001,11 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
prefetcher := s.prefetcher
if prefetcher != nil && len(addressesToPrefetch) > 0 {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
if s.snap.Verified() {
prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
} else if prefetcher.rootParent != (common.Hash{}) {
prefetcher.prefetch(prefetcher.rootParent, addressesToPrefetch, emptyAddr)
}
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand Down Expand Up @@ -995,11 +1040,12 @@ func (s *StateDB) CorrectAccountsRoot(blockRoot common.Hash) {
}
if accounts, err := snapshot.Accounts(); err == nil && accounts != nil {
for _, obj := range s.stateObjects {
if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot {
if !obj.deleted {
if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist {
obj.data.Root = common.BytesToHash(account.Root)
if obj.data.Root == (common.Hash{}) {
if len(account.Root) == 0 {
obj.data.Root = emptyRoot
} else {
obj.data.Root = common.BytesToHash(account.Root)
}
obj.rootCorrected = true
}
Expand All @@ -1013,12 +1059,8 @@ func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil {
root := obj.data.Root
storageChanged := s.populateSnapStorage(obj)
if storageChanged {
root = dummyRoot
}
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash)
s.populateSnapStorage(obj)
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
}
}
}
Expand Down
Loading