Skip to content

Commit

Permalink
handle difflayer and refine light processor
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Aug 26, 2021
1 parent 585c8b8 commit fb7f79c
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 50 deletions.
3 changes: 0 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package core

import (
"encoding/json"
"fmt"
"os"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -133,7 +131,6 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
},
func() error {
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
statedb.IterativeDump(true, true, true, json.NewEncoder(os.Stdout))
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root)
} else {
return nil
Expand Down
246 changes: 220 additions & 26 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ const (
diffLayerfreezerRecheckInterval = 3 * time.Second
diffLayerfreezerBlockLimit = 864000 // The number of blocks that should be kept in disk.

diffLayerPruneRecheckInterval = 3 * time.Second

maxQueueDist = 64 // Maximum allowed distance from the chain head to queue
blockLimit = 128 // Maximum number of unique diff layers a peer may have delivered
maxUncleDist = 11 // Maximum allowed backward distance from the chain head

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
// Changelog:
Expand Down Expand Up @@ -135,6 +141,10 @@ type CacheConfig struct {
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

type PeerIDer interface {
ID() string
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{
Expand All @@ -146,6 +156,8 @@ var defaultCacheConfig = &CacheConfig{
SnapshotWait: true,
}

type BlockChainOption func(*BlockChain) *BlockChain

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand Down Expand Up @@ -203,6 +215,14 @@ type BlockChain struct {
diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers
diffQueue *prque.Prque // A Priority queue to store recent diff layer

// untrusted diff layers
diffMutex sync.RWMutex
receivedDiffLayers map[common.Hash]map[common.Hash]*types.DiffLayer //map[blockHash] map[DiffHash]Diff
diffHashToBlockHash map[common.Hash]common.Hash // map[diffHash]blockhash
diffHashToPeers map[common.Hash]map[string]struct{} // map[diffHash]map[pid]
diffNumToBlockHashes map[uint64]map[common.Hash]struct{} // map[number]map[blockHash]
diffPeersToDiffHashes map[string]map[common.Hash]struct{} // map[pid]map[diffHash]

quit chan struct{} // blockchain quit channel
wg sync.WaitGroup // chain processing wait group for shutting down
running int32 // 0 if chain is running, 1 when stopped
Expand All @@ -220,7 +240,7 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64) (*BlockChain, error) {
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool, txLookupLimit *uint64, options ...BlockChainOption) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = defaultCacheConfig
}
Expand All @@ -246,20 +266,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
}),
triesInMemory: cacheConfig.TriesInMemory,
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
diffLayerCache: diffLayerCache,
diffLayerRLPCache: diffLayerRLPCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
diffQueue: prque.New(nil),
triesInMemory: cacheConfig.TriesInMemory,
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
diffLayerCache: diffLayerCache,
diffLayerRLPCache: diffLayerRLPCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
diffQueue: prque.New(nil),
receivedDiffLayers: make(map[common.Hash]map[common.Hash]*types.DiffLayer),
diffHashToBlockHash: make(map[common.Hash]common.Hash),
diffHashToPeers: make(map[common.Hash]map[string]struct{}),
diffNumToBlockHashes: make(map[uint64]map[common.Hash]struct{}),
diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
Expand Down Expand Up @@ -387,6 +412,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, int(bc.cacheConfig.TriesInMemory), head.Root(), !bc.cacheConfig.SnapshotWait, true, recover)
}
// do options before start any routine
for _, option := range options {
bc = option(bc)
}

// Take ownership of this particular state
go bc.update()
if txLookupLimit != nil {
Expand All @@ -412,6 +442,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if bc.db.DiffStore() != nil {
go bc.diffLayerFreeze()
}
go bc.pruneDiffLoop()

return bc, nil
}

Expand All @@ -420,19 +452,19 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

func (bc *BlockChain) CacheReceipts(hash common.Hash, receipts types.Receipts) {
func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) {
bc.receiptsCache.Add(hash, receipts)
}

func (bc *BlockChain) CacheDiffLayer(hash common.Hash, num uint64, diffLayer *types.DiffLayer) {
bc.diffLayerCache.Add(hash, diffLayer)
func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) {
bc.diffLayerCache.Add(diffLayer.Hash, diffLayer)
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
bc.diffQueue.Push(diffLayer, -(int64(num)))
bc.diffQueue.Push(diffLayer, -(int64(diffLayer.Number)))
}
}

func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
bc.blockCache.Add(hash, block)
}

Expand Down Expand Up @@ -1568,7 +1600,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
diffLayer.Receipts = receipts
diffLayer.StateRoot = root
diffLayer.Hash = block.Hash()
bc.CacheDiffLayer(diffLayer.Hash, block.Number().Uint64(), diffLayer)
diffLayer.Number = block.NumberU64()
bc.cacheDiffLayer(diffLayer)
}
triedb := bc.stateCache.TrieDB()

Expand Down Expand Up @@ -1945,12 +1978,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}
// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb
statedb.TryPreload(block, signer)

//Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
activeState = statedb
if err != nil {
bc.reportBlock(block, receipts, err)
return it.index, err
Expand All @@ -1974,8 +2006,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
log.Error("validate state failed", "error", err)
return it.index, err
}
bc.CacheReceipts(block.Hash(), receipts)
bc.CacheBlock(block.Hash(), block)
bc.cacheReceipts(block.Hash(), receipts)
bc.cacheBlock(block.Hash(), block)
proctime := time.Since(start)

