Skip to content

Commit

Permalink
feat(snap): add state expiry support to snap sync
Browse files Browse the repository at this point in the history
add server related for snap sync

edit
  • Loading branch information
asyukii committed Oct 13, 2023
1 parent 6cd8481 commit 2656b7f
Show file tree
Hide file tree
Showing 14 changed files with 620 additions and 60 deletions.
10 changes: 5 additions & 5 deletions core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, r
return nil, fmt.Errorf("cannot find any revive proof from remoteDB")
}

return reviveStorageTrie(addr, tr, proofs[0], key)
return ReviveStorageTrie(addr, tr, proofs[0], key)
}

// batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes.
Expand Down Expand Up @@ -126,8 +126,8 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
}

for i, proof := range proofs {
// kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
// kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := ReviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
if err != nil {
log.Error("reviveStorageTrie failed", "addr", addr, "key", keys[i], "err", err)
continue
Expand All @@ -138,8 +138,8 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres
return ret, nil
}

// reviveStorageTrie revive trie's expired state from proof
func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
// ReviveStorageTrie revive trie's expired state from proof
func ReviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
defer func(start time.Time) {
reviveStorageTrieTimer.Update(time.Since(start))
}(time.Now())
Expand Down
29 changes: 29 additions & 0 deletions core/state/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,32 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(k
syncer = trie.NewSync(root, database, onAccount, scheme)
return syncer
}

func NewStateSyncWithExpiry(root common.Hash, database ethdb.KeyValueReader, onLeaf func(keys [][]byte, leaf []byte) error, scheme string, epoch types.StateEpoch) *trie.Sync {
// Register the storage slot callback if the external callback is specified.
var onSlot func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error
if onLeaf != nil {
onSlot = func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
return onLeaf(keys, leaf)
}
}
// Register the account callback to connect the state trie and the storage
// trie belongs to the contract.
var syncer *trie.Sync
onAccount := func(keys [][]byte, path []byte, leaf []byte, parent common.Hash, parentPath []byte) error {
if onLeaf != nil {
if err := onLeaf(keys, leaf); err != nil {
return err
}
}
var obj types.StateAccount
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, path, parent, parentPath, onSlot)
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), path, parent, parentPath)
return nil
}
syncer = trie.NewSyncWithEpoch(root, database, onAccount, scheme, epoch)
return syncer
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
DirectBroadcast: config.DirectBroadcast,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
EnableStateExpiry: config.StateExpiryCfg.Enable,
}); err != nil {
return nil, err
}
Expand Down
41 changes: 38 additions & 3 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Downloader struct {
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync

enableStateExpiry bool

// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
Expand Down Expand Up @@ -207,6 +209,10 @@ type BlockChain interface {
// TrieDB retrieves the low level trie database used for interacting
// with trie nodes.
TrieDB() *trie.Database

Config() *params.ChainConfig

StateExpiryConfig() *types.StateExpiryConfig
}

type DownloadOption func(downloader *Downloader) *Downloader
Expand Down Expand Up @@ -235,6 +241,30 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai
return dl
}

func NewWithExpiry(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, enableStateExpiry bool, dropPeer peerDropFn, options ...DownloadOption) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
enableStateExpiry: enableStateExpiry,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncerWithStateExpiry(stateDb, chain.TrieDB().Scheme(), enableStateExpiry),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
}

go dl.stateFetcher()
return dl
}

// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
Expand Down Expand Up @@ -1464,10 +1494,14 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
func (d *Downloader) processSnapSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
var epoch types.StateEpoch

d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
sync := d.syncStateWithEpoch(d.pivotHeader.Root, epoch)
d.pivotLock.RUnlock()

epoch = types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(d.pivotHeader.Number.Uint64()))

defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
Expand Down Expand Up @@ -1516,11 +1550,12 @@ func (d *Downloader) processSnapSyncContent() error {
d.pivotLock.RLock()
pivot := d.pivotHeader
d.pivotLock.RUnlock()
epoch = types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(pivot.Number.Uint64()))

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)
sync = d.syncStateWithEpoch(pivot.Root, epoch)

go closeOnErr(sync)
}
Expand Down Expand Up @@ -1558,7 +1593,7 @@ func (d *Downloader) processSnapSyncContent() error {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()
sync = d.syncState(P.Header.Root)
sync = d.syncStateWithEpoch(P.Header.Root, types.GetStateEpoch(d.blockchain.StateExpiryConfig(), new(big.Int).SetUint64(P.Header.Number.Uint64())))

go closeOnErr(sync)
oldPivot = P
Expand Down
38 changes: 36 additions & 2 deletions eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -40,6 +41,22 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
return s
}

func (d *Downloader) syncStateWithEpoch(root common.Hash, epoch types.StateEpoch) *stateSync {
// Create the state sync
s := newStateSyncWithEpoch(d, root, epoch)
select {
case d.stateSyncStart <- s:
// If we tell the statesync to restart with a new root, we also need
// to wait for it to actually also start -- when old requests have timed
// out or been delivered
<-s.started
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
}
return s
}

// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
Expand Down Expand Up @@ -77,8 +94,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
d *Downloader // Downloader instance to access and manage current peerset
root common.Hash // State root currently being synced
epoch types.StateEpoch
enableStateExpiry bool

started chan struct{} // Started is signalled once the sync loop starts
cancel chan struct{} // Channel to signal a termination request
Expand All @@ -99,11 +118,26 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
}
}

func newStateSyncWithEpoch(d *Downloader, root common.Hash, epoch types.StateEpoch) *stateSync {
return &stateSync{
d: d,
root: root,
epoch: epoch,
enableStateExpiry: true,
cancel: make(chan struct{}),
done: make(chan struct{}),
started: make(chan struct{}),
}
}

// run starts the task assignment and response processing loop, blocking until
// it finishes, and finally notifying any goroutines waiting for the loop to
// finish.
func (s *stateSync) run() {
close(s.started)
if s.enableStateExpiry {
s.d.SnapSyncer.UpdateEpoch(s.epoch)
}
s.err = s.d.SnapSyncer.Sync(s.root, s.cancel)
close(s.done)
}
Expand Down
6 changes: 5 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type handlerConfig struct {
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
EnableStateExpiry bool
}

type handler struct {
Expand All @@ -134,6 +135,8 @@ type handler struct {
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
directBroadcast bool

enableStateExpiry bool

database ethdb.Database
txpool txPool
votepool votePool
Expand Down Expand Up @@ -196,6 +199,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
enableStateExpiry: config.EnableStateExpiry,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
Expand Down Expand Up @@ -249,7 +253,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
downloadOptions = append(downloadOptions, success)
*/

h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, downloadOptions...)
h.downloader = downloader.NewWithExpiry(config.Database, h.eventMux, h.chain, nil, config.EnableStateExpiry, h.removePeer, downloadOptions...)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
54 changes: 49 additions & 5 deletions eth/protocols/snap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -381,13 +384,26 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
storage []*StorageData
last common.Hash
abort bool
sv snapshot.SnapValue
hash common.Hash
slot []byte
enc []byte
)
for it.Next() {
if size >= hardLimit {
abort = true
break
}
hash, slot := it.Hash(), common.CopyBytes(it.Slot())
hash, enc = it.Hash(), common.CopyBytes(it.Slot())
if len(enc) > 0 {
sv, err = snapshot.DecodeValueFromRLPBytes(enc)
if err != nil || sv == nil {
log.Warn("Failed to decode storage slot", "err", err)
return nil, nil
}
}

slot = sv.GetVal()

// Track the returned interval for the Merkle proofs
last = hash
Expand Down Expand Up @@ -429,13 +445,23 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
}
proof := light.NewNodeSet()
if err := stTrie.Prove(origin[:], proof); err != nil {
log.Warn("Failed to prove storage range", "origin", req.Origin, "err", err)
return nil, nil
if enErr, ok := err.(*trie.ExpiredNodeError); ok {
err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, origin, proof)
if err != nil {
log.Warn("Failed to prove storage range", "origin", origin, "err", err)
return nil, nil
}
}
}
if last != (common.Hash{}) {
if err := stTrie.Prove(last[:], proof); err != nil {
log.Warn("Failed to prove storage range", "last", last, "err", err)
return nil, nil
if enErr, ok := err.(*trie.ExpiredNodeError); ok {
err := reviveAndGetProof(chain.FullStateDB(), stTrie, req.Root, common.BytesToAddress(account[:]), acc.Root, enErr.Path, last, proof)
if err != nil {
log.Warn("Failed to prove storage range", "origin", origin, "err", err)
return nil, nil
}
}
}
}
for _, blob := range proof.NodeList() {
Expand Down Expand Up @@ -567,6 +593,24 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
return nodes, nil
}

func reviveAndGetProof(fullStateDB ethdb.FullStateDB, tr *trie.StateTrie, stateRoot common.Hash, account common.Address, root common.Hash, prefixKey []byte, key common.Hash, proofDb *light.NodeSet) error {
proofs, err := fullStateDB.GetStorageReviveProof(stateRoot, account, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])})
if err != nil || len(proofs) == 0 {
return err
}

_, err = state.ReviveStorageTrie(account, tr, proofs[0], key)
if err != nil {
return err
}

if err := tr.Prove(key[:], proofDb); err != nil {
return err
}

return nil
}

// NodeInfo represents a short summary of the `snap` sub-protocol metadata
// known about the host peer.
type NodeInfo struct{}
Expand Down
Loading

0 comments on commit 2656b7f

Please sign in to comment.