Skip to content

Commit

Permalink
use hot cache for cachedb
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Jun 7, 2021
1 parent d902287 commit 0a1cc10
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 21 deletions.
9 changes: 8 additions & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,14 @@ func (p *Parlia) applyTransaction(
}
actualTx := (*receivedTxs)[0]
if !bytes.Equal(p.signer.Hash(actualTx).Bytes(), expectedHash.Bytes()) {
return fmt.Errorf("expected tx hash %v, get %v", expectedHash.String(), actualTx.Hash().String())
return fmt.Errorf("expected tx hash %v, get %v, nonce %d, to %s, value %s, gas %d, gasPrice %s, data %s", expectedHash.String(), actualTx.Hash().String(),
expectedTx.Nonce(),
expectedTx.To().String(),
expectedTx.Value().String(),
expectedTx.Gas(),
expectedTx.GasPrice().String(),
hex.EncodeToString(expectedTx.Data()),
)
}
expectedTx = actualTx
// move to next
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithConfig(db, &trie.Config{
stateCache: state.NewDatabaseWithConfigAndCache(db, &trie.Config{
Cache: cacheConfig.TrieCleanLimit,
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
Expand Down Expand Up @@ -1936,7 +1936,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
log.Error("validate state failed", err)
log.Error("validate state failed", "error", err)
return it.index, err
}
proctime := time.Since(start)
Expand Down
78 changes: 75 additions & 3 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
// Number of codehash->size associations to keep.
codeSizeCacheSize = 100000

// Number of state trie in cache
accountTrieCacheSize = 11

// Number of storage Trie in cache
storageTrieCacheSize = 2000

// Cache size granted for caching clean code.
codeCacheSize = 64 * 1024 * 1024
)
Expand All @@ -55,6 +61,12 @@ type Database interface {

// TrieDB retrieves the low level trie database used for data storage.
TrieDB() *trie.Database

// Cache the account trie tree
CacheAccount(root common.Hash, t Trie)

// Cache the storage trie tree
CacheStorage(addr common.Address, root common.Hash, t Trie)
}

// Trie is a Ethereum Merkle Patricia trie.
Expand Down Expand Up @@ -121,14 +133,45 @@ func NewDatabaseWithConfig(db ethdb.Database, config *trie.Config) Database {
}
}

func NewDatabaseWithConfigAndCache(db ethdb.Database, config *trie.Config) Database {
csc, _ := lru.New(codeSizeCacheSize)
atc, _ := lru.New(accountTrieCacheSize)
stc, _ := lru.New(storageTrieCacheSize)

database := &cachingDB{
db: trie.NewDatabaseWithConfig(db, config),
codeSizeCache: csc,
codeCache: fastcache.New(codeCacheSize),
accountTrieCache: atc,
storageTrieCache: stc,
}
return database
}

type cachingDB struct {
db *trie.Database
codeSizeCache *lru.Cache
codeCache *fastcache.Cache
db *trie.Database
codeSizeCache *lru.Cache
codeCache *fastcache.Cache
accountTrieCache *lru.Cache
storageTrieCache *lru.Cache
}

type triePair struct {
root common.Hash
trie Trie
}

// OpenTrie opens the main account trie at a specific root hash.
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
if db.accountTrieCache != nil {
if tr, exist := db.accountTrieCache.Get(root); exist {
fmt.Println("hit account Trie")

return tr.(Trie), nil
}
}
fmt.Println("miss account Trie")

tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
Expand All @@ -138,13 +181,42 @@ func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
if db.storageTrieCache != nil {
if tries, exist := db.storageTrieCache.Get(root); exist {
triesPairs := tries.([3]*triePair)
for _, triePair := range triesPairs {
if triePair != nil && triePair.root == root {
return triePair.trie, nil
}
}
}
}
fmt.Println("miss storage Trie")

tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

func (db *cachingDB) CacheAccount(root common.Hash, t Trie) {
tr := t.(*trie.SecureTrie)
db.accountTrieCache.Add(root, tr.ResetCopy())
}

func (db *cachingDB) CacheStorage(addr common.Address, root common.Hash, t Trie) {
tr := t.(*trie.SecureTrie)
if tries, exist := db.storageTrieCache.Get(addr); exist {
triesArray := tries.([3]*triePair)
triesArray[0], triesArray[1], triesArray[2] = &triePair{root: root, trie: tr.ResetCopy()}, triesArray[0], triesArray[1]
db.storageTrieCache.Add(addr, triesArray)
} else {
triesArray := [3]*triePair{&triePair{root: root, trie: tr.ResetCopy()}, nil, nil}
db.storageTrieCache.Add(addr, triesArray)
}
}

// CopyTrie returns an independent copy of the given trie.
func (db *cachingDB) CopyTrie(t Trie) Trie {
switch t := t.(type) {
Expand Down
8 changes: 7 additions & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (s *stateObject) finalise(prefetch bool) {
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch)
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, false)
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand Down Expand Up @@ -412,6 +412,9 @@ func (s *stateObject) updateRoot(db Database) {
func (s *stateObject) CommitTrie(db Database) error {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if s.trie != nil && s.data.Root != emptyRoot {
db.CacheStorage(s.address, s.data.Root, s.trie)
}
return nil
}
if s.dbErr != nil {
Expand All @@ -425,6 +428,9 @@ func (s *stateObject) CommitTrie(db Database) error {
if err == nil {
s.data.Root = root
}
if s.data.Root != emptyRoot {
db.CacheStorage(s.address, s.data.Root, s.trie)
}
return err
}

Expand Down
10 changes: 7 additions & 3 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch)
s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, true)
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand Down Expand Up @@ -986,7 +986,8 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
}
return s.trie.Hash()
root := s.trie.Hash()
return root
}

// Prepare sets the current transaction hash and index and block hash which is
Expand Down Expand Up @@ -1077,7 +1078,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
_, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
Expand All @@ -1092,6 +1093,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)
}
if root != emptyRoot {
s.db.CacheAccount(root, s.trie)
}
return nil
},
func() error {
Expand Down
32 changes: 21 additions & 11 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func (p *triePrefetcher) copy() *triePrefetcher {
}

// prefetch schedules a batch of trie items to prefetch.
func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) {
func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, isAccount bool) {
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
// Active fetcher, schedule the retrievals
fetcher := p.fetchers[root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, root)
fetcher = newSubfetcher(p.db, root, isAccount)
p.fetchers[root] = fetcher
}
fetcher.schedule(keys)
Expand Down Expand Up @@ -213,19 +213,22 @@ type subfetcher struct {
seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end

isAccount bool // Whether it is account trie
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, root common.Hash) *subfetcher {
func newSubfetcher(db Database, root common.Hash, isAccount bool) *subfetcher {
sf := &subfetcher{
db: db,
root: root,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
db: db,
root: root,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
isAccount: isAccount,
}
gopool.Submit(func() {
sf.loop()
Expand Down Expand Up @@ -283,7 +286,14 @@ func (sf *subfetcher) loop() {
defer close(sf.term)

// Start by opening the trie and stop processing if it fails
trie, err := sf.db.OpenTrie(sf.root)
var trie Trie
var err error
if sf.isAccount {
trie, err = sf.db.OpenTrie(sf.root)
} else {
// address is useless
trie, err = sf.db.OpenStorageTrie(common.Hash{}, sf.root)
}
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
return
Expand Down
8 changes: 8 additions & 0 deletions light/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func (db *odrDatabase) TrieDB() *trie.Database {
return nil
}

func (db *odrDatabase) CacheAccount(_ common.Hash, _ state.Trie) {
return
}

func (db *odrDatabase) CacheStorage(_ common.Address, _ common.Hash, _ state.Trie) {
return
}

type odrTrie struct {
db *odrDatabase
id *TrieID
Expand Down
7 changes: 7 additions & 0 deletions trie/secure_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ func (t *SecureTrie) Copy() *SecureTrie {
return &cpy
}

func (t *SecureTrie) ResetCopy() *SecureTrie {
cpy := *t
cpy.secKeyCacheOwner = nil
cpy.secKeyCache = nil
return &cpy
}

// NodeIterator returns an iterator that returns nodes of the underlying trie. Iteration
// starts at the key after the given start key.
func (t *SecureTrie) NodeIterator(start []byte) NodeIterator {
Expand Down

0 comments on commit 0a1cc10

Please sign in to comment.