From ed644a983d663bfa77874793b0afa97cd0cbe55f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Tue, 5 Feb 2019 16:20:18 +0200
Subject: [PATCH 1/5] trie: move dirty data to clean cache on flush

---
 trie/database.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/trie/database.go b/trie/database.go
index 6df1a7f79b78..a7106b60db18 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -784,6 +784,11 @@ func (db *Database) uncache(hash common.Hash) {
 	}
 	delete(db.dirties, hash)
 	db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
+
+	// Move the flushed node into the clean cache to prevent insta-reloads
+	if db.cleans != nil {
+		db.cleans.Set(string(hash[:]), node.rlp())
+	}
 }
 
 // Size returns the current storage size of the memory cache in front of the

From f94deb6a3f65216c1ea67b1cf9e809016bcf51bc Mon Sep 17 00:00:00 2001
From: Martin Holst Swende
Date: Wed, 20 Mar 2019 21:06:47 +0100
Subject: [PATCH 2/5] core, ethdb, trie: experiment with batch-replay-based uncaching

---
 core/rawdb/table.go        |   4 ++
 ethdb/batch.go             |   3 +
 ethdb/database.go          |   7 +++
 ethdb/leveldb/leveldb.go   |  17 ++++++
 ethdb/memorydb/memorydb.go |  11 ++++
 trie/database.go           | 117 ++++++++++++++++++++++++-------------
 6 files changed, 117 insertions(+), 42 deletions(-)

diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 974df681bdac..7ea9b2c7dbd7 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -124,6 +124,10 @@ type tableBatch struct {
 	prefix string
 }
 
+func (b *tableBatch) Replay(replay ethdb.DbEventLogger) error {
+	panic("implement me")
+}
+
 // Put inserts the given value into the batch for later committing.
 func (b *tableBatch) Put(key, value []byte) error {
 	return b.batch.Put(append([]byte(b.prefix), key...), value)
diff --git a/ethdb/batch.go b/ethdb/batch.go
index a6f015821080..38b704b0eb8a 100644
--- a/ethdb/batch.go
+++ b/ethdb/batch.go
@@ -34,6 +34,9 @@ type Batch interface {
 
 	// Reset resets the batch for reuse
 	Reset()
+
+	// Replay replays the batch into another batch
+	Replay(logger DbEventLogger) error
 }
 
 // Batcher wraps the NewBatch method of a backing data store.
diff --git a/ethdb/database.go b/ethdb/database.go
index 30208e1468c6..35ce8a4d2598 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -40,6 +40,13 @@ type Deleter interface {
 	Delete(key []byte) error
 }
 
+// DbEventLogger wraps Put and Delete to serve as a recipient
+// for batch replays
+type DbEventLogger interface {
+	Writer
+	Deleter
+}
+
 // Stater wraps the Stat method of a backing data store.
 type Stater interface {
 	// Stat returns a particular internal stat of the database.
diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go
index f730887d25dc..835fab482843 100644
--- a/ethdb/leveldb/leveldb.go
+++ b/ethdb/leveldb/leveldb.go
@@ -172,6 +172,23 @@ func (db *Database) NewBatch() ethdb.Batch {
 	}
 }
 
+type dbWrapper struct {
+	wrapped ethdb.DbEventLogger
+}
+
+func (dbw *dbWrapper) Put(key, value []byte) {
+	dbw.wrapped.Put(key, value)
+}
+
+func (dbw *dbWrapper) Delete(key []byte) {
+	dbw.wrapped.Delete(key)
+}
+
+// Replay replays batch contents.
+func (b *batch) Replay(r ethdb.DbEventLogger) error {
+	return b.b.Replay(&dbWrapper{r})
+}
+
 // NewIterator creates a binary-alphabetical iterator over the entire keyspace
 // contained within the leveldb database.
 func (db *Database) NewIterator() ethdb.Iterator {
diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go
index cc2490ba2fb1..db591a727671 100644
--- a/ethdb/memorydb/memorydb.go
+++ b/ethdb/memorydb/memorydb.go
@@ -200,6 +200,17 @@ type batch struct {
 	size int
 }
 
+func (b *batch) Replay(replay ethdb.DbEventLogger) error {
+	for _, keyvalue := range b.writes {
+		if keyvalue.delete {
+			replay.Delete(keyvalue.key)
+			continue
+		}
+		replay.Put(keyvalue.key, keyvalue.value)
+	}
+	return nil
+}
+
 // Put inserts the given value into the batch for later committing.
 func (b *batch) Put(key, value []byte) error {
 	b.writes = append(b.writes, keyvalue{common.CopyBytes(key), common.CopyBytes(value), false})
diff --git a/trie/database.go b/trie/database.go
index a7106b60db18..7193b3324ff2 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -81,7 +81,8 @@ type Database struct {
 	dirtiesSize   common.StorageSize // Storage size of the dirty node cache (exc. flushlist)
 	preimagesSize common.StorageSize // Storage size of the preimages cache
 
-	lock sync.RWMutex
+	lock        sync.RWMutex
+	batchLogger *BatchEventLogger
 }
 
 // rawNode is a simple binary blob used to differentiate between collapsed trie
@@ -293,12 +294,14 @@ func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database {
 			Hasher: trienodeHasher{},
 		})
 	}
-	return &Database{
+	db := &Database{
 		diskdb:    diskdb,
 		cleans:    cleans,
 		dirties:   map[common.Hash]*cachedNode{{}: {}},
 		preimages: make(map[common.Hash][]byte),
 	}
+	db.batchLogger = newBatchEventLogger(db)
+	return db
 }
 
 // DiskDB retrieves the persistent storage backing the trie database.
@@ -660,6 +663,61 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	return nil
 }
 
+type BatchEventLogger struct {
+	db *Database
+}
+
+func newBatchEventLogger(db *Database) *BatchEventLogger {
+	return &BatchEventLogger{db}
+}
+
+// Put reacts to batch writes, and implements uncache:
+// is the post-processing step of a commit operation where the already
+// persisted trie is removed from the cache. The reason behind the two-phase
+// commit is to ensure consistent data availability while moving from memory
+// to disk.
+func (p *BatchEventLogger) Put(key []byte, value []byte) error {
+	// key is hash
+	// value is rlp
+	//log.Info("proxybatch", "key", fmt.Sprintf("0x%x", key))
+	hash := common.BytesToHash(key)
+	db := p.db
+	rlp := value
+	// If the node does not exist, we're done on this path
+	node, ok := db.dirties[hash]
+	if !ok {
+		return nil
+	}
+	// Node still exists, remove it from the flush-list
+	switch hash {
+	case db.oldest:
+		db.oldest = node.flushNext
+		db.dirties[node.flushNext].flushPrev = common.Hash{}
+	case db.newest:
+		db.newest = node.flushPrev
+		db.dirties[node.flushPrev].flushNext = common.Hash{}
+	default:
+		db.dirties[node.flushPrev].flushNext = node.flushNext
+		db.dirties[node.flushNext].flushPrev = node.flushPrev
+	}
+	// Uncache the node's subtries and remove the node itself too
+	//for _, child := range node.childs() {
+	//	db.uncache(child)
+	//}
+	delete(db.dirties, hash)
+	db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
+
+	// Move the flushed node into the clean cache to prevent insta-reloads
+	if db.cleans != nil {
+		db.cleans.Set(string(hash[:]), rlp)
+	}
+	return nil
+}
+
+func (p *BatchEventLogger) Delete(key []byte) error {
+	panic("Not implemented")
+}
+
 // Commit iterates over all the children of a particular node, writes them out
 // to disk, forcefully tearing down all references in both directions.
 //
@@ -673,7 +731,6 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 
 	start := time.Now()
 	batch := db.diskdb.NewBatch()
-
 	// Move all of the accumulated preimages into a write batch
 	for hash, preimage := range db.preimages {
 		if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
@@ -689,6 +746,11 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 			batch.Reset()
 		}
 	}
+	if err := batch.Write(); err != nil {
+		db.lock.RUnlock()
+		return err
+	}
+	batch.Reset()
 	// Move the trie itself into the batch, flushing if enough data is accumulated
 	nodes, storage := len(db.dirties), db.dirtiesSize
 	if err := db.commit(node, batch); err != nil {
@@ -703,16 +765,15 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 		return err
 	}
 	db.lock.RUnlock()
-
 	// Write successful, clear out the flushed data
 	db.lock.Lock()
 	defer db.lock.Unlock()
+	batch.Replay(db.batchLogger)
+	batch.Reset()
 	db.preimages = make(map[common.Hash][]byte)
 	db.preimagesSize = 0
-	db.uncache(node)
-
 	memcacheCommitTimeTimer.Update(time.Since(start))
 	memcacheCommitSizeMeter.Mark(int64(storage - db.dirtiesSize))
 	memcacheCommitNodesMeter.Mark(int64(nodes - len(db.dirties)))
@@ -751,46 +812,18 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
 		if err := batch.Write(); err != nil {
 			return err
 		}
-		batch.Reset()
+		db.lock.RUnlock()
+		{
+			db.lock.Lock()
+			batch.Replay(db.batchLogger)
+			batch.Reset()
+			db.lock.Unlock()
+		}
+		db.lock.RLock()
 	}
 	return nil
 }
 
-// uncache is the post-processing step of a commit operation where the already
-// persisted trie is removed from the cache. The reason behind the two-phase
-// commit is to ensure consistent data availability while moving from memory
-// to disk.
-func (db *Database) uncache(hash common.Hash) {
-	// If the node does not exist, we're done on this path
-	node, ok := db.dirties[hash]
-	if !ok {
-		return
-	}
-	// Node still exists, remove it from the flush-list
-	switch hash {
-	case db.oldest:
-		db.oldest = node.flushNext
-		db.dirties[node.flushNext].flushPrev = common.Hash{}
-	case db.newest:
-		db.newest = node.flushPrev
-		db.dirties[node.flushPrev].flushNext = common.Hash{}
-	default:
-		db.dirties[node.flushPrev].flushNext = node.flushNext
-		db.dirties[node.flushNext].flushPrev = node.flushPrev
-	}
-	// Uncache the node's subtries and remove the node itself too
-	for _, child := range node.childs() {
-		db.uncache(child)
-	}
-	delete(db.dirties, hash)
-	db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
-
-	// Move the flushed node into the clean cache to prevent insta-reloads
-	if db.cleans != nil {
-		db.cleans.Set(string(hash[:]), node.rlp())
-	}
-}
-
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
 func (db *Database) Size() (common.StorageSize, common.StorageSize) {

From 887e7164306b5f39a5a3aa884fc2cc9a67d2b5c3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Mon, 25 Mar 2019 13:20:17 +0200
Subject: [PATCH 3/5] core, ethdb, trie: polish up batch replay mechanism

---
 core/rawdb/table.go        |   9 +--
 ethdb/batch.go             |   6 +-
 ethdb/database.go          |   8 +--
 ethdb/leveldb/leveldb.go   |  46 ++++++++-----
 ethdb/memorydb/memorydb.go |  23 +++----
 trie/database.go           | 131 ++++++++++++++++++-------------------
 6 files changed, 118 insertions(+), 105 deletions(-)

diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 7ea9b2c7dbd7..04f81f8ee0a4 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -124,10 +124,6 @@ type tableBatch struct {
 	prefix string
 }
 
-func (b *tableBatch) Replay(replay ethdb.DbEventLogger) error {
-	panic("implement me")
-}
-
 // Put inserts the given value into the batch for later committing.
 func (b *tableBatch) Put(key, value []byte) error {
 	return b.batch.Put(append([]byte(b.prefix), key...), value)
@@ -152,3 +148,8 @@ func (b *tableBatch) Write() error {
 func (b *tableBatch) Reset() {
 	b.batch.Reset()
 }
+
+// Replay replays the batch contents.
+func (b *tableBatch) Replay(r ethdb.Replayee) error {
+	return b.batch.Replay(r)
+}
diff --git a/ethdb/batch.go b/ethdb/batch.go
index 38b704b0eb8a..538b464ccd6c 100644
--- a/ethdb/batch.go
+++ b/ethdb/batch.go
@@ -32,11 +32,11 @@ type Batch interface {
 	// Write flushes any accumulated data to disk.
 	Write() error
 
-	// Reset resets the batch for reuse
+	// Reset resets the batch for reuse.
 	Reset()
 
-	// Replay replays the batch into another batch
-	Replay(logger DbEventLogger) error
+	// Replay replays the batch contents.
+	Replay(replayer Replayee) error
 }
 
 // Batcher wraps the NewBatch method of a backing data store.
diff --git a/ethdb/database.go b/ethdb/database.go
index 35ce8a4d2598..ddb1fd06e97f 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-// Package database defines the interfaces for an Ethereum data store.
+// Package ethdb defines the interfaces for an Ethereum data store.
 package ethdb
 
 import "io"
@@ -40,9 +40,9 @@ type Deleter interface {
 	Delete(key []byte) error
 }
 
-// DbEventLogger wraps Put and Delete to serve as a recipient
-// for batch replays
-type DbEventLogger interface {
+// Replayee wraps basic batch operations to allow replaying an existing batch
+// on top of multiple databases.
+type Replayee interface {
 	Writer
 	Deleter
 }
diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go
index 835fab482843..c11b39bfbe92 100644
--- a/ethdb/leveldb/leveldb.go
+++ b/ethdb/leveldb/leveldb.go
@@ -172,23 +172,6 @@ func (db *Database) NewBatch() ethdb.Batch {
 	}
 }
 
-type dbWrapper struct {
-	wrapped ethdb.DbEventLogger
-}
-
-func (dbw *dbWrapper) Put(key, value []byte) {
-	dbw.wrapped.Put(key, value)
-}
-
-func (dbw *dbWrapper) Delete(key []byte) {
-	dbw.wrapped.Delete(key)
-}
-
-// Replay replays batch contents.
-func (b *batch) Replay(r ethdb.DbEventLogger) error {
-	return b.b.Replay(&dbWrapper{r})
-}
-
 // NewIterator creates a binary-alphabetical iterator over the entire keyspace
 // contained within the leveldb database.
 func (db *Database) NewIterator() ethdb.Iterator {
@@ -433,3 +416,32 @@ func (b *batch) Reset() {
 	b.b.Reset()
 	b.size = 0
 }
+
+// Replay replays the batch contents.
+func (b *batch) Replay(r ethdb.Replayee) error {
+	return b.b.Replay(&replayer{replayer: r})
+}
+
+// replayer is a small wrapper to implement the correct replay methods.
+type replayer struct {
+	replayer ethdb.Replayee
+	failure  error
+}
+
+// Put inserts the given value into the key-value data store.
+func (r *replayer) Put(key, value []byte) {
+	// If the replay already failed, stop executing ops
+	if r.failure != nil {
+		return
+	}
+	r.failure = r.replayer.Put(key, value)
+}
+
+// Delete removes the key from the key-value data store.
+func (r *replayer) Delete(key []byte) {
+	// If the replay already failed, stop executing ops
+	if r.failure != nil {
+		return
+	}
+	r.failure = r.replayer.Delete(key)
+}
diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go
index db591a727671..55d88d2135e5 100644
--- a/ethdb/memorydb/memorydb.go
+++ b/ethdb/memorydb/memorydb.go
@@ -200,17 +200,6 @@ type batch struct {
 	size int
 }
 
-func (b *batch) Replay(replay ethdb.DbEventLogger) error {
-	for _, keyvalue := range b.writes {
-		if keyvalue.delete {
-			replay.Delete(keyvalue.key)
-			continue
-		}
-		replay.Put(keyvalue.key, keyvalue.value)
-	}
-	return nil
-}
-
 // Put inserts the given value into the batch for later committing.
 func (b *batch) Put(key, value []byte) error {
 	b.writes = append(b.writes, keyvalue{common.CopyBytes(key), common.CopyBytes(value), false})
@@ -251,6 +240,18 @@ func (b *batch) Reset() {
 	b.size = 0
 }
 
+// Replay replays the batch contents.
+func (b *batch) Replay(r ethdb.Replayee) error {
+	for _, keyvalue := range b.writes {
+		if keyvalue.delete {
+			r.Delete(keyvalue.key)
+			continue
+		}
+		r.Put(keyvalue.key, keyvalue.value)
+	}
+	return nil
+}
+
 // iterator can walk over the (potentially partial) keyspace of a memory key
 // value store. Internally it is a deep copy of the entire iterated state,
 // sorted by keys.
diff --git a/trie/database.go b/trie/database.go
index 7193b3324ff2..620b13b56d30 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -81,8 +81,7 @@ type Database struct {
 	dirtiesSize   common.StorageSize // Storage size of the dirty node cache (exc. flushlist)
 	preimagesSize common.StorageSize // Storage size of the preimages cache
 
-	lock        sync.RWMutex
-	batchLogger *BatchEventLogger
+	lock sync.RWMutex
 }
 
 // rawNode is a simple binary blob used to differentiate between collapsed trie
@@ -300,7 +299,6 @@ func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database {
 		dirties:   map[common.Hash]*cachedNode{{}: {}},
 		preimages: make(map[common.Hash][]byte),
 	}
-	db.batchLogger = newBatchEventLogger(db)
 	return db
 }
 
@@ -663,61 +661,6 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	return nil
 }
 
-type BatchEventLogger struct {
-	db *Database
-}
-
-func newBatchEventLogger(db *Database) *BatchEventLogger {
-	return &BatchEventLogger{db}
-}
-
-// Put reacts to batch writes, and implements uncache:
-// is the post-processing step of a commit operation where the already
-// persisted trie is removed from the cache. The reason behind the two-phase
-// commit is to ensure consistent data availability while moving from memory
-// to disk.
-func (p *BatchEventLogger) Put(key []byte, value []byte) error {
-	// key is hash
-	// value is rlp
-	//log.Info("proxybatch", "key", fmt.Sprintf("0x%x", key))
-	hash := common.BytesToHash(key)
-	db := p.db
-	rlp := value
-	// If the node does not exist, we're done on this path
-	node, ok := db.dirties[hash]
-	if !ok {
-		return nil
-	}
-	// Node still exists, remove it from the flush-list
-	switch hash {
-	case db.oldest:
-		db.oldest = node.flushNext
-		db.dirties[node.flushNext].flushPrev = common.Hash{}
-	case db.newest:
-		db.newest = node.flushPrev
-		db.dirties[node.flushPrev].flushNext = common.Hash{}
-	default:
-		db.dirties[node.flushPrev].flushNext = node.flushNext
-		db.dirties[node.flushNext].flushPrev = node.flushPrev
-	}
-	// Uncache the node's subtries and remove the node itself too
-	//for _, child := range node.childs() {
-	//	db.uncache(child)
-	//}
-	delete(db.dirties, hash)
-	db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
-
-	// Move the flushed node into the clean cache to prevent insta-reloads
-	if db.cleans != nil {
-		db.cleans.Set(string(hash[:]), rlp)
-	}
-	return nil
-}
-
-func (p *BatchEventLogger) Delete(key []byte) error {
-	panic("Not implemented")
-}
-
 // Commit iterates over all the children of a particular node, writes them out
 // to disk, forcefully tearing down all references in both directions.
 //
@@ -673,7 +674,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 	start := time.Now()
 	batch := db.diskdb.NewBatch()
 
+	// Move all of the accumulated preimages into a write batch
 	for hash, preimage := range db.preimages {
 		if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
@@ -738,6 +682,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 			db.lock.RUnlock()
 			return err
 		}
+		// If the batch is too large, flush to disk
 		if batch.ValueSize() > ethdb.IdealBatchSize {
 			if err := batch.Write(); err != nil {
 				db.lock.RUnlock()
@@ -746,31 +691,39 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 			batch.Reset()
 		}
 	}
+	// Since we're going to replay trie node writes into the clean cache, flush out
+	// any batched pre-images before continuing.
 	if err := batch.Write(); err != nil {
 		db.lock.RUnlock()
 		return err
 	}
 	batch.Reset()
+
 	// Move the trie itself into the batch, flushing if enough data is accumulated
 	nodes, storage := len(db.dirties), db.dirtiesSize
-	if err := db.commit(node, batch); err != nil {
+
+	uncacher := &cleaner{db}
+	if err := db.commit(node, batch, uncacher); err != nil {
 		log.Error("Failed to commit trie from trie database", "err", err)
 		db.lock.RUnlock()
 		return err
 	}
-	// Write batch ready, unlock for readers during persistence
+	// Trie mostly committed to disk, flush any batch leftovers
 	if err := batch.Write(); err != nil {
 		log.Error("Failed to write trie to disk", "err", err)
 		db.lock.RUnlock()
 		return err
 	}
 	db.lock.RUnlock()
-	// Write successful, clear out the flushed data
+
+	// Uncache any leftovers in the last batch
 	db.lock.Lock()
 	defer db.lock.Unlock()
-	batch.Replay(db.batchLogger)
+
+	batch.Replay(uncacher)
 	batch.Reset()
+	// Reset the storage counters and bumped metrics
 	db.preimages = make(map[common.Hash][]byte)
 	db.preimagesSize = 0
@@ -793,14 +746,14 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 }
 
 // commit is the private locked version of Commit.
-func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
+func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
 	// If the node does not exist, it's a previously committed node
 	node, ok := db.dirties[hash]
 	if !ok {
 		return nil
 	}
 	for _, child := range node.childs() {
-		if err := db.commit(child, batch); err != nil {
+		if err := db.commit(child, batch, uncacher); err != nil {
 			return err
 		}
 	}
@@ -815,15 +768,61 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
 		if err := batch.Write(); err != nil {
 			return err
 		}
 		db.lock.RUnlock()
 		{
 			db.lock.Lock()
-			batch.Replay(db.batchLogger)
-			batch.Reset()
+			batch.Replay(uncacher)
 			db.lock.Unlock()
+			batch.Reset()
 		}
 		db.lock.RLock()
 	}
 	return nil
 }
 
+// cleaner is a database batch replayer that takes a batch of write operations
+// and cleans up the trie database from anything written to disk.
+type cleaner struct {
+	db *Database
+}
+
+// Put reacts to database writes and implements dirty data uncaching. This is the
+// post-processing step of a commit operation where the already persisted trie is
+// removed from the dirty cache and moved into the clean cache. The reason behind
+// the two-phase commit is to ensure data availability while moving from
+// memory to disk.
+func (c *cleaner) Put(key []byte, rlp []byte) error {
+	hash := common.BytesToHash(key)
+
+	// If the node does not exist, we're done on this path
+	node, ok := c.db.dirties[hash]
+	if !ok {
+		return nil
+	}
+	// Node still exists, remove it from the flush-list
+	switch hash {
+	case c.db.oldest:
+		c.db.oldest = node.flushNext
+		c.db.dirties[node.flushNext].flushPrev = common.Hash{}
+	case c.db.newest:
+		c.db.newest = node.flushPrev
+		c.db.dirties[node.flushPrev].flushNext = common.Hash{}
+	default:
+		c.db.dirties[node.flushPrev].flushNext = node.flushNext
+		c.db.dirties[node.flushNext].flushPrev = node.flushPrev
+	}
+	// Remove the node from the dirty cache
+	delete(c.db.dirties, hash)
+	c.db.dirtiesSize -= common.StorageSize(common.HashLength + int(node.size))
+
+	// Move the flushed node into the clean cache to prevent insta-reloads
+	if c.db.cleans != nil {
+		c.db.cleans.Set(string(hash[:]), rlp)
+	}
+	return nil
+}
+
+func (c *cleaner) Delete(key []byte) error {
+	panic("Not implemented")
+}
+
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
 func (db *Database) Size() (common.StorageSize, common.StorageSize) {

From 4b883c5dce465291334602f5e2e91343f7c83371 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Mon, 25 Mar 2019 14:01:25 +0200
Subject: [PATCH 4/5] core, ethdb, light, trie: merge ethdb.Deleter in ethdb.Writer

---
 core/blockchain.go              |  2 +-
 core/headerchain.go             |  2 +-
 core/rawdb/accessors_chain.go   | 16 ++++++++--------
 core/rawdb/accessors_indexes.go |  2 +-
 core/rawdb/table.go             |  4 ++--
 core/state/statedb.go           |  4 ++++
 ethdb/batch.go                  |  3 +--
 ethdb/database.go               | 12 ------------
 ethdb/leveldb/leveldb.go        | 12 ++++++------
 ethdb/memorydb/memorydb.go      | 10 +++++++---
 light/nodeset.go                | 14 ++++++++++++++
 trie/database.go                |  3 +--
 12 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 5d5c5e6805fe..08302d057111 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -301,7 +301,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
 	defer bc.chainmu.Unlock()
 
 	// Rewind the header chain, deleting all block bodies until then
-	delFn := func(db ethdb.Deleter, hash common.Hash, num uint64) {
+	delFn := func(db ethdb.Writer, hash common.Hash, num uint64) {
 		rawdb.DeleteBody(db, hash, num)
 	}
 	bc.hc.SetHead(head, delFn)
diff --git a/core/headerchain.go b/core/headerchain.go
index 027cb798fe1e..f005b8324cbe 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -455,7 +455,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
 
 // DeleteCallback is a callback function that is called by SetHead before
 // each header is deleted.
-type DeleteCallback func(ethdb.Deleter, common.Hash, uint64)
+type DeleteCallback func(ethdb.Writer, common.Hash, uint64)
 
 // SetHead rewinds the local chain to a new head. Everything above the new head
 // will be deleted and the new one set.
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index ea923f9d18eb..10f3ba00f0ff 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -45,7 +45,7 @@ func WriteCanonicalHash(db ethdb.Writer, hash common.Hash, number uint64) {
 }
 
 // DeleteCanonicalHash removes the number to hash canonical mapping.
-func DeleteCanonicalHash(db ethdb.Deleter, number uint64) {
+func DeleteCanonicalHash(db ethdb.Writer, number uint64) {
 	if err := db.Delete(headerHashKey(number)); err != nil {
 		log.Crit("Failed to delete number to hash mapping", "err", err)
 	}
@@ -180,7 +180,7 @@ func WriteHeader(db ethdb.Writer, header *types.Header) {
 }
 
 // DeleteHeader removes all block header data associated with a hash.
-func DeleteHeader(db ethdb.Deleter, hash common.Hash, number uint64) {
+func DeleteHeader(db ethdb.Writer, hash common.Hash, number uint64) {
 	deleteHeaderWithoutNumber(db, hash, number)
 	if err := db.Delete(headerNumberKey(hash)); err != nil {
 		log.Crit("Failed to delete hash to number mapping", "err", err)
@@ -189,7 +189,7 @@ func DeleteHeader(db ethdb.Deleter, hash common.Hash, number uint64) {
 
 // deleteHeaderWithoutNumber removes only the block header but does not remove
 // the hash to number mapping.
-func deleteHeaderWithoutNumber(db ethdb.Deleter, hash common.Hash, number uint64) {
+func deleteHeaderWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
 	if err := db.Delete(headerKey(number, hash)); err != nil {
 		log.Crit("Failed to delete header", "err", err)
 	}
@@ -240,7 +240,7 @@ func WriteBody(db ethdb.Writer, hash common.Hash, number uint64, body *types.Body) {
 }
 
 // DeleteBody removes all block body data associated with a hash.
-func DeleteBody(db ethdb.Deleter, hash common.Hash, number uint64) {
+func DeleteBody(db ethdb.Writer, hash common.Hash, number uint64) {
 	if err := db.Delete(blockBodyKey(number, hash)); err != nil {
 		log.Crit("Failed to delete block body", "err", err)
 	}
@@ -278,7 +278,7 @@ func WriteTd(db ethdb.Writer, hash common.Hash, number uint64, td *big.Int) {
 }
 
 // DeleteTd removes all block total difficulty data associated with a hash.
-func DeleteTd(db ethdb.Deleter, hash common.Hash, number uint64) {
+func DeleteTd(db ethdb.Writer, hash common.Hash, number uint64) {
 	if err := db.Delete(headerTDKey(number, hash)); err != nil {
 		log.Crit("Failed to delete block total difficulty", "err", err)
 	}
@@ -347,7 +347,7 @@ func WriteReceipts(db ethdb.Writer, hash common.Hash, number uint64, receipts types.Receipts) {
 }
 
 // DeleteReceipts removes all receipt data associated with a block hash.
-func DeleteReceipts(db ethdb.Deleter, hash common.Hash, number uint64) {
+func DeleteReceipts(db ethdb.Writer, hash common.Hash, number uint64) {
 	if err := db.Delete(blockReceiptsKey(number, hash)); err != nil {
 		log.Crit("Failed to delete block receipts", "err", err)
 	}
@@ -378,7 +378,7 @@ func WriteBlock(db ethdb.Writer, block *types.Block) {
 }
 
 // DeleteBlock removes all block data associated with a hash.
-func DeleteBlock(db ethdb.Deleter, hash common.Hash, number uint64) {
+func DeleteBlock(db ethdb.Writer, hash common.Hash, number uint64) {
 	DeleteReceipts(db, hash, number)
 	DeleteHeader(db, hash, number)
 	DeleteBody(db, hash, number)
@@ -387,7 +387,7 @@ func DeleteBlock(db ethdb.Deleter, hash common.Hash, number uint64) {
 
 // deleteBlockWithoutNumber removes all block data associated with a hash, except
 // the hash to number mapping.
-func deleteBlockWithoutNumber(db ethdb.Deleter, hash common.Hash, number uint64) {
+func deleteBlockWithoutNumber(db ethdb.Writer, hash common.Hash, number uint64) {
 	DeleteReceipts(db, hash, number)
 	deleteHeaderWithoutNumber(db, hash, number)
 	DeleteBody(db, hash, number)
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
index d90a430129c5..5c7ad693472a 100644
--- a/core/rawdb/accessors_indexes.go
+++ b/core/rawdb/accessors_indexes.go
@@ -54,7 +54,7 @@ func WriteTxLookupEntries(db ethdb.Writer, block *types.Block) {
 }
 
 // DeleteTxLookupEntry removes all transaction data associated with a hash.
-func DeleteTxLookupEntry(db ethdb.Deleter, hash common.Hash) {
+func DeleteTxLookupEntry(db ethdb.Writer, hash common.Hash) {
 	db.Delete(txLookupKey(hash))
 }
 
diff --git a/core/rawdb/table.go b/core/rawdb/table.go
index 04f81f8ee0a4..e19649dd4692 100644
--- a/core/rawdb/table.go
+++ b/core/rawdb/table.go
@@ -150,6 +150,6 @@ func (b *tableBatch) Reset() {
 }
 
 // Replay replays the batch contents.
-func (b *tableBatch) Replay(r ethdb.Replayee) error {
-	return b.batch.Replay(r)
+func (b *tableBatch) Replay(w ethdb.Writer) error {
+	return b.batch.Replay(w)
 }
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 0673de543fe2..a299cdb64721 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -53,6 +53,10 @@ func (n *proofList) Put(key []byte, value []byte) error {
 	return nil
 }
 
+func (n *proofList) Delete(key []byte) error {
+	panic("not supported")
+}
+
 // StateDBs within the ethereum protocol are used to store anything
 // within the merkle trie. StateDBs take care of caching and storing
 // nested states. It's the general query interface to retrieve:
diff --git a/ethdb/batch.go b/ethdb/batch.go
index 538b464ccd6c..a9c406354675 100644
--- a/ethdb/batch.go
+++ b/ethdb/batch.go
@@ -24,7 +24,6 @@ const IdealBatchSize = 100 * 1024
 // when Write is called. A batch cannot be used concurrently.
 type Batch interface {
 	Writer
-	Deleter
 
 	// ValueSize retrieves the amount of data queued up for writing.
 	ValueSize() int
@@ -36,7 +35,7 @@ type Batch interface {
 	Reset()
 
 	// Replay replays the batch contents.
-	Replay(replayer Replayee) error
+	Replay(w Writer) error
 }
 
 // Batcher wraps the NewBatch method of a backing data store.
diff --git a/ethdb/database.go b/ethdb/database.go
index ddb1fd06e97f..bab99aed1f62 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -32,21 +32,11 @@ type Reader interface {
 type Writer interface {
 	// Put inserts the given value into the key-value data store.
 	Put(key []byte, value []byte) error
-}
 
-// Deleter wraps the Delete method of a backing data store.
-type Deleter interface {
 	// Delete removes the key from the key-value data store.
 	Delete(key []byte) error
 }
 
-// Replayee wraps basic batch operations to allow replaying an existing batch
-// on top of multiple databases.
-type Replayee interface {
-	Writer
-	Deleter
-}
-
 // Stater wraps the Stat method of a backing data store.
 type Stater interface {
 	// Stat returns a particular internal stat of the database.
@@ -70,7 +60,6 @@ type Compacter interface {
 type KeyValueStore interface {
 	Reader
 	Writer
-	Deleter
 	Batcher
 	Iteratee
 	Stater
@@ -83,7 +72,6 @@ type KeyValueStore interface {
 type Database interface {
 	Reader
 	Writer
-	Deleter
 	Batcher
 	Iteratee
 	Stater
diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go
index c11b39bfbe92..f437cb9740c8 100644
--- a/ethdb/leveldb/leveldb.go
+++ b/ethdb/leveldb/leveldb.go
@@ -418,14 +418,14 @@ func (b *batch) Reset() {
 }
 
 // Replay replays the batch contents.
-func (b *batch) Replay(r ethdb.Replayee) error {
-	return b.b.Replay(&replayer{replayer: r})
+func (b *batch) Replay(w ethdb.Writer) error {
+	return b.b.Replay(&replayer{writer: w})
 }
 
 // replayer is a small wrapper to implement the correct replay methods.
 type replayer struct {
-	replayer ethdb.Replayee
-	failure  error
+	writer  ethdb.Writer
+	failure error
 }
 
 // Put inserts the given value into the key-value data store.
@@ -434,7 +434,7 @@ func (r *replayer) Put(key, value []byte) {
 	// If the replay already failed, stop executing ops
 	if r.failure != nil {
 		return
 	}
-	r.failure = r.replayer.Put(key, value)
+	r.failure = r.writer.Put(key, value)
 }
 
 // Delete removes the key from the key-value data store.
@@ -443,5 +443,5 @@ func (r *replayer) Delete(key []byte) {
 	// If the replay already failed, stop executing ops
 	if r.failure != nil {
 		return
 	}
-	r.failure = r.replayer.Delete(key)
+	r.failure = r.writer.Delete(key)
 }
diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go
index 55d88d2135e5..5c3f7e22a31e 100644
--- a/ethdb/memorydb/memorydb.go
+++ b/ethdb/memorydb/memorydb.go
@@ -241,13 +241,17 @@ func (b *batch) Reset() {
 }
 
 // Replay replays the batch contents.
-func (b *batch) Replay(r ethdb.Replayee) error {
+func (b *batch) Replay(w ethdb.Writer) error {
 	for _, keyvalue := range b.writes {
 		if keyvalue.delete {
-			r.Delete(keyvalue.key)
+			if err := w.Delete(keyvalue.key); err != nil {
+				return err
+			}
 			continue
 		}
-		r.Put(keyvalue.key, keyvalue.value)
+		if err := w.Put(keyvalue.key, keyvalue.value); err != nil {
+			return err
+		}
 	}
 	return nil
 }
diff --git a/light/nodeset.go b/light/nodeset.go
index 3b556108a606..a8bf4f6c65f6 100644
--- a/light/nodeset.go
+++ b/light/nodeset.go
@@ -60,6 +60,15 @@ func (db *NodeSet) Put(key []byte, value []byte) error {
 	return nil
 }
 
+// Delete removes a node from the set
+func (db *NodeSet) Delete(key []byte) error {
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
+	delete(db.nodes, string(key))
+	return nil
+}
+
 // Get returns a stored node
 func (db *NodeSet) Get(key []byte) ([]byte, error) {
 	db.lock.RLock()
@@ -138,6 +147,11 @@ func (n *NodeList) Put(key []byte, value []byte) error {
 	return nil
 }
 
+// Delete panics as there's no reason to remove a node from the list.
+func (n *NodeList) Delete(key []byte) error {
+	panic("not supported")
+}
+
 // DataSize returns the aggregated data size of nodes in the list
 func (n NodeList) DataSize() int {
 	var size int
diff --git a/trie/database.go b/trie/database.go
index 620b13b56d30..0a2a069937e0 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -293,13 +293,12 @@ func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database {
 			Hasher: trienodeHasher{},
 		})
 	}
-	db := &Database{
+	return &Database{
 		diskdb:    diskdb,
 		cleans:    cleans,
 		dirties:   map[common.Hash]*cachedNode{{}: {}},
 		preimages: make(map[common.Hash][]byte),
 	}
-	return db
 }
 
 // DiskDB retrieves the persistent storage backing the trie database.

From 9755de781af533f4cee8b37ad71d35030b317e20 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Tue, 26 Mar 2019 11:01:12 +0200
Subject: [PATCH 5/5] trie: adjust locking to actual trie database behavior

---
 trie/database.go | 48 ++++++++++++++++++------------------------------
 1 file changed, 18 insertions(+), 30 deletions(-)

diff --git a/trie/database.go b/trie/database.go
index 0a2a069937e0..3bbcb6ade90c 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -59,6 +59,11 @@ const secureKeyLength = 11 + 32
 // Database is an intermediate write layer between the trie data structures and
 // the disk database. The aim is to accumulate trie writes in-memory and only
 // periodically flush a couple tries to disk, garbage collecting the remainder.
+//
+// Note, the trie Database is **not** thread safe in its mutations, but it **is**
+// thread safe in providing individual, independent node access. The rationale
+// behind this split design is to provide read access to RPC handlers and sync
+// servers even while the trie is executing expensive garbage collection.
 type Database struct {
 	diskdb ethdb.KeyValueStore // Persistent storage for matured trie nodes
 
@@ -465,8 +470,8 @@ func (db *Database) Nodes() []common.Hash {
 
 // Reference adds a new reference from a parent node to a child node.
 func (db *Database) Reference(child common.Hash, parent common.Hash) {
-	db.lock.RLock()
-	defer db.lock.RUnlock()
+	db.lock.Lock()
+	defer db.lock.Unlock()
 
 	db.reference(child, parent)
 }
@@ -561,13 +566,14 @@ func (db *Database) dereference(child common.Hash, parent common.Hash) {
 
 // Cap iteratively flushes old but still referenced trie nodes until the total
 // memory usage goes below the given threshold.
+//
+// Note, this method is a non-synchronized mutator. It is unsafe to call this
+// concurrently with other mutators.
 func (db *Database) Cap(limit common.StorageSize) error {
 	// Create a database batch to flush persistent data out. It is important that
 	// outside code doesn't see an inconsistent state (referenced data removed from
 	// memory cache during commit but not yet in persistent storage). This is ensured
 	// by only uncaching existing data when the database write finalizes.
-	db.lock.RLock()
-
 	nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()
 	batch := db.diskdb.NewBatch()
@@ -583,12 +589,10 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	for hash, preimage := range db.preimages {
 		if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
 			log.Error("Failed to commit preimage from trie database", "err", err)
-			db.lock.RUnlock()
 			return err
 		}
 		if batch.ValueSize() > ethdb.IdealBatchSize {
 			if err := batch.Write(); err != nil {
-				db.lock.RUnlock()
 				return err
 			}
 			batch.Reset()
@@ -601,14 +605,12 @@ func (db *Database) Cap(limit common.StorageSize) error {
 		// Fetch the oldest referenced node and push into the batch
 		node := db.dirties[oldest]
 		if err := batch.Put(oldest[:], node.rlp()); err != nil {
-			db.lock.RUnlock()
 			return err
 		}
 		// If we exceeded the ideal batch size, commit and reset
 		if batch.ValueSize() >= ethdb.IdealBatchSize {
 			if err := batch.Write(); err != nil {
 				log.Error("Failed to write flush list to disk", "err", err)
-				db.lock.RUnlock()
 				return err
 			}
 			batch.Reset()
@@ -623,11 +625,8 @@ func (db *Database) Cap(limit common.StorageSize) error {
 	// Flush out any remainder data from the last batch
 	if err := batch.Write(); err != nil {
 		log.Error("Failed to write flush list to disk", "err", err)
-		db.lock.RUnlock()
 		return err
 	}
-	db.lock.RUnlock()
-
 	// Write successful, clear out the flushed data
 	db.lock.Lock()
 	defer db.lock.Unlock()
@@ -661,16 +660,16 @@ func (db *Database) Cap(limit common.StorageSize) error {
 }
 
 // Commit iterates over all the children of a particular node, writes them out
-// to disk, forcefully tearing down all references in both directions.
+// to disk, forcefully tearing down all references in both directions. As a side
+// effect, all pre-images accumulated up to this point are also written.
 //
-// As a side effect, all pre-images accumulated up to this point are also written.
+// Note, this method is a non-synchronized mutator. It is unsafe to call this
+// concurrently with other mutators.
 func (db *Database) Commit(node common.Hash, report bool) error {
 	// Create a database batch to flush persistent data out. It is important that
 	// outside code doesn't see an inconsistent state (referenced data removed from
 	// memory cache during commit but not yet in persistent storage). This is ensured
 	// by only uncaching existing data when the database write finalizes.
-	db.lock.RLock()
-
 	start := time.Now()
 	batch := db.diskdb.NewBatch()
 
@@ -678,13 +677,11 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 	for hash, preimage := range db.preimages {
 		if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
 			log.Error("Failed to commit preimage from trie database", "err", err)
-			db.lock.RUnlock()
 			return err
 		}
 		// If the batch is too large, flush to disk
 		if batch.ValueSize() > ethdb.IdealBatchSize {
 			if err := batch.Write(); err != nil {
-				db.lock.RUnlock()
 				return err
 			}
 			batch.Reset()
@@ -693,7 +690,6 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 	// Since we're going to replay trie node writes into the clean cache, flush out
 	// any batched pre-images before continuing.
 	if err := batch.Write(); err != nil {
-		db.lock.RUnlock()
 		return err
 	}
 	batch.Reset()
@@ -704,17 +700,13 @@ func (db *Database) Commit(node common.Hash, report bool) error {
 	uncacher := &cleaner{db}
 	if err := db.commit(node, batch, uncacher); err != nil {
 		log.Error("Failed to commit trie from trie database", "err", err)
-		db.lock.RUnlock()
 		return err
 	}
 	// Trie mostly committed to disk, flush any batch leftovers
 	if err := batch.Write(); err != nil {
 		log.Error("Failed to write trie to disk", "err", err)
-		db.lock.RUnlock()
 		return err
 	}
-	db.lock.RUnlock()
-
 	// Uncache any leftovers in the last batch
 	db.lock.Lock()
 	defer db.lock.Unlock()
@@ -764,14 +756,10 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
 		if err := batch.Write(); err != nil {
 			return err
 		}
-		db.lock.RUnlock()
-		{
-			db.lock.Lock()
-			batch.Replay(uncacher)
-			db.lock.Unlock()
-			batch.Reset()
-		}
-		db.lock.RLock()
+		db.lock.Lock()
+		batch.Replay(uncacher)
+		batch.Reset()
+		db.lock.Unlock()
 	}
 	return nil
 }
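
For illustration only (an editor's sketch, not part of the patch series above): after patch 4, any type implementing ethdb.Writer, that is Put plus Delete, can act as the replay target of ethdb.Batch.Replay. This is the same hook the trie database's cleaner uses to move committed nodes from the dirty cache into the clean cache. The countingWriter type, the key literals and the use of the in-memory database below are hypothetical, chosen only to show the call pattern:

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/ethdb/memorydb"
	)

	// countingWriter satisfies ethdb.Writer and simply tallies the
	// operations recorded in a batch instead of persisting them.
	type countingWriter struct {
		puts, dels int
	}

	func (c *countingWriter) Put(key, value []byte) error {
		c.puts++
		return nil
	}

	func (c *countingWriter) Delete(key []byte) error {
		c.dels++
		return nil
	}

	func main() {
		// Queue a few operations into a batch without writing them anywhere
		db := memorydb.New()
		batch := db.NewBatch()
		batch.Put([]byte("foo"), []byte("bar"))
		batch.Delete([]byte("foo"))

		// Replay the batch into the counter instead of a database
		counter := new(countingWriter)
		if err := batch.Replay(counter); err != nil {
			fmt.Println("replay failed:", err)
			return
		}
		fmt.Printf("replayed %d puts and %d deletes\n", counter.puts, counter.dels)
	}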