// Update the metrics touched during block validation
Expand Down Expand Up @@ -2423,6 +2455,162 @@ func (bc *BlockChain) diffLayerFreeze() {
}
}

func (bc *BlockChain) GetDiffLayer(block *types.Block) *types.DiffLayer {
bc.diffMutex.Lock()
defer bc.diffMutex.Unlock()
if diffs, exist := bc.receivedDiffLayers[block.Hash()]; exist && len(diffs) != 0 {
if len(diffs) == 1 {
for _, d := range diffs {
return d
}
} else {
// pick the one from exact same peer
if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
pid := peer.ID()
if diffHashes, exist := bc.diffPeersToDiffHashes[pid]; exist {
for d1 := range diffs {
for d2 := range diffHashes {
if d1 == d2 {
return bc.receivedDiffLayers[block.Hash()][d1]
}
}
}
}
}
// Do not find overlap, do random pick
for _, d := range diffs {
return d
}
}
}
return nil
}

func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) {
bc.diffMutex.Lock()
defer bc.diffMutex.Unlock()

// Untrusted peers
pids := bc.diffHashToPeers[diffHash]
invalidDiffHashes := make(map[common.Hash]struct{})
if pids != nil {
for pid := range pids {
invaliDiffHashesPeer := bc.diffPeersToDiffHashes[pid]
if invaliDiffHashesPeer != nil {
for invaliDiffHash := range invaliDiffHashesPeer {
invalidDiffHashes[invaliDiffHash] = struct{}{}
}
}
delete(bc.diffPeersToDiffHashes, pid)
}
}
for invalidDiffHash := range invalidDiffHashes {
delete(bc.diffHashToPeers, invalidDiffHash)
affectedBlockHash := bc.diffHashToBlockHash[invalidDiffHash]
if diffs, exist := bc.receivedDiffLayers[affectedBlockHash]; exist {
delete(diffs, invalidDiffHash)
if len(diffs) == 0 {
delete(bc.receivedDiffLayers, affectedBlockHash)
}
}
delete(bc.diffHashToBlockHash, invalidDiffHash)
}
}

func (bc *BlockChain) pruneDiffLoop() {
recheck := time.Tick(diffLayerPruneRecheckInterval)
for {
select {
case <-bc.quit:
return
case <-recheck:
bc.pruneDiffLayer()
}
}
}

func (bc *BlockChain) pruneDiffLayer() {
currentHeight := bc.CurrentBlock().NumberU64()
bc.diffMutex.Lock()
defer bc.diffMutex.Unlock()
sortNumbers := make([]uint64, 0, len(bc.diffNumToBlockHashes))
for number := range bc.diffNumToBlockHashes {
sortNumbers = append(sortNumbers, number)
}
sort.SliceStable(sortNumbers, func(i, j int) bool {
return sortNumbers[i] < sortNumbers[j]
})
blockHashes := make(map[common.Hash]struct{})
for _, number := range sortNumbers {
if number < currentHeight-maxUncleDist {
affecthedHashes := bc.diffNumToBlockHashes[number]
if affecthedHashes != nil {
for affecthedHash := range affecthedHashes {
blockHashes[affecthedHash] = struct{}{}
delete(bc.diffNumToBlockHashes, number)
}
}
} else {
break
}
}

for blockHash := range blockHashes {
if diffHashes, exist := bc.receivedDiffLayers[blockHash]; exist {
for diffHash := range diffHashes {
delete(bc.diffHashToBlockHash, diffHash)
delete(bc.diffHashToPeers, diffHash)
}
}
delete(bc.receivedDiffLayers, blockHash)
}

}

// Process diff layers
func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string) error {
// Basic check
currentHeight := bc.CurrentBlock().NumberU64()
if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxQueueDist {
return errors.New("diff layers too new from current")
}
if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxUncleDist {
return errors.New("diff layers too old from current")
}

bc.diffMutex.Lock()
defer bc.diffMutex.Unlock()

if len(bc.diffPeersToDiffHashes[pid]) > blockLimit {
return errors.New("too many accumulated diffLayers")
}
if _, exist := bc.diffPeersToDiffHashes[pid]; exist {
if _, alreadyHas := bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash]; alreadyHas {
return nil
}
} else {
bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{})
}
bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash] = struct{}{}
if _, exist := bc.diffNumToBlockHashes[diffLayer.Number]; !exist {
bc.diffNumToBlockHashes[diffLayer.Number] = make(map[common.Hash]struct{})
}
bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.Hash] = struct{}{}

if _, exist := bc.diffHashToPeers[diffLayer.DiffHash]; !exist {
bc.diffHashToPeers[diffLayer.DiffHash] = make(map[string]struct{})
}
bc.diffHashToPeers[diffLayer.DiffHash][pid] = struct{}{}

if _, exist := bc.receivedDiffLayers[diffLayer.Hash]; !exist {
bc.receivedDiffLayers[diffLayer.Hash] = make(map[common.Hash]*types.DiffLayer)
}
bc.receivedDiffLayers[diffLayer.Hash][diffLayer.DiffHash] = diffLayer
bc.diffHashToBlockHash[diffLayer.DiffHash] = diffLayer.Hash

return nil
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
Expand Down Expand Up @@ -2672,3 +2860,9 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}

//Options
func EnableLightProcessor(bc *BlockChain) *BlockChain {
bc.processor = NewLightStateProcessor(bc.Config(), bc, bc.engine)
return bc
}
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
statedb, receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
Loading

0 comments on commit fb7f79c

Please sign in to comment.