From a60d8fa55935051fca5ea40296f8e29b89d999d5 Mon Sep 17 00:00:00 2001
From: asyukii
Date: Thu, 5 Oct 2023 15:41:25 +0800
Subject: [PATCH] feat(snap): add state expiry support to snap sync

add the server-side handling required for snap sync
---
 core/state/state_expiry.go      |  10 +-
 core/state/sync.go              |  29 +++++
 eth/backend.go                  |   1 +
 eth/downloader/downloader.go    |  39 +++++-
 eth/downloader/statesync.go     |  38 +++++-
 eth/handler.go                  |   6 +-
 eth/protocols/snap/handler.go   |  54 +++++++-
 eth/protocols/snap/sync.go      | 104 ++++++++++++++--
 eth/protocols/snap/sync_test.go |  75 ++++++++++--
 trie/proof.go                   |  19 +++
 trie/stacktrie.go               | 210 ++++++++++++++++++++++++++++++--
 trie/sync.go                    |  74 ++++++++---
 trie/trie.go                    |   8 ++
 trie/trie_reader.go             |  11 +-
 14 files changed, 618 insertions(+), 60 deletions(-)

diff --git a/core/state/state_expiry.go b/core/state/state_expiry.go
index 38458cca1b..fbeada5135 100644
--- a/core/state/state_expiry.go
+++ b/core/state/state_expiry.go
@@ -51,7 +51,7 @@ func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Ha
 		return nil, fmt.Errorf("cannot find any revive proof from remoteDB")
 	}
 
-	return reviveStorageTrie(addr, tr, proofs[0], key)
+	return ReviveStorageTrie(addr, tr, proofs[0], key)
 }
 
 // batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes.
@@ -113,8 +113,8 @@ func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash
 	}
 
 	for i, proof := range proofs {
-		// kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
-		kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
+		// kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but does not for an as-yet-unknown reason; needs investigation
+		kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
 		if err != nil {
 			log.Error("reviveStorageTrie failed", "addr", addr, "key", keys[i], "err", err)
 			continue
@@ -125,8 +125,8 @@ func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash
 	return ret, nil
 }
 
-// reviveStorageTrie revive trie's expired state from proof
-func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
+// ReviveStorageTrie revives a trie's expired state from a proof
+func ReviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
 	defer func(start time.Time) {
 		reviveStorageTrieTimer.Update(time.Since(start))
 	}(time.Now())
diff --git a/core/state/sync.go b/core/state/sync.go
index 61097c6462..1b288b304e 100644
--- a/core/state/sync.go
+++ b/core/state/sync.go
@@ -55,3 +55,32 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(k
 	syncer = trie.NewSync(root, database, onAccount, scheme)
 	return syncer
 }
+
+func NewStateSyncWithExpiry(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string, epoch types.StateEpoch) *trie.Sync {
+	// Register the storage slot callback if the external callback is specified.
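+	// The slot callback keeps the full trie.LeafCallback shape but forwards
+	// only (keys, leaf); the path and parent arguments are not needed by the
+	// heal-state consumer, mirroring the behaviour of NewStateSync above.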
+ var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error + if onLeaf != nil { + onSlot = func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error { + return onLeaf(keys, leaf) + } + } + // Register the account callback to connect the state trie and the storage + // trie belongs to the contract. + var syncer *trie.Sync + onAccount := func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error { + if onLeaf != nil { + if err := onLeaf(keys, leaf); err != nil { + return err + } + } + var obj types.StateAccount + if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { + return err + } + syncer.AddSubTrie(obj.Root, path, parent, parentPath, onSlot) + syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent, parentPath) + return nil + } + syncer = trie.NewSyncWithEpoch(root, database, onAccount, scheme, epoch) + return syncer +} diff --git a/eth/backend.go b/eth/backend.go index 6cca4141fe..802202c578 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -277,6 +277,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { DirectBroadcast: config.DirectBroadcast, DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, PeerSet: peers, + EnableStateExpiry: config.StateExpiryEnable, }); err != nil { return nil, err } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 14d68844eb..7a2cde1a64 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -131,6 +131,8 @@ type Downloader struct { SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now stateSyncStart chan *stateSync + enableStateExpiry bool + // Cancellation and termination cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelCh chan struct{} // Channel to cancel mid-flight syncs @@ -207,6 +209,8 @@ type BlockChain interface { // TrieDB retrieves the low level trie database used for interacting // with trie nodes. TrieDB() *trie.Database + + Config() *params.ChainConfig } type DownloadOption func(downloader *Downloader) *Downloader @@ -235,6 +239,30 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai return dl } +func NewWithExpiry(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, enableStateExpiry bool, dropPeer peerDropFn, options ...DownloadOption) *Downloader { + if lightchain == nil { + lightchain = chain + } + dl := &Downloader{ + stateDB: stateDb, + mux: mux, + queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), + peers: newPeerSet(), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + enableStateExpiry: enableStateExpiry, + headerProcCh: make(chan *headerTask, 1), + quitCh: make(chan struct{}), + SnapSyncer: snap.NewSyncerWithStateExpiry(stateDb, chain.TrieDB().Scheme(), enableStateExpiry), + stateSyncStart: make(chan *stateSync), + syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), + } + + go dl.stateFetcher() + return dl +} + // Progress retrieves the synchronisation boundaries, specifically the origin // block where synchronisation started at (may have failed/suspended); the block // or header sync is currently at; and the latest known block which the sync targets. @@ -1464,10 +1492,14 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { func (d *Downloader) processSnapSyncContent() error { // Start syncing state of the reported head block. 
This should get us most of
	// the state of the pivot block.
+	var epoch types.StateEpoch
+
 	d.pivotLock.RLock()
-	sync := d.syncState(d.pivotHeader.Root)
+	// Derive the pivot's epoch under the same lock, so it matches the root being synced.
+	epoch = types.GetStateEpoch(d.blockchain.Config(), new(big.Int).SetUint64(d.pivotHeader.Number.Uint64()))
+	sync := d.syncStateWithEpoch(d.pivotHeader.Root, epoch)
 	d.pivotLock.RUnlock()
 
 	defer func() {
 		// The `sync` object is replaced every time the pivot moves. We need to
 		// defer close the very last active one, hence the lazy evaluation vs.
@@ -1516,11 +1548,12 @@ func (d *Downloader) processSnapSyncContent() error {
 			d.pivotLock.RLock()
 			pivot := d.pivotHeader
 			d.pivotLock.RUnlock()
+			epoch = types.GetStateEpoch(d.blockchain.Config(), new(big.Int).SetUint64(pivot.Number.Uint64()))
 
 			if oldPivot == nil {
 				if pivot.Root != sync.root {
 					sync.Cancel()
-					sync = d.syncState(pivot.Root)
+					sync = d.syncStateWithEpoch(pivot.Root, epoch)
 
 					go closeOnErr(sync)
 				}
@@ -1558,7 +1591,7 @@ func (d *Downloader) processSnapSyncContent() error {
 			// If new pivot block found, cancel old state retrieval and restart
 			if oldPivot != P {
 				sync.Cancel()
-				sync = d.syncState(P.Header.Root)
+				sync = d.syncStateWithEpoch(P.Header.Root, types.GetStateEpoch(d.blockchain.Config(), new(big.Int).SetUint64(P.Header.Number.Uint64())))
 
 				go closeOnErr(sync)
 				oldPivot = P
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index 501af63ed5..e7d9952ff2 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -20,6 +20,7 @@ import (
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -40,6 +41,22 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
 	return s
 }
 
+func (d *Downloader) syncStateWithEpoch(root common.Hash, epoch types.StateEpoch) *stateSync {
+	// Create the state sync
+	s := newStateSyncWithEpoch(d, root, epoch)
+	select {
+	case d.stateSyncStart <- s:
+		// If we tell the statesync to restart with a new root, we also need
+		// to wait for it to actually also start -- when old requests have timed
+		// out or been delivered
+		<-s.started
+	case <-d.quitCh:
+		s.err = errCancelStateFetch
+		close(s.done)
+	}
+	return s
+}
+
 // stateFetcher manages the active state sync and accepts requests
 // on its behalf.
 func (d *Downloader) stateFetcher() {
@@ -77,8 +94,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 // stateSync schedules requests for downloading a particular state trie defined
 // by a given state root.
 type stateSync struct {
-	d    *Downloader // Downloader instance to access and manage current peerset
-	root common.Hash // State root currently being synced
+	d                 *Downloader // Downloader instance to access and manage current peerset
+	root              common.Hash // State root currently being synced
+	epoch             types.StateEpoch
+	enableStateExpiry bool
 
 	started chan struct{} // Started is signalled once the sync loop starts
 	cancel  chan struct{} // Channel to signal a termination request
@@ -99,11 +118,26 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 	}
 }
 
+func newStateSyncWithEpoch(d *Downloader, root common.Hash, epoch types.StateEpoch) *stateSync {
+	return &stateSync{
+		d:                 d,
+		root:              root,
+		epoch:             epoch,
+		enableStateExpiry: true,
+		cancel:            make(chan struct{}),
+		done:              make(chan struct{}),
+		started:           make(chan struct{}),
+	}
+}
+
 // run starts the task assignment and response processing loop, blocking until
 // it finishes, and finally notifying any goroutines waiting for the loop to
 // finish.
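+//
+// When state expiry is enabled, the sync's target epoch is handed to the snap
+// syncer before the loop starts, so trie nodes written during the sync carry
+// the pivot block's epoch metadata. A caller-side sketch (pivotRoot and
+// pivotNumber are placeholders; see syncStateWithEpoch above):
+//
+//	epoch := types.GetStateEpoch(d.blockchain.Config(), pivotNumber)
+//	s := newStateSyncWithEpoch(d, pivotRoot, epoch)
+//	go s.run()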
 func (s *stateSync) run() {
 	close(s.started)
+	if s.enableStateExpiry {
+		s.d.SnapSyncer.UpdateEpoch(s.epoch)
+	}
 	s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
 	close(s.done)
 }
diff --git a/eth/handler.go b/eth/handler.go
index d081c76266..72bfd8993e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -123,6 +123,7 @@ type handlerConfig struct {
 	DirectBroadcast        bool
 	DisablePeerTxBroadcast bool
 	PeerSet                *peerSet
+	EnableStateExpiry      bool
 }
 
 type handler struct {
@@ -134,6 +135,8 @@ type handler struct {
 	acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
 
 	directBroadcast bool
+	enableStateExpiry bool
+
 	database ethdb.Database
 	txpool   txPool
 	votepool votePool
@@ -196,6 +199,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
 		peersPerIP:        make(map[string]int),
 		requiredBlocks:    config.RequiredBlocks,
 		directBroadcast:   config.DirectBroadcast,
+		enableStateExpiry: config.EnableStateExpiry,
 		quitSync:          make(chan struct{}),
 		handlerDoneCh:     make(chan struct{}),
 		handlerStartCh:    make(chan struct{}),
@@ -249,7 +253,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
 		downloadOptions = append(downloadOptions, success)
 	*/
 
-	h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)
+	h.downloader = downloader.NewWithExpiry(config.Database, h.eventMux, h.chain, nil, config.EnableStateExpiry, h.removePeer, downloadOptions...)
 
 	// Construct the fetcher (short sync)
 	validator := func(header *types.Header) error {
diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go
index b2fd03766e..0eaf6a28e4 100644
--- a/eth/protocols/snap/handler.go
+++ b/eth/protocols/snap/handler.go
@@ -23,7 +23,10 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/light"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
@@ -381,13 +384,26 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
 			storage []*StorageData
 			last    common.Hash
 			abort   bool
+			sv      snapshot.SnapValue
+			hash    common.Hash
+			slot    []byte
+			enc     []byte
 		)
 		for it.Next() {
 			if size >= hardLimit {
 				abort = true
 				break
 			}
-			hash, slot := it.Hash(), common.CopyBytes(it.Slot())
+			hash, enc = it.Hash(), common.CopyBytes(it.Slot())
+			slot = enc
+			if len(enc) > 0 {
+				sv, err = snapshot.DecodeValueFromRLPBytes(enc)
+				if err != nil || sv == nil {
+					log.Warn("Failed to decode storage slot", "err", err)
+					return nil, nil
+				}
+				slot = sv.GetVal()
+			}
 
 			// Track the returned interval for the Merkle proofs
 			last = hash
@@ -429,13 +445,26 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
 		}
 		proof := light.NewNodeSet()
 		if err := stTrie.Prove(origin[:], proof); err != nil {
-			log.Warn("Failed to prove storage range", "origin", req.Origin, "err", err)
-			return nil, nil
+			if enErr, ok := err.(*trie.ExpiredNodeError); ok {
+				err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, origin, proof)
+				if err != nil {
+					log.Warn("Failed to prove storage range", "origin", origin, "err", err)
+					return nil, nil
+				}
+			} else {
+				log.Warn("Failed to prove storage range", "origin", origin, "err", err)
+				return nil, nil
+			}
 		}
 		if last != (common.Hash{}) {
 			if err := stTrie.Prove(last[:], proof); err != nil {
-				log.Warn("Failed to prove storage range", "last", last, "err", err)
-				return nil,
nil + if enErr, ok := err.(*trie.ExpiredNodeError); ok { + err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, last, proof) + if err != nil { + log.Warn("Failed to prove storage range", "origin", origin, "err", err) + return nil, nil + } + } } } for _, blob := range proof.NodeList() { @@ -567,6 +593,24 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s return nodes, nil } +func reviveAndGetProof(fullStateDB ethdb.FullStateDB, tr *trie.StateTrie, stateRoot common.Hash, account common.Address, root common.Hash, prefixKey []byte, key common.Hash, proofDb *light.NodeSet) error { + proofs, err := fullStateDB.GetStorageReviveProof(stateRoot, account, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])}) + if err != nil || len(proofs) == 0 { + return err + } + + _, err = state.ReviveStorageTrie(account, tr, proofs[0], key) + if err != nil { + return err + } + + if err := tr.Prove(key[:], proofDb); err != nil { + return err + } + + return nil +} + // NodeInfo represents a short summary of the `snap` sub-protocol metadata // known about the host peer. type NodeInfo struct{} diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index f56a9480b9..3e56bf054e 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -408,6 +409,7 @@ type SyncPeer interface { // - The peer remains connected, but does not deliver a response in time // - The peer delivers a stale response after a previous timeout // - The peer delivers a refusal to serve the requested state + type Syncer struct { db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup) scheme string // Node scheme used in node database @@ -423,6 +425,9 @@ type Syncer struct { peerDrop *event.Feed // Event feed to react to peers dropping rates *msgrate.Trackers // Message throughput rates for peers + enableStateExpiry bool + epoch types.StateEpoch + // Request tracking during syncing phase statelessPeers map[string]struct{} // Peers that failed to deliver state data accountIdlers map[string]struct{} // Peers that aren't serving account requests @@ -509,6 +514,39 @@ func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { } } +func NewSyncerWithStateExpiry(db ethdb.KeyValueStore, scheme string, enableStateExpiry bool) *Syncer { + return &Syncer{ + db: db, + scheme: scheme, + + peers: make(map[string]SyncPeer), + peerJoin: new(event.Feed), + peerDrop: new(event.Feed), + rates: msgrate.NewTrackers(log.New("proto", "snap")), + update: make(chan struct{}, 1), + + enableStateExpiry: enableStateExpiry, + + accountIdlers: make(map[string]struct{}), + storageIdlers: make(map[string]struct{}), + bytecodeIdlers: make(map[string]struct{}), + + accountReqs: make(map[uint64]*accountRequest), + storageReqs: make(map[uint64]*storageRequest), + bytecodeReqs: make(map[uint64]*bytecodeRequest), + + trienodeHealIdlers: make(map[string]struct{}), + bytecodeHealIdlers: make(map[string]struct{}), + + trienodeHealReqs: make(map[uint64]*trienodeHealRequest), + bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), + trienodeHealThrottle: 
maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk + stateWriter: db.NewBatch(), + + extProgress: new(SyncProgress), + } +} + // Register injects a new data source into the syncer's peerset. func (s *Syncer) Register(peer SyncPeer) error { // Make sure the peer is not registered yet @@ -572,10 +610,16 @@ func (s *Syncer) Unregister(id string) error { func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { // Move the trie root from any previous value, revert stateless markers for // any peers and initialize the syncer if it was not yet run + var scheduler *trie.Sync s.lock.Lock() s.root = root + if s.enableStateExpiry { + scheduler = state.NewStateSyncWithExpiry(root, s.db, s.onHealState, s.scheme, s.epoch) + } else { + scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme) + } s.healer = &healTask{ - scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme), + scheduler: scheduler, trieTasks: make(map[string]common.Hash), codeTasks: make(map[common.Hash]struct{}), } @@ -717,6 +761,10 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } } +func (s *Syncer) UpdateEpoch(epoch types.StateEpoch) { + s.epoch = epoch +} + // loadSyncStatus retrieves a previously aborted sync status from the database, // or generates a fresh one if none is available. func (s *Syncer) loadSyncStatus() { @@ -2008,14 +2056,24 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, } + var genTrie *trie.StackTrie + if s.enableStateExpiry { + genTrie = trie.NewStackTrieWithStateExpiry(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, func(owner common.Hash, path []byte, blob []byte) { + rawdb.WriteEpochMetaPlainState(batch, owner, string(path), blob) + }, account, s.epoch) + } else { + genTrie = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, account) + } tasks = append(tasks, &storageTask{ Next: common.Hash{}, Last: r.End(), root: acc.Root, genBatch: batch, - genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { - rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) - }, account), + genTrie: genTrie, }) for r.Next() { batch := ethdb.HookedBatch{ @@ -2024,14 +2082,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { s.storageBytes += common.StorageSize(len(key) + len(value)) }, } + if s.enableStateExpiry { + genTrie = trie.NewStackTrieWithStateExpiry(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, func(owner common.Hash, path []byte, blob []byte) { + rawdb.WriteEpochMetaPlainState(batch, owner, string(path), blob) + }, account, s.epoch) + } else { + genTrie = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, account) + } tasks = append(tasks, &storageTask{ Next: r.Start(), Last: r.End(), root: acc.Root, genBatch: batch, - genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { - rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) - }, account), + genTrie: genTrie, }) } for _, task := range tasks { @@ -2076,9 +2143,18 @@ func (s 
*Syncer) processStorageResponse(res *storageResponse) { slots += len(res.hashes[i]) if i < len(res.hashes)-1 || res.subTask == nil { - tr := trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { - rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) - }, account) + var tr *trie.StackTrie + if s.enableStateExpiry { + tr = trie.NewStackTrieWithStateExpiry(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, func(owner common.Hash, path []byte, blob []byte) { + rawdb.WriteEpochMetaPlainState(batch, owner, string(path), blob) + }, account, s.epoch) + } else { + tr = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) { + rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme) + }, account) + } for j := 0; j < len(res.hashes[i]); j++ { tr.Update(res.hashes[i][j][:], res.slots[i][j]) } @@ -2088,7 +2164,13 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { - rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + var snapVal []byte + if s.enableStateExpiry { + snapVal, _ = snapshot.EncodeValueToRLPBytes(snapshot.NewValueWithEpoch(s.epoch, res.slots[i][j])) + } else { + snapVal, _ = rlp.EncodeToBytes(res.slots[i][j]) + } + rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], snapVal) // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index 1514ad4e13..25cbef9d28 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -640,6 +640,16 @@ func setupSyncer(scheme string, peers ...*testPeer) *Syncer { return syncer } +func setupSyncerWithExpiry(scheme string, expiry bool, peers ...*testPeer) *Syncer { + stateDb := rawdb.NewMemoryDatabase() + syncer := NewSyncerWithStateExpiry(stateDb, scheme, expiry) + for _, peer := range peers { + syncer.Register(peer) + peer.remote = syncer + } + return syncer +} + // TestSync tests a basic sync with one peer func TestSync(t *testing.T) { t.Parallel() @@ -750,6 +760,7 @@ func TestSyncWithStorage(t *testing.T) { testSyncWithStorage(t, rawdb.HashScheme) testSyncWithStorage(t, rawdb.PathScheme) + testSyncWithStorageStateExpiry(t, rawdb.PathScheme) } func testSyncWithStorage(t *testing.T, scheme string) { @@ -762,7 +773,7 @@ func testSyncWithStorage(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 3, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 3, 3000, true, false, false) mkSource := func(name string) *testPeer { source := newTestPeer(name, t, term) @@ -781,6 +792,35 @@ func testSyncWithStorage(t *testing.T, scheme string) { verifyTrie(scheme, syncer.db, sourceAccountTrie.Hash(), t) } +func testSyncWithStorageStateExpiry(t *testing.T, scheme string) { + var ( + once sync.Once + cancel = make(chan struct{}) + term = func() { + once.Do(func() { + close(cancel) + }) + } + ) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 3, 3000, true, false, true) + mkSource := func(name string) *testPeer { + source := newTestPeer(name, t, term) + 
source.accountTrie = sourceAccountTrie.Copy() + source.accountValues = elems + source.setStorageTries(storageTries) + source.storageValues = storageElems + return source + } + syncer := setupSyncerWithExpiry(nodeScheme, true, mkSource("sourceA")) + syncer.UpdateEpoch(10) + done := checkStall(t, term) + if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil { + t.Fatalf("sync failed: %v", err) + } + close(done) + verifyTrie(scheme, syncer.db, sourceAccountTrie.Hash(), t) +} + // TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all func TestMultiSyncManyUseless(t *testing.T) { t.Parallel() @@ -799,7 +839,7 @@ func testMultiSyncManyUseless(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false) mkSource := func(name string, noAccount, noStorage, noTrieNode bool) *testPeer { source := newTestPeer(name, t, term) @@ -853,7 +893,7 @@ func testMultiSyncManyUselessWithLowTimeout(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false) mkSource := func(name string, noAccount, noStorage, noTrieNode bool) *testPeer { source := newTestPeer(name, t, term) @@ -912,7 +952,7 @@ func testMultiSyncManyUnresponsive(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false) mkSource := func(name string, noAccount, noStorage, noTrieNode bool) *testPeer { source := newTestPeer(name, t, term) @@ -1215,7 +1255,7 @@ func testSyncBoundaryStorageTrie(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 10, 1000, false, true) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 10, 1000, false, true, false) mkSource := func(name string) *testPeer { source := newTestPeer(name, t, term) @@ -1257,7 +1297,7 @@ func testSyncWithStorageAndOneCappedPeer(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 300, 1000, false, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 300, 1000, false, false, false) mkSource := func(name string, slow bool) *testPeer { source := newTestPeer(name, t, term) @@ -1304,7 +1344,7 @@ func testSyncWithStorageAndCorruptPeer(t *testing.T, scheme string) { }) } ) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false) mkSource := func(name string, handler storageHandlerFunc) *testPeer { source := newTestPeer(name, t, term) @@ -1348,7 +1388,7 @@ func testSyncWithStorageAndNonProvingPeer(t *testing.T, scheme string) { }) } 
) - nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false) + nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false) mkSource := func(name string, handler storageHandlerFunc) *testPeer { source := newTestPeer(name, t, term) @@ -1608,16 +1648,22 @@ func makeAccountTrieWithStorageWithUniqueStorage(scheme string, accounts, slots } // makeAccountTrieWithStorage spits out a trie, along with the leafs -func makeAccountTrieWithStorage(scheme string, accounts, slots int, code, boundary bool) (string, *trie.Trie, []*kv, map[common.Hash]*trie.Trie, map[common.Hash][]*kv) { +func makeAccountTrieWithStorage(scheme string, accounts, slots int, code, boundary bool, expiry bool) (string, *trie.Trie, []*kv, map[common.Hash]*trie.Trie, map[common.Hash][]*kv) { var ( - db = trie.NewDatabase(rawdb.NewMemoryDatabase(), newDbConfig(scheme)) - accTrie = trie.NewEmpty(db) + db *trie.Database + accTrie *trie.Trie entries []*kv storageRoots = make(map[common.Hash]common.Hash) storageTries = make(map[common.Hash]*trie.Trie) storageEntries = make(map[common.Hash][]*kv) nodes = trienode.NewMergedNodeSet() ) + if expiry { + db = trie.NewDatabase(rawdb.NewMemoryDatabase(), newDbConfigWithExpiry(scheme, expiry)) + } else { + db = trie.NewDatabase(rawdb.NewMemoryDatabase(), newDbConfig(scheme)) + } + accTrie = trie.NewEmpty(db) // Create n accounts in the trie for i := uint64(1); i <= uint64(accounts); i++ { key := key32(i) @@ -1897,3 +1943,10 @@ func newDbConfig(scheme string) *trie.Config { } return &trie.Config{PathDB: pathdb.Defaults} } + +func newDbConfigWithExpiry(scheme string, expiry bool) *trie.Config { + if scheme == rawdb.HashScheme { + return &trie.Config{} + } + return &trie.Config{PathDB: pathdb.Defaults, EnableStateExpiry: expiry} +} diff --git a/trie/proof.go b/trie/proof.go index 7e62c4a8b1..f4c784c143 100644 --- a/trie/proof.go +++ b/trie/proof.go @@ -37,6 +37,9 @@ import ( // nodes of the longest existing prefix of the key (at least the root node), ending // with the node that proves the absence of the key. func (t *Trie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error { + + var nodeEpoch types.StateEpoch + // Short circuit if the trie is already committed and not usable. if t.committed { return ErrCommitted @@ -48,7 +51,14 @@ func (t *Trie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error { tn = t.root ) key = keybytesToHex(key) + + if t.enableExpiry { + nodeEpoch = t.getRootEpoch() + } for len(key) > 0 && tn != nil { + if t.enableExpiry && t.epochExpired(tn, nodeEpoch) { + return NewExpiredNodeError(prefix, nodeEpoch) + } switch n := tn.(type) { case *shortNode: if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) { @@ -61,6 +71,9 @@ func (t *Trie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error { } nodes = append(nodes, n) case *fullNode: + if t.enableExpiry { + nodeEpoch = n.GetChildEpoch(int(key[0])) + } tn = n.Children[key[0]] prefix = append(prefix, key[0]) key = key[1:] @@ -80,6 +93,12 @@ func (t *Trie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error { // clean cache or the database, they are all in their own // copy and safe to use unsafe decoder. 
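+			// Branch nodes resolved from disk also need their per-child epoch
+			// metadata loaded (below), so that the expiry check at the top of
+			// the loop can reject expired subtrees with an ExpiredNodeError.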
tn = mustDecodeNodeUnsafe(n, blob) + + if child, ok := tn.(*fullNode); t.enableExpiry && ok { + if err = t.resolveEpochMeta(child, nodeEpoch, prefix); err != nil { + return err + } + } default: panic(fmt.Sprintf("%T: invalid node: %v", tn, tn)) } diff --git a/trie/stacktrie.go b/trie/stacktrie.go index ee1ce28291..aed56a2241 100644 --- a/trie/stacktrie.go +++ b/trie/stacktrie.go @@ -27,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie/epochmeta" ) var ErrCommitDisabled = errors.New("no database for committing") @@ -40,6 +42,7 @@ var stPool = sync.Pool{ // NodeWriteFunc is used to provide all information of a dirty node for committing // so that callers can flush nodes into database with desired scheme. type NodeWriteFunc = func(owner common.Hash, path []byte, hash common.Hash, blob []byte) +type NodeMetaWriteFunc = func(owner common.Hash, path []byte, blob []byte) func stackTrieFromPool(writeFn NodeWriteFunc, owner common.Hash) *StackTrie { st := stPool.Get().(*StackTrie) @@ -48,6 +51,16 @@ func stackTrieFromPool(writeFn NodeWriteFunc, owner common.Hash) *StackTrie { return st } +func stackTrieFromPoolWithExpiry(writeFn NodeWriteFunc, writeMetaFn NodeMetaWriteFunc, owner common.Hash, epoch types.StateEpoch) *StackTrie { + st := stPool.Get().(*StackTrie) + st.owner = owner + st.writeFn = writeFn + st.writeMetaFn = writeMetaFn + st.epoch = epoch + st.enableStateExpiry = true + return st +} + func returnToPool(st *StackTrie) { st.Reset() stPool.Put(st) @@ -57,12 +70,16 @@ func returnToPool(st *StackTrie) { // in order. Once it determines that a subtree will no longer be inserted // into, it will hash it and free up the memory it uses. type StackTrie struct { - owner common.Hash // the owner of the trie - nodeType uint8 // node type (as in branch, ext, leaf) - val []byte // value contained by this node if it's a leaf - key []byte // key chunk covered by this (leaf|ext) node - children [16]*StackTrie // list of children (for branch and exts) - writeFn NodeWriteFunc // function for committing nodes, can be nil + owner common.Hash // the owner of the trie + nodeType uint8 // node type (as in branch, ext, leaf) + val []byte // value contained by this node if it's a leaf + key []byte // key chunk covered by this (leaf|ext) node + children [16]*StackTrie // list of children (for branch and exts) + enableStateExpiry bool // whether to enable state expiry + epoch types.StateEpoch + epochMap [16]types.StateEpoch + writeFn NodeWriteFunc // function for committing nodes, can be nil + writeMetaFn NodeMetaWriteFunc // function for committing epoch metadata, can be nil } // NewStackTrie allocates and initializes an empty trie. @@ -83,6 +100,17 @@ func NewStackTrieWithOwner(writeFn NodeWriteFunc, owner common.Hash) *StackTrie } } +func NewStackTrieWithStateExpiry(writeFn NodeWriteFunc, writeMetaFn NodeMetaWriteFunc, owner common.Hash, epoch types.StateEpoch) *StackTrie { + return &StackTrie{ + owner: owner, + nodeType: emptyNode, + epoch: epoch, + enableStateExpiry: true, + writeFn: writeFn, + writeMetaFn: writeMetaFn, + } +} + // NewFromBinary initialises a serialized stacktrie with the given db. 
func NewFromBinary(data []byte, writeFn NodeWriteFunc) (*StackTrie, error) { var st StackTrie @@ -208,6 +236,10 @@ func (st *StackTrie) Update(key, value []byte) error { if len(value) == 0 { panic("deletion not supported") } + if st.enableStateExpiry { + st.insertWithEpoch(k[:len(k)-1], value, nil, st.epoch) + return nil + } st.insert(k[:len(k)-1], value, nil) return nil } @@ -387,6 +419,154 @@ func (st *StackTrie) insert(key, value []byte, prefix []byte) { } } +func (st *StackTrie) insertWithEpoch(key, value []byte, prefix []byte, epoch types.StateEpoch) { + switch st.nodeType { + case branchNode: /* Branch */ + idx := int(key[0]) + + // Unresolve elder siblings + for i := idx - 1; i >= 0; i-- { + if st.children[i] != nil { + if st.children[i].nodeType != hashedNode { + st.children[i].hash(append(prefix, byte(i))) + } + break + } + } + + // Add new child + if st.children[idx] == nil { + st.children[idx] = newLeaf(st.owner, key[1:], value, st.writeFn) + } else { + st.children[idx].insertWithEpoch(key[1:], value, append(prefix, key[0]), epoch) + } + st.epochMap[idx] = epoch + + case extNode: /* Ext */ + // Compare both key chunks and see where they differ + diffidx := st.getDiffIndex(key) + + // Check if chunks are identical. If so, recurse into + // the child node. Otherwise, the key has to be split + // into 1) an optional common prefix, 2) the fullnode + // representing the two differing path, and 3) a leaf + // for each of the differentiated subtrees. + if diffidx == len(st.key) { + // Ext key and key segment are identical, recurse into + // the child node. + st.children[0].insertWithEpoch(key[diffidx:], value, append(prefix, key[:diffidx]...), epoch) + return + } + // Save the original part. Depending if the break is + // at the extension's last byte or not, create an + // intermediate extension or use the extension's child + // node directly. + var n *StackTrie + if diffidx < len(st.key)-1 { + // Break on the non-last byte, insert an intermediate + // extension. The path prefix of the newly-inserted + // extension should also contain the different byte. + n = newExt(st.owner, st.key[diffidx+1:], st.children[0], st.writeFn) + n.hash(append(prefix, st.key[:diffidx+1]...)) + } else { + // Break on the last byte, no need to insert + // an extension node: reuse the current node. + // The path prefix of the original part should + // still be same. + n = st.children[0] + n.hash(append(prefix, st.key...)) + } + var p *StackTrie + if diffidx == 0 { + // the break is on the first byte, so + // the current node is converted into + // a branch node. + st.children[0] = nil + p = st + st.nodeType = branchNode + } else { + // the common prefix is at least one byte + // long, insert a new intermediate branch + // node. 
+ st.children[0] = stackTrieFromPoolWithExpiry(st.writeFn, st.writeMetaFn, st.owner, st.epoch) + st.children[0].nodeType = branchNode + p = st.children[0] + } + // Create a leaf for the inserted part + o := newLeaf(st.owner, key[diffidx+1:], value, st.writeFn) + + // Insert both child leaves where they belong: + origIdx := st.key[diffidx] + newIdx := key[diffidx] + p.children[origIdx] = n + p.children[newIdx] = o + st.key = st.key[:diffidx] + p.epochMap[origIdx] = epoch + p.epochMap[newIdx] = epoch + + case leafNode: /* Leaf */ + // Compare both key chunks and see where they differ + diffidx := st.getDiffIndex(key) + + // Overwriting a key isn't supported, which means that + // the current leaf is expected to be split into 1) an + // optional extension for the common prefix of these 2 + // keys, 2) a fullnode selecting the path on which the + // keys differ, and 3) one leaf for the differentiated + // component of each key. + if diffidx >= len(st.key) { + panic("Trying to insert into existing key") + } + + // Check if the split occurs at the first nibble of the + // chunk. In that case, no prefix extnode is necessary. + // Otherwise, create that + var p *StackTrie + if diffidx == 0 { + // Convert current leaf into a branch + st.nodeType = branchNode + p = st + st.children[0] = nil + } else { + // Convert current node into an ext, + // and insert a child branch node. + st.nodeType = extNode + st.children[0] = NewStackTrieWithStateExpiry(st.writeFn, st.writeMetaFn, st.owner, st.epoch) + st.children[0].nodeType = branchNode + p = st.children[0] + } + + // Create the two child leaves: one containing the original + // value and another containing the new value. The child leaf + // is hashed directly in order to free up some memory. + origIdx := st.key[diffidx] + p.children[origIdx] = newLeaf(st.owner, st.key[diffidx+1:], st.val, st.writeFn) + p.children[origIdx].hash(append(prefix, st.key[:diffidx+1]...)) + + newIdx := key[diffidx] + p.children[newIdx] = newLeaf(st.owner, key[diffidx+1:], value, st.writeFn) + + p.epochMap[origIdx] = epoch + p.epochMap[newIdx] = epoch + + // Finally, cut off the key part that has been passed + // over to the children. + st.key = st.key[:diffidx] + st.val = nil + + case emptyNode: /* Empty */ + st.nodeType = leafNode + st.key = key + st.val = value + + case hashedNode: + panic("trying to insert into hash") + + default: + panic("invalid type") + } +} + // hash converts st into a 'hashedNode', if possible. Possible outcomes: // // 1. 
The rlp-encoded value was >= 32 bytes: @@ -469,6 +649,7 @@ func (st *StackTrie) hashRec(hasher *hasher, path []byte) { panic("invalid node type") } + prevNodeType := st.nodeType st.nodeType = hashedNode st.key = st.key[:0] if len(encodedNode) < 32 { @@ -481,6 +662,12 @@ func (st *StackTrie) hashRec(hasher *hasher, path []byte) { st.val = hasher.hashData(encodedNode) if st.writeFn != nil { st.writeFn(st.owner, path, common.BytesToHash(st.val), encodedNode) + if st.enableStateExpiry && prevNodeType == branchNode && st.writeMetaFn != nil { + epochMeta := epochmeta.NewBranchNodeEpochMeta(st.epochMap) + buf := rlp.NewEncoderBuffer(nil) + epochMeta.Encode(buf) + st.writeMetaFn(st.owner, path, buf.ToBytes()) + } } } @@ -514,6 +701,9 @@ func (st *StackTrie) Commit() (h common.Hash, err error) { if st.writeFn == nil { return common.Hash{}, ErrCommitDisabled } + if st.enableStateExpiry && st.writeMetaFn == nil { + return common.Hash{}, ErrCommitDisabled + } hasher := newHasher(false) defer returnHasherToPool(hasher) @@ -529,6 +719,12 @@ func (st *StackTrie) Commit() (h common.Hash, err error) { hasher.sha.Write(st.val) hasher.sha.Read(h[:]) - st.writeFn(st.owner, nil, h, st.val) + st.writeFn(st.owner, nil, h, st.val) // func(owner common.Hash, path []byte, hash common.Hash, blob []byte) + if st.enableStateExpiry && st.nodeType == branchNode && st.writeMetaFn != nil { + epochMeta := epochmeta.NewBranchNodeEpochMeta(st.epochMap) + buf := rlp.NewEncoderBuffer(nil) + epochMeta.Encode(buf) + st.writeMetaFn(st.owner, nil, buf.ToBytes()) + } return h, nil } diff --git a/trie/sync.go b/trie/sync.go index 35d36f6e04..c6f0da5e86 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -27,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie/epochmeta" ) // ErrNotRequested is returned by the trie sync when it's requested to process a @@ -93,9 +95,10 @@ type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Ha // nodeRequest represents a scheduled or already in-flight trie node retrieval request. type nodeRequest struct { - hash common.Hash // Hash of the trie node to retrieve - path []byte // Merkle path leading to this node for prioritization - data []byte // Data content of the node, cached until all subtrees complete + hash common.Hash // Hash of the trie node to retrieve + path []byte // Merkle path leading to this node for prioritization + data []byte // Data content of the node, cached until all subtrees complete + epochMap [16]types.StateEpoch parent *nodeRequest // Parent state node referencing this entry deps int // Number of dependencies before allowed to commit this node @@ -125,10 +128,11 @@ type CodeSyncResult struct { // syncMemBatch is an in-memory buffer of successfully downloaded but not yet // persisted data items. type syncMemBatch struct { - nodes map[string][]byte // In-memory membatch of recently completed nodes - hashes map[string]common.Hash // Hashes of recently completed nodes - codes map[common.Hash][]byte // In-memory membatch of recently completed codes - size uint64 // Estimated batch-size of in-memory data. 
+ nodes map[string][]byte // In-memory membatch of recently completed nodes + hashes map[string]common.Hash // Hashes of recently completed nodes + epochMaps map[string][16]types.StateEpoch + codes map[common.Hash][]byte // In-memory membatch of recently completed codes + size uint64 // Estimated batch-size of in-memory data. } // newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. @@ -156,13 +160,15 @@ func (batch *syncMemBatch) hasCode(hash common.Hash) bool { // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type Sync struct { - scheme string // Node scheme descriptor used in database. - database ethdb.KeyValueReader // Persistent database to check for existing entries - membatch *syncMemBatch // Memory buffer to avoid frequent database writes - nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path - codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash - queue *prque.Prque[int64, any] // Priority queue with the pending requests - fetches map[int]int // Number of active fetches per trie node depth + scheme string // Node scheme descriptor used in database. + database ethdb.KeyValueReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequent database writes + nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path + codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash + queue *prque.Prque[int64, any] // Priority queue with the pending requests + fetches map[int]int // Number of active fetches per trie node depth + enableStateExpiry bool + epoch types.StateEpoch } // NewSync creates a new trie data download scheduler. @@ -180,6 +186,22 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb return ts } +func NewSyncWithEpoch(root common.Hash, database ethdb.KeyValueReader, callback LeafCallback, scheme string, epoch types.StateEpoch) *Sync { + ts := &Sync{ + scheme: scheme, + database: database, + membatch: newSyncMemBatch(), + nodeReqs: make(map[string]*nodeRequest), + codeReqs: make(map[common.Hash]*codeRequest), + queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy + fetches: make(map[int]int), + epoch: epoch, + enableStateExpiry: true, + } + ts.AddSubTrie(root, nil, common.Hash{}, nil, callback) + return ts +} + // AddSubTrie registers a new trie to the sync code, rooted at the designated // parent for completion tracking. The given path is a unique node path in // hex format and contain all the parent path if it's layered trie node. 
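// Note: commitNodeRequest (below) writes into membatch.epochMaps, so the map
// must be allocated when the batch is created. newSyncMemBatch is not touched
// by the hunks shown here; a minimal sketch of the assumed allocation:
//
//	func newSyncMemBatch() *syncMemBatch {
//		return &syncMemBatch{
//			nodes:     make(map[string][]byte),
//			hashes:    make(map[string]common.Hash),
//			epochMaps: make(map[string][16]types.StateEpoch),
//			codes:     make(map[common.Hash][]byte),
//		}
//	}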
@@ -328,6 +350,14 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error { } req.data = result.Data + if fn, ok := node.(*fullNode); s.enableStateExpiry && ok { + for i := 0; i < 16; i++ { + if fn.Children[i] != nil { + req.epochMap[i] = s.epoch + } + } + } + // Create and schedule a request for all the children nodes requests, err := s.children(req, node) if err != nil { @@ -351,6 +381,14 @@ func (s *Sync) Commit(dbw ethdb.Batch) error { for path, value := range s.membatch.nodes { owner, inner := ResolvePath([]byte(path)) rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme) + if s.enableStateExpiry { + if s.membatch.epochMaps[path] != [16]types.StateEpoch{} { + epochMeta := epochmeta.NewBranchNodeEpochMeta(s.membatch.epochMaps[path]) + buf := rlp.NewEncoderBuffer(nil) + epochMeta.Encode(buf) + rawdb.WriteEpochMetaPlainState(dbw, owner, string(inner), buf.ToBytes()) + } + } } for hash, value := range s.membatch.codes { rawdb.WriteCode(dbw, hash, value) @@ -509,10 +547,18 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error { // Write the node content to the membatch s.membatch.nodes[string(req.path)] = req.data s.membatch.hashes[string(req.path)] = req.hash + if req.epochMap != [16]types.StateEpoch{} { + s.membatch.epochMaps[string(req.path)] = req.epochMap + } // The size tracking refers to the db-batch, not the in-memory data. // Therefore, we ignore the req.Path, and account only for the hash+data // which eventually is written to db. s.membatch.size += common.HashLength + uint64(len(req.data)) + for _, epoch := range req.epochMap { + if epoch != 0 { + s.membatch.size += 16 + } + } delete(s.nodeReqs, string(req.path)) s.fetches[len(req.path)]-- diff --git a/trie/trie.go b/trie/trie.go index 6a74636b45..177530b87c 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -1556,3 +1556,11 @@ func (t *Trie) recursePruneExpiredNode(n node, path []byte, epoch types.StateEpo panic(fmt.Sprintf("invalid node type: %T", n)) } } + +func (t *Trie) UpdateRootEpoch(epoch types.StateEpoch) { + t.rootEpoch = epoch +} + +func (t *Trie) UpdateCurrentEpoch(epoch types.StateEpoch) { + t.currentEpoch = epoch +} diff --git a/trie/trie_reader.go b/trie/trie_reader.go index 924d379bbf..ee7a5dec28 100644 --- a/trie/trie_reader.go +++ b/trie/trie_reader.go @@ -51,11 +51,20 @@ type trieReader struct { // newTrieReader initializes the trie reader with the given node reader. func newTrieReader(stateRoot, owner common.Hash, db *Database) (*trieReader, error) { + var err error + if stateRoot == (common.Hash{}) || stateRoot == types.EmptyRootHash { if stateRoot == (common.Hash{}) { log.Error("Zero state root hash!") } - return &trieReader{owner: owner}, nil + tr := &trieReader{owner: owner} + if db.snapTree != nil { + tr.emdb, err = epochmeta.NewEpochMetaDatabase(db.snapTree, new(big.Int), stateRoot) + if err != nil { + return nil, err + } + } + return tr, nil } reader, err := db.Reader(stateRoot) if err != nil {
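// Usage sketch (not part of the patch): how the new entry points compose on
// the client side, mirroring Downloader.processSnapSyncContent and the
// testSyncWithStorageStateExpiry test. The helper name and its parameters are
// illustrative placeholders.
//
//	import (
//		"github.com/ethereum/go-ethereum/core/types"
//		"github.com/ethereum/go-ethereum/eth/protocols/snap"
//		"github.com/ethereum/go-ethereum/ethdb"
//		"github.com/ethereum/go-ethereum/params"
//	)
//
//	func syncPivotWithExpiry(db ethdb.KeyValueStore, scheme string, config *params.ChainConfig, pivot *types.Header, cancel chan struct{}) error {
//		// Tag every synced trie node and snapshot slot with the epoch
//		// derived from the pivot block number.
//		syncer := snap.NewSyncerWithStateExpiry(db, scheme, true)
//		syncer.UpdateEpoch(types.GetStateEpoch(config, pivot.Number))
//		return syncer.Sync(pivot.Root, cancel)
//	}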