From 14e48ffca3cdda2c103464848f88e7ea07b31e2b Mon Sep 17 00:00:00 2001 From: fudongbai <296179868@qq.com> Date: Thu, 26 Aug 2021 14:55:37 +0800 Subject: [PATCH] update --- cmd/utils/flags.go | 7 +- core/blockchain.go | 209 +++++++++++++++++++-------------- core/rawdb/accessors_chain.go | 20 ++-- core/state/statedb.go | 31 +++-- core/state_processor.go | 35 +++--- core/types/block.go | 11 +- eth/backend.go | 3 +- eth/handler.go | 9 +- eth/protocols/diff/handler.go | 1 + eth/protocols/diff/peer.go | 60 ++++++++-- eth/protocols/eth/broadcast.go | 13 +- eth/protocols/eth/peer.go | 4 +- 12 files changed, 238 insertions(+), 165 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index a6eebfcf7b..9b5d2b14a6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -118,8 +118,9 @@ var ( Usage: "Enable directly broadcast mined block to all peers", } LightSyncFlag = cli.BoolFlag{ - Name: "lightsync", - Usage: "Enable difflayer light sync ", + Name: "lightsync", + Usage: "Enable difflayer light sync, Please note that enable lightsync will improve the syncing speed, " + + "but will degrade the security to light client level", } RangeLimitFlag = cli.BoolFlag{ Name: "rangelimit", @@ -435,7 +436,7 @@ var ( } PersistDiffFlag = cli.BoolFlag{ Name: "persistdiff", - Usage: "Enable persisting the diff layer", + Usage: "Enable persistence of the diff layer", } // Miner settings MiningEnabledFlag = cli.BoolFlag{ diff --git a/core/blockchain.go b/core/blockchain.go index 286d3c86f5..65ff1deb9b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -90,14 +90,12 @@ const ( maxTimeFutureBlocks = 30 maxBeyondBlocks = 2048 - diffLayerfreezerRecheckInterval = 3 * time.Second - diffLayerfreezerBlockLimit = 864000 // The number of blocks that should be kept in disk. - - diffLayerPruneRecheckInterval = 3 * time.Second - - maxQueueDist = 64 // Maximum allowed distance from the chain head to queue - blockLimit = 128 // Maximum number of unique diff layers a peer may have delivered - maxUncleDist = 11 // Maximum allowed backward distance from the chain head + diffLayerFreezerRecheckInterval = 3 * time.Second + diffLayerFreezerBlockLimit = 864000 // The number of diff layers that should be kept in disk. + diffLayerPruneRecheckInterval = 1 * time.Second // The interval to prune unverified diff layers + maxDiffQueueDist = 64 // Maximum allowed distance from the chain head to queue diffLayers + maxDiffLimit = 128 // Maximum number of unique diff layers a peer may have delivered + maxDiffForkDist = 11 // Maximum allowed backward distance from the chain head // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // @@ -141,6 +139,7 @@ type CacheConfig struct { SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it } +// To avoid cycle import type PeerIDer interface { ID() string } @@ -204,21 +203,23 @@ type BlockChain struct { currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) - stateCache state.Database // State database to reuse between imports (contains state cache) - bodyCache *lru.Cache // Cache for the most recent block bodies - bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format - receiptsCache *lru.Cache // Cache for the most recent receipts per block - blockCache *lru.Cache // Cache for the most recent entire blocks - txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. - futureBlocks *lru.Cache // future blocks are blocks added for later processing - diffLayerCache *lru.Cache // Cache for the diffLayers - diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers - diffQueue *prque.Prque // A Priority queue to store recent diff layer + stateCache state.Database // State database to reuse between imports (contains state cache) + bodyCache *lru.Cache // Cache for the most recent block bodies + bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format + receiptsCache *lru.Cache // Cache for the most recent receipts per block + blockCache *lru.Cache // Cache for the most recent entire blocks + txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. + futureBlocks *lru.Cache // future blocks are blocks added for later processing + + // trusted diff layers + diffLayerCache *lru.Cache // Cache for the diffLayers + diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers + diffQueue *prque.Prque // A Priority queue to store recent diff layer // untrusted diff layers - diffMutex sync.RWMutex - receivedDiffLayers map[common.Hash]map[common.Hash]*types.DiffLayer //map[blockHash] map[DiffHash]Diff - diffHashToBlockHash map[common.Hash]common.Hash // map[diffHash]blockhash + diffMux sync.RWMutex + receivedDiffLayers map[common.Hash]map[common.Hash]*types.DiffLayer // map[blockHash] map[DiffHash]Diff + diffHashToBlockHash map[common.Hash]common.Hash // map[diffHash]blockHash diffHashToPeers map[common.Hash]map[string]struct{} // map[diffHash]map[pid] diffNumToBlockHashes map[uint64]map[common.Hash]struct{} // map[number]map[blockHash] diffPeersToDiffHashes map[string]map[common.Hash]struct{} // map[pid]map[diffHash] @@ -240,12 +241,15 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, options ...BlockChainOption) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, + vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, + options ...BlockChainOption) (*BlockChain, error) { if cacheConfig == nil { cacheConfig = defaultCacheConfig } if cacheConfig.TriesInMemory != 128 { - log.Warn("TriesInMemory isn't the default value(128), you need specify exact same TriesInMemory when prune data", "triesInMemory", cacheConfig.TriesInMemory) + log.Warn("TriesInMemory isn't the default value(128), you need specify exact same TriesInMemory when prune data", + "triesInMemory", cacheConfig.TriesInMemory) } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) @@ -416,7 +420,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par for _, option := range options { bc = option(bc) } - // Take ownership of this particular state go bc.update() if txLookupLimit != nil { @@ -440,9 +443,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } // Need persist and prune diff layer if bc.db.DiffStore() != nil { - go bc.diffLayerFreeze() + go bc.trustedDiffLayerFreezeLoop() } - go bc.pruneDiffLoop() + go bc.untrustedDiffLayerPruneLoop() return bc, nil } @@ -457,7 +460,7 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) { } func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) { - bc.diffLayerCache.Add(diffLayer.Hash, diffLayer) + bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer) if bc.db.DiffStore() != nil { // push to priority queue before persisting bc.diffQueue.Push(diffLayer, -(int64(diffLayer.Number))) @@ -929,29 +932,41 @@ func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue { return body } -// GetBodyRLP retrieves a diff layer in RLP encoding from the database by hash, -// caching it if found. -func (bc *BlockChain) GetDiffLayerRLP(hash common.Hash) rlp.RawValue { +// GetDiffLayerRLP retrieves a diff layer in RLP encoding from the cache or database by blockHash +func (bc *BlockChain) GetDiffLayerRLP(blockHash common.Hash) rlp.RawValue { // Short circuit if the diffLayer's already in the cache, retrieve otherwise - if cached, ok := bc.diffLayerRLPCache.Get(hash); ok { + if cached, ok := bc.diffLayerRLPCache.Get(blockHash); ok { return cached.(rlp.RawValue) } - if cached, ok := bc.diffLayerCache.Get(hash); ok { + if cached, ok := bc.diffLayerCache.Get(blockHash); ok { diff := cached.(*types.DiffLayer) bz, err := rlp.EncodeToBytes(diff) if err != nil { return nil } - bc.diffLayerRLPCache.Add(hash, bz) + bc.diffLayerRLPCache.Add(blockHash, bz) + return bz + } + + // fallback to untrusted sources. + diff := bc.GetDiffLayer(blockHash, "") + if diff != nil { + bz, err := rlp.EncodeToBytes(diff) + if err != nil { + return nil + } + // No need to cache untrusted data return bz } + + // fallback to disk diffStore := bc.db.DiffStore() if diffStore == nil { return nil } - rawData := rawdb.ReadDiffLayerRLP(diffStore, hash) + rawData := rawdb.ReadDiffLayerRLP(diffStore, blockHash) if len(rawData) != 0 { - bc.diffLayerRLPCache.Add(hash, rawData) + bc.diffLayerRLPCache.Add(blockHash, rawData) } return rawData } @@ -1598,8 +1613,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. if diffLayer != nil && block.Header().TxHash != types.EmptyRootHash { // Filling necessary field diffLayer.Receipts = receipts - diffLayer.StateRoot = root - diffLayer.Hash = block.Hash() + diffLayer.BlockHash = block.Hash() diffLayer.Number = block.NumberU64() bc.cacheDiffLayer(diffLayer) } @@ -2001,11 +2015,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Validate the state using the default validator substart = time.Now() - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - log.Error("validate state failed", "error", err) - return it.index, err + if !statedb.IsLightProcessed() { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + log.Error("validate state failed", "error", err) + return it.index, err + } } + bc.reportBlock(block, receipts, err) bc.cacheReceipts(block.Hash(), receipts) bc.cacheBlock(block.Hash(), block) proctime := time.Since(start) @@ -2384,8 +2400,10 @@ func (bc *BlockChain) update() { } } -func (bc *BlockChain) diffLayerFreeze() { - recheck := time.Tick(diffLayerfreezerRecheckInterval) +func (bc *BlockChain) trustedDiffLayerFreezeLoop() { + recheck := time.Tick(diffLayerFreezerRecheckInterval) + bc.wg.Add(1) + defer bc.wg.Done() for { select { case <-bc.quit: @@ -2398,7 +2416,7 @@ func (bc *BlockChain) diffLayerFreeze() { if batch == nil { batch = bc.db.DiffStore().NewBatch() } - rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer) + rawdb.WriteDiffLayer(batch, diffLayer.BlockHash, diffLayer) if batch.ValueSize() > ethdb.IdealBatchSize { if err := batch.Write(); err != nil { log.Error("Failed to write diff layer", "err", err) @@ -2408,6 +2426,7 @@ func (bc *BlockChain) diffLayerFreeze() { } } if batch != nil { + // flush data if err := batch.Write(); err != nil { log.Error("Failed to write diff layer", "err", err) return @@ -2426,12 +2445,12 @@ func (bc *BlockChain) diffLayerFreeze() { if int64(currentHeight)+prio > int64(bc.triesInMemory) { canonicalHash := bc.GetCanonicalHash(uint64(-prio)) // on the canonical chain - if canonicalHash == diffLayer.Hash { + if canonicalHash == diffLayer.BlockHash { if batch == nil { batch = bc.db.DiffStore().NewBatch() } - rawdb.WriteDiffLayer(batch, diffLayer.Hash, diffLayer) - staleHash := bc.GetCanonicalHash(uint64(-prio) - diffLayerfreezerBlockLimit) + rawdb.WriteDiffLayer(batch, diffLayer.BlockHash, diffLayer) + staleHash := bc.GetCanonicalHash(uint64(-prio) - diffLayerFreezerBlockLimit) rawdb.DeleteDiffLayer(batch, staleHash) } } else { @@ -2455,31 +2474,29 @@ func (bc *BlockChain) diffLayerFreeze() { } } -func (bc *BlockChain) GetDiffLayer(block *types.Block) *types.DiffLayer { - bc.diffMutex.Lock() - defer bc.diffMutex.Unlock() - if diffs, exist := bc.receivedDiffLayers[block.Hash()]; exist && len(diffs) != 0 { +func (bc *BlockChain) GetDiffLayer(blockHash common.Hash, pid string) *types.DiffLayer { + bc.diffMux.RLock() + defer bc.diffMux.RUnlock() + if diffs, exist := bc.receivedDiffLayers[blockHash]; exist && len(diffs) != 0 { if len(diffs) == 1 { - for _, d := range diffs { - return d + // return the only one diff layer + for _, diff := range diffs { + return diff } } else { - // pick the one from exact same peer - if peer, ok := block.ReceivedFrom.(PeerIDer); ok { - pid := peer.ID() + // pick the one from exact same peer if we know where the block comes from + if pid != "" { if diffHashes, exist := bc.diffPeersToDiffHashes[pid]; exist { - for d1 := range diffs { - for d2 := range diffHashes { - if d1 == d2 { - return bc.receivedDiffLayers[block.Hash()][d1] - } + for diff := range diffs { + if _, overlap := diffHashes[diff]; overlap { + return bc.receivedDiffLayers[blockHash][diff] } } } } // Do not find overlap, do random pick - for _, d := range diffs { - return d + for _, diff := range diffs { + return diff } } } @@ -2487,8 +2504,8 @@ func (bc *BlockChain) GetDiffLayer(block *types.Block) *types.DiffLayer { } func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) { - bc.diffMutex.Lock() - defer bc.diffMutex.Unlock() + bc.diffMux.Lock() + defer bc.diffMux.Unlock() // Untrusted peers pids := bc.diffHashToPeers[diffHash] @@ -2517,8 +2534,10 @@ func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) { } } -func (bc *BlockChain) pruneDiffLoop() { +func (bc *BlockChain) untrustedDiffLayerPruneLoop() { recheck := time.Tick(diffLayerPruneRecheckInterval) + bc.wg.Add(1) + defer bc.wg.Done() for { select { case <-bc.quit: @@ -2531,57 +2550,65 @@ func (bc *BlockChain) pruneDiffLoop() { func (bc *BlockChain) pruneDiffLayer() { currentHeight := bc.CurrentBlock().NumberU64() - bc.diffMutex.Lock() - defer bc.diffMutex.Unlock() + bc.diffMux.Lock() + defer bc.diffMux.Unlock() sortNumbers := make([]uint64, 0, len(bc.diffNumToBlockHashes)) for number := range bc.diffNumToBlockHashes { sortNumbers = append(sortNumbers, number) } - sort.SliceStable(sortNumbers, func(i, j int) bool { - return sortNumbers[i] < sortNumbers[j] + sort.Slice(sortNumbers, func(i, j int) bool { + return sortNumbers[i] <= sortNumbers[j] }) - blockHashes := make(map[common.Hash]struct{}) + staleBlockHashes := make(map[common.Hash]struct{}) for _, number := range sortNumbers { - if number < currentHeight-maxUncleDist { - affecthedHashes := bc.diffNumToBlockHashes[number] - if affecthedHashes != nil { - for affecthedHash := range affecthedHashes { - blockHashes[affecthedHash] = struct{}{} - delete(bc.diffNumToBlockHashes, number) + if number < currentHeight-maxDiffForkDist { + affectedHashes := bc.diffNumToBlockHashes[number] + if affectedHashes != nil { + for affectedHash := range affectedHashes { + staleBlockHashes[affectedHash] = struct{}{} } + delete(bc.diffNumToBlockHashes, number) } } else { break } } - - for blockHash := range blockHashes { + staleDiffHashes := make(map[common.Hash]struct{}, 0) + for blockHash := range staleBlockHashes { if diffHashes, exist := bc.receivedDiffLayers[blockHash]; exist { for diffHash := range diffHashes { + staleDiffHashes[diffHash] = struct{}{} delete(bc.diffHashToBlockHash, diffHash) delete(bc.diffHashToPeers, diffHash) } } delete(bc.receivedDiffLayers, blockHash) } - + for diffHash := range staleDiffHashes { + for p, diffHashes := range bc.diffPeersToDiffHashes { + delete(diffHashes, diffHash) + if len(diffHash) == 0 { + delete(bc.diffPeersToDiffHashes, p) + } + } + } } -// Process diff layers +// Process received diff layers func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string) error { // Basic check currentHeight := bc.CurrentBlock().NumberU64() - if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxQueueDist { + if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxDiffQueueDist { return errors.New("diff layers too new from current") } - if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxUncleDist { + if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxDiffForkDist { return errors.New("diff layers too old from current") } - bc.diffMutex.Lock() - defer bc.diffMutex.Unlock() + bc.diffMux.Lock() + defer bc.diffMux.Unlock() - if len(bc.diffPeersToDiffHashes[pid]) > blockLimit { + if len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimit { return errors.New("too many accumulated diffLayers") } if _, exist := bc.diffPeersToDiffHashes[pid]; exist { @@ -2595,18 +2622,18 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string) er if _, exist := bc.diffNumToBlockHashes[diffLayer.Number]; !exist { bc.diffNumToBlockHashes[diffLayer.Number] = make(map[common.Hash]struct{}) } - bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.Hash] = struct{}{} + bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.BlockHash] = struct{}{} if _, exist := bc.diffHashToPeers[diffLayer.DiffHash]; !exist { bc.diffHashToPeers[diffLayer.DiffHash] = make(map[string]struct{}) } bc.diffHashToPeers[diffLayer.DiffHash][pid] = struct{}{} - if _, exist := bc.receivedDiffLayers[diffLayer.Hash]; !exist { - bc.receivedDiffLayers[diffLayer.Hash] = make(map[common.Hash]*types.DiffLayer) + if _, exist := bc.receivedDiffLayers[diffLayer.BlockHash]; !exist { + bc.receivedDiffLayers[diffLayer.BlockHash] = make(map[common.Hash]*types.DiffLayer) } - bc.receivedDiffLayers[diffLayer.Hash][diffLayer.DiffHash] = diffLayer - bc.diffHashToBlockHash[diffLayer.DiffHash] = diffLayer.Hash + bc.receivedDiffLayers[diffLayer.BlockHash][diffLayer.DiffHash] = diffLayer + bc.diffHashToBlockHash[diffLayer.DiffHash] = diffLayer.BlockHash return nil } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 34f16ebb7a..6489a600fb 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -455,32 +455,32 @@ func WriteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash, layer *types.Diff WriteDiffLayerRLP(db, hash, data) } -func WriteDiffLayerRLP(db ethdb.KeyValueWriter, hash common.Hash, rlp rlp.RawValue) { - if err := db.Put(diffLayerKey(hash), rlp); err != nil { - log.Crit("Failed to store block body", "err", err) +func WriteDiffLayerRLP(db ethdb.KeyValueWriter, blockHash common.Hash, rlp rlp.RawValue) { + if err := db.Put(diffLayerKey(blockHash), rlp); err != nil { + log.Crit("Failed to store diff layer", "err", err) } } -func ReadDiffLayer(db ethdb.KeyValueReader, hash common.Hash) *types.DiffLayer { - data := ReadDiffLayerRLP(db, hash) +func ReadDiffLayer(db ethdb.KeyValueReader, blockHash common.Hash) *types.DiffLayer { + data := ReadDiffLayerRLP(db, blockHash) if len(data) == 0 { return nil } diff := new(types.DiffLayer) if err := rlp.Decode(bytes.NewReader(data), diff); err != nil { - log.Error("Invalid diff layer RLP", "hash", hash, "err", err) + log.Error("Invalid diff layer RLP", "hash", blockHash, "err", err) return nil } return diff } -func ReadDiffLayerRLP(db ethdb.KeyValueReader, hash common.Hash) rlp.RawValue { - data, _ := db.Get(diffLayerKey(hash)) +func ReadDiffLayerRLP(db ethdb.KeyValueReader, blockHash common.Hash) rlp.RawValue { + data, _ := db.Get(diffLayerKey(blockHash)) return data } -func DeleteDiffLayer(db ethdb.KeyValueWriter, hash common.Hash) { - if err := db.Delete(diffLayerKey(hash)); err != nil { +func DeleteDiffLayer(db ethdb.KeyValueWriter, blockHash common.Hash) { + if err := db.Delete(diffLayerKey(blockHash)); err != nil { log.Crit("Failed to delete diffLayer", "err", err) } } diff --git a/core/state/statedb.go b/core/state/statedb.go index 15256dda64..1830d00a6c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -72,15 +72,15 @@ func (n *proofList) Delete(key []byte) error { // * Contracts // * Accounts type StateDB struct { - db Database - prefetcher *triePrefetcher - originalRoot common.Hash // The pre-state root, before any changes were made - trie Trie - hasher crypto.KeccakState - diffLayer *types.DiffLayer - diffTries map[common.Address]Trie - diffCode map[common.Hash][]byte - diffEnabled bool + db Database + prefetcher *triePrefetcher + originalRoot common.Hash // The pre-state root, before any changes were made + trie Trie + hasher crypto.KeccakState + diffLayer *types.DiffLayer + diffTries map[common.Address]Trie + diffCode map[common.Hash][]byte + lightProcessed bool snapMux sync.Mutex snaps *snapshot.Tree @@ -191,8 +191,12 @@ func (s *StateDB) StopPrefetcher() { } // Mark that the block is processed by diff layer -func (s *StateDB) MarkDiffEnabled() { - s.diffEnabled = true +func (s *StateDB) MarkLightProcessed() { + s.lightProcessed = true +} + +func (s *StateDB) IsLightProcessed() bool { + return s.lightProcessed } // setError remembers the first non-nil error it is called with. @@ -954,7 +958,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { // It is called in between transactions to get the root hash that // goes into transaction receipts. func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { - if s.diffEnabled { + if s.lightProcessed { return s.trie.Hash() } // Finalise all the dirty storage states and write them into the tries @@ -1134,7 +1138,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer } // Finalize any pending changes and merge everything into the tries root := s.IntermediateRoot(deleteEmptyObjects) - if s.diffEnabled { + if s.lightProcessed { return s.LightCommit() } var diffLayer *types.DiffLayer @@ -1295,6 +1299,7 @@ func (s *StateDB) DiffLayerToSnap(diffLayer *types.DiffLayer) (map[common.Addres snapAccounts[account.Account] = account.Blob } for _, storage := range diffLayer.Storages { + // should never happen if len(storage.Keys) != len(storage.Vals) { return nil, nil, nil, errors.New("invalid diffLayer: length of keys and values mismatch") } diff --git a/core/state_processor.go b/core/state_processor.go index 029ea718ba..99a9003bc6 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -39,7 +39,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const fullProcessCheck = 21 +const fullProcessCheck = 21 // On light sync mode, will do full process every fullProcessCheck randomly // StateProcessor is a basic Processor, which takes care of transitioning // state from one point to another. @@ -66,7 +66,6 @@ type LightStateProcessor struct { } func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor { - randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) return &LightStateProcessor{ randomGenerator: randomGenerator, @@ -77,7 +76,11 @@ func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine c func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { // random fallback to full process if check := p.randomGenerator.Int63n(fullProcessCheck); check != 0 { - diffLayer := p.bc.GetDiffLayer(block) + var pid string + if peer, ok := block.ReceivedFrom.(PeerIDer); ok { + pid = peer.ID() + } + diffLayer := p.bc.GetDiffLayer(block.Hash(), pid) if diffLayer != nil { receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb, cfg) if err == nil { @@ -101,7 +104,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB } func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { - statedb.MarkDiffEnabled() + statedb.MarkLightProcessed() fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes)) diffTries := make(map[common.Address]state.Trie) diffCode := make(map[common.Hash][]byte) @@ -167,7 +170,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) && !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) { if code, exist := fullDiffCode[codeHash]; exist { - if crypto.Keccak256Hash(code) == codeHash { + if crypto.Keccak256Hash(code) != codeHash { return nil, nil, 0, errors.New("code and codeHash mismatch") } diffCode[codeHash] = code @@ -223,9 +226,17 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty return nil, nil, 0, err } } + var allLogs []*types.Log + var gasUsed uint64 + for _, receipt := range diffLayer.Receipts { + allLogs = append(allLogs, receipt.Logs...) + gasUsed += receipt.GasUsed + } - if root := statedb.IntermediateRoot(p.bc.Config().IsEIP158(block.Number())); block.Root() != root { - return nil, nil, 0, fmt.Errorf("invalid merkle root (remote: %x local: %x)", block.Root(), root) + // Do validate in advance so that we can fall back to full process + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { + log.Error("validate state failed during light sync", "error", err) + return nil, nil, 0, err } // remove redundant storage change @@ -246,18 +257,12 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty }) } } + + statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage) if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) { diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer() } statedb.SetDiff(diffLayer, diffTries, diffCode) - statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage) - - var allLogs []*types.Log - var gasUsed uint64 - for _, receipt := range diffLayer.Receipts { - allLogs = append(allLogs, receipt.Logs...) - gasUsed += receipt.GasUsed - } return diffLayer.Receipts, allLogs, gasUsed, nil } diff --git a/core/types/block.go b/core/types/block.go index 4315ae476a..7f2a0785cb 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -27,10 +27,9 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" ) @@ -372,11 +371,9 @@ func (b *Block) Hash() common.Hash { type Blocks []*Block -// journalDestruct is an account deletion entry in a diffLayer's disk journal. type DiffLayer struct { DiffHash common.Hash `rlp:"_"` - Hash common.Hash - StateRoot common.Hash + BlockHash common.Hash Number uint64 Receipts Receipts // Receipts are duplicated stored to simplify the logic Codes []DiffCode @@ -386,8 +383,8 @@ type DiffLayer struct { } func (d *DiffLayer) Validate() error { - if d.Hash == (common.Hash{}) || d.StateRoot == (common.Hash{}) { - return errors.New("hash can't be empty") + if d.BlockHash == (common.Hash{}) || d.DiffHash == (common.Hash{}) { + return errors.New("both BlockHash and DiffHash can't be empty") } for _, storage := range d.Storages { if len(storage.Keys) != len(storage.Vals) { diff --git a/eth/backend.go b/eth/backend.go index 4a31dfb26b..36db5ac5ba 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -26,8 +26,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/eth/protocols/diff" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -44,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" diff --git a/eth/handler.go b/eth/handler.go index b75f20e7e3..47b37e0cfc 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -24,7 +24,7 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/eth/protocols/diff" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" @@ -480,8 +481,12 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } diff := h.chain.GetDiffLayerRLP(block.Hash()) for _, peer := range transfer { - peer.AsyncSendNewBlock(block, diff, td) + if len(diff) != 0 && peer.diffExt != nil { + peer.diffExt.AsyncSendDiffLayer([]rlp.RawValue{diff}) + } + peer.AsyncSendNewBlock(block, td) } + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } diff --git a/eth/protocols/diff/handler.go b/eth/protocols/diff/handler.go index b6532cf684..7b019eb7b3 100644 --- a/eth/protocols/diff/handler.go +++ b/eth/protocols/diff/handler.go @@ -59,6 +59,7 @@ func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol { Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { return backend.RunPeer(newPeer(version, p, rw), func(peer *Peer) error { + defer peer.Close() return handle(backend, peer) }) }, diff --git a/eth/protocols/diff/peer.go b/eth/protocols/diff/peer.go index c599fadef9..f0c4952e65 100644 --- a/eth/protocols/diff/peer.go +++ b/eth/protocols/diff/peer.go @@ -6,30 +6,53 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" ) +const maxQueuedDiffLayers = 12 + // Peer is a collection of relevant information we have about a `diff` peer. type Peer struct { - id string // Unique ID for the peer, cached - lightSync bool // whether the peer can light sync + id string // Unique ID for the peer, cached + lightSync bool // whether the peer can light sync + queuedDiffLayers chan []rlp.RawValue // Queue of diff layers to broadcast to the peer *p2p.Peer // The embedded P2P package peer rw p2p.MsgReadWriter // Input/output streams for diff version uint // Protocol version negotiated logger log.Logger // Contextual logger with the peer id injected + term chan struct{} // Termination channel to stop the broadcasters } // newPeer create a wrapper for a network connection and negotiated protocol // version. func newPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer { id := p.ID().String() - return &Peer{ - id: id, - Peer: p, - rw: rw, - lightSync: false, - version: version, - logger: log.New("peer", id[:8]), + peer := &Peer{ + id: id, + Peer: p, + rw: rw, + lightSync: false, + version: version, + logger: log.New("peer", id[:8]), + queuedDiffLayers: make(chan []rlp.RawValue, maxQueuedDiffLayers), + term: make(chan struct{}), + } + go peer.broadcastDiffLayers() + return peer +} + +func (p *Peer) broadcastDiffLayers() { + for { + select { + case prop := <-p.queuedDiffLayers: + if err := p.SendDiffLayers(prop); err != nil { + p.Log().Error("Failed to propagated diff layer", "err", err) + return + } + case <-p.term: + return + } } } @@ -52,6 +75,13 @@ func (p *Peer) Log() log.Logger { return p.logger } +// Close signals the broadcast goroutine to terminate. Only ever call this if +// you created the peer yourself via NewPeer. Otherwise let whoever created it +// clean it up! +func (p *Peer) Close() { + close(p.term) +} + // RequestDiffLayers fetches a batch of diff layers corresponding to the hashes // specified. func (p *Peer) RequestDiffLayers(hashes []common.Hash) error { @@ -63,3 +93,15 @@ func (p *Peer) RequestDiffLayers(hashes []common.Hash) error { BlockHashes: hashes, }) } + +func (p *Peer) SendDiffLayers(diffs []rlp.RawValue) error { + return p2p.Send(p.rw, DiffLayerMsg, diffs) +} + +func (p *Peer) AsyncSendDiffLayer(diffLayers []rlp.RawValue) { + select { + case p.queuedDiffLayers <- diffLayers: + default: + p.Log().Debug("Dropping diff layers propagation") + } +} diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index a9cf525161..e0ee2a1cfa 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -19,11 +19,6 @@ package eth import ( "math/big" - "github.com/ethereum/go-ethereum/eth/protocols/diff" - "github.com/ethereum/go-ethereum/p2p" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/types" @@ -38,9 +33,8 @@ const ( // blockPropagation is a block propagation event, waiting for its turn in the // broadcast queue. type blockPropagation struct { - block *types.Block - diffLayer rlp.RawValue - td *big.Int + block *types.Block + td *big.Int } // broadcastBlocks is a write loop that multiplexes blocks and block accouncements @@ -53,9 +47,6 @@ func (p *Peer) broadcastBlocks() { if err := p.SendNewBlock(prop.block, prop.td); err != nil { return } - if len(prop.diffLayer) != 0 { - p2p.Send(p.rw, diff.DiffLayerMsg, &diff.DiffLayersPacket{prop.diffLayer}) - } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) case block := <-p.queuedBlockAnns: diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 3199fc809b..f09760a2ec 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -337,9 +337,9 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. -func (p *Peer) AsyncSendNewBlock(block *types.Block, diff rlp.RawValue, td *big.Int) { +func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { select { - case p.queuedBlocks <- &blockPropagation{block: block, diffLayer: diff, td: td}: + case p.queuedBlocks <- &blockPropagation{block: block, td: td}: // Mark all the block hash as known, but ensure we don't overflow our limits for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop()