Skip to content

Commit

Permalink
verify task get difflayer cache synchronously
Browse files Browse the repository at this point in the history
Signed-off-by: kyrie-yl <yl.on.the.way@gmail.com>
  • Loading branch information
kyrie-yl committed Apr 13, 2022
1 parent 7b070c0 commit 1aaab76
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 30 deletions.
9 changes: 0 additions & 9 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
Expand All @@ -49,7 +48,6 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin
bc: blockchain,
}
if mode.NeedRemoteVerify() {
log.Info("this node is a fast node with remote state verifier.")
validator.remoteValidator = NewVerifyManager(blockchain, peers, mode == InsecureVerify)
go validator.remoteValidator.mainLoop()
}
Expand Down Expand Up @@ -92,13 +90,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
}
return nil
},
// for fast node which verify trie from remote verify peers, a block's H-11 ancestor should have been verify.
func() error {
if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(header) {
return fmt.Errorf("block's ancessor %x has not been verified", block.Hash())
}
return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
Expand Down
29 changes: 28 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type BlockChain struct {
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainBlockFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
Expand All @@ -226,6 +227,7 @@ type BlockChain struct {
// trusted diff layers
diffLayerCache *lru.Cache // Cache for the diffLayers
diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers
diffLayerChanCache *lru.Cache // Cache for
diffQueue *prque.Prque // A Priority queue to store recent diff layer
diffQueueBuffer chan *types.DiffLayer
diffLayerFreezerBlockLimit uint64
Expand Down Expand Up @@ -277,6 +279,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
futureBlocks, _ := lru.New(maxFutureBlocks)
diffLayerCache, _ := lru.New(diffLayerCacheLimit)
diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit)
diffLayerChanCache, _ := lru.New(diffLayerCacheLimit)

bc := &BlockChain{
chainConfig: chainConfig,
Expand All @@ -299,6 +302,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
badBlockCache: badBlockCache,
diffLayerCache: diffLayerCache,
diffLayerRLPCache: diffLayerRLPCache,
diffLayerChanCache: diffLayerChanCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
Expand Down Expand Up @@ -520,7 +524,13 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, sorted bool) {
if bc.diffLayerCache.Len() >= diffLayerCacheLimit {
bc.diffLayerCache.RemoveOldest()
}

//json.MarshalIndent()
bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer)
if cached, ok := bc.diffLayerChanCache.Get(diffLayer.BlockHash); ok {
diffLayerCh := cached.(chan struct{})
close(diffLayerCh)
}
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
bc.diffQueueBuffer <- diffLayer
Expand Down Expand Up @@ -1816,6 +1826,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
diffLayer.BlockHash = block.Hash()
diffLayer.Number = block.NumberU64()

diffLayerCh := make(chan struct{})
bc.diffLayerChanCache.Add(diffLayer.BlockHash, diffLayerCh)

go bc.cacheDiffLayer(diffLayer, false)
}

Expand Down Expand Up @@ -2072,6 +2085,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}()

for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
if bc.validator.RemoteVerifyManager() != nil {
for !bc.Validator().RemoteVerifyManager().AncestorVerified(block.Header()) {
if bc.insertStopped() {
break
}
log.Info("block ancestor has not been verified", "number", block.Number(), "hash", block.Hash())
time.Sleep(100 * time.Millisecond)
}
}
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
Expand Down Expand Up @@ -2231,6 +2253,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
stats.processed++
stats.usedGas += usedGas

bc.chainBlockFeed.Send(ChainHeadEvent{block})
dirty, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, it.index, dirty)
}
Expand Down Expand Up @@ -3101,6 +3124,10 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
Expand Down Expand Up @@ -3278,7 +3305,7 @@ func CalculateDiffHash(d *types.DiffLayer) (common.Hash, error) {
BlockHash: d.BlockHash,
Receipts: make([]*types.ReceiptForStorage, 0),
Number: d.Number,
Codes: d.Codes,
//Codes: d.Codes,
Destructs: d.Destructs,
Accounts: d.Accounts,
Storages: d.Storages,
Expand Down
56 changes: 36 additions & 20 deletions core/remote_state_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type remoteVerifyManager struct {
allowInsecure bool

// Subscription
chainHeadCh chan ChainHeadEvent
chainBlockCh chan ChainHeadEvent
chainHeadSub event.Subscription

// Channels
Expand All @@ -62,11 +62,11 @@ func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowInsecure b
verifiedCache: verifiedCache,
allowInsecure: allowInsecure,

chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
chainBlockCh: make(chan ChainHeadEvent, chainHeadChanSize),
verifyCh: make(chan common.Hash, maxForkHeight),
messageCh: make(chan verifyMessage),
}
vm.chainHeadSub = blockchain.SubscribeChainHeadEvent(vm.chainHeadCh)
vm.chainHeadSub = blockchain.SubscribeChainBlockEvent(vm.chainBlockCh)
return vm
}

Expand All @@ -81,7 +81,7 @@ func (vm *remoteVerifyManager) mainLoop() {
defer pruneTicker.Stop()
for {
select {
case h := <-vm.chainHeadCh:
case h := <-vm.chainBlockCh:
vm.NewBlockVerifyTask(h.Block.Header())
case hash := <-vm.verifyCh:
vm.cacheBlockVerified(hash)
Expand Down Expand Up @@ -121,6 +121,11 @@ func (vm *remoteVerifyManager) mainLoop() {

func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
for i := 0; header != nil && i <= maxForkHeight; i++ {
// if is genesis block, mark it as verified and break.
if header.Number.Uint64() == 0 {
vm.cacheBlockVerified(header.Hash())
break
}
func(hash common.Hash) {
// if verified cache record that this block has been verified, skip.
if _, ok := vm.verifiedCache.Get(hash); ok {
Expand All @@ -130,17 +135,32 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
if _, ok := vm.tasks[hash]; ok {
return
}
diffLayer := vm.bc.GetTrustedDiffLayer(hash)

if header.TxHash == types.EmptyRootHash {
log.Debug("this is an empty block:", "block", hash, "number", header.Number)
vm.cacheBlockVerified(hash)
return
}

var diffLayer *types.DiffLayer
if cached, ok := vm.bc.diffLayerChanCache.Get(hash); ok {
diffLayerCh := cached.(chan struct{})
<-diffLayerCh
vm.bc.diffLayerChanCache.Remove(hash)
diffLayer = vm.bc.GetTrustedDiffLayer(hash)
}
// if this block has no diff, there is no need to verify it.
var err error
if diffLayer == nil {
if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil {
log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err)
return
} else if diffLayer == nil {
log.Info("this is an empty block:", "block", hash, "number", header.Number)
return
}
log.Info("block's trusted diffLayer is nil", "hash", hash, "number", header.Number)
//if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil {
// log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err)
// return
//} else if diffLayer == nil {
// log.Info("this is an empty block:", "block", hash, "number", header.Number)
// vm.cacheBlockVerified(hash)
// return
//}
}
diffHash, err := CalculateDiffHash(diffLayer)
if err != nil {
Expand Down Expand Up @@ -170,11 +190,7 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool {
if header == nil {
return true
}
// check whether H-11 block is a empty block.
if header.TxHash == types.EmptyRootHash {
parent := vm.bc.GetHeaderByHash(header.ParentHash)
return parent == nil || header.Root == parent.Root
}

hash := header.Hash()
_, exist := vm.verifiedCache.Get(hash)
return exist
Expand Down Expand Up @@ -203,7 +219,7 @@ type verifyTask struct {
candidatePeers verifyPeers
badPeers map[string]struct{}
startAt time.Time
allowInsecure bool
allowInsecure bool

messageCh chan verifyMessage
terminalCh chan struct{}
Expand Down Expand Up @@ -236,13 +252,13 @@ func (vt *verifyTask) Start(verifyCh chan common.Hash) {
case types.StatusFullVerified:
vt.compareRootHashAndMark(msg, verifyCh)
case types.StatusPartiallyVerified:
log.Warn("block %s , num= %s is insecure verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber)
log.Warn("block is insecure verified", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber)
if vt.allowInsecure {
vt.compareRootHashAndMark(msg, verifyCh)
}
case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError:
vt.badPeers[msg.peerId] = struct{}{}
log.Info("peer %s is not available: code %d, msg %s,", msg.peerId, msg.verifyResult.Status.Code, msg.verifyResult.Status.Msg)
log.Info("peer is not available", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber, "peer", msg.peerId, "reason", msg.verifyResult.Status.Msg)
case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork:
log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg)
}
Expand Down

0 comments on commit 1aaab76

Please sign in to comment.