From 1aaab766e369802586400ecaa41b55acea3232fa Mon Sep 17 00:00:00 2001 From: kyrie-yl Date: Thu, 7 Apr 2022 00:21:26 +0800 Subject: [PATCH] verify task get difflayer cache synchronously Signed-off-by: kyrie-yl --- core/block_validator.go | 9 ------ core/blockchain.go | 29 +++++++++++++++++- core/remote_state_verifier.go | 56 ++++++++++++++++++++++------------- 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/core/block_validator.go b/core/block_validator.go index cf956dccbb..29ee2ec348 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -49,7 +48,6 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin bc: blockchain, } if mode.NeedRemoteVerify() { - log.Info("this node is a fast node with remote state verifier.") validator.remoteValidator = NewVerifyManager(blockchain, peers, mode == InsecureVerify) go validator.remoteValidator.mainLoop() } @@ -92,13 +90,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { } return nil }, - // for fast node which verify trie from remote verify peers, a block's H-11 ancestor should have been verify. - func() error { - if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(header) { - return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) - } - return nil - }, } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index ffcfe444b6..0776a2d637 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -203,6 +203,7 @@ type BlockChain struct { chainFeed event.Feed chainSideFeed event.Feed chainHeadFeed event.Feed + chainBlockFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed scope event.SubscriptionScope @@ -226,6 +227,7 @@ type BlockChain struct { // trusted diff layers diffLayerCache *lru.Cache // Cache for the diffLayers diffLayerRLPCache *lru.Cache // Cache for the rlp encoded diffLayers + diffLayerChanCache *lru.Cache // Cache for diffQueue *prque.Prque // A Priority queue to store recent diff layer diffQueueBuffer chan *types.DiffLayer diffLayerFreezerBlockLimit uint64 @@ -277,6 +279,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par futureBlocks, _ := lru.New(maxFutureBlocks) diffLayerCache, _ := lru.New(diffLayerCacheLimit) diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit) + diffLayerChanCache, _ := lru.New(diffLayerCacheLimit) bc := &BlockChain{ chainConfig: chainConfig, @@ -299,6 +302,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlockCache: badBlockCache, diffLayerCache: diffLayerCache, diffLayerRLPCache: diffLayerRLPCache, + diffLayerChanCache: diffLayerChanCache, txLookupCache: txLookupCache, futureBlocks: futureBlocks, engine: engine, @@ -520,7 +524,13 @@ func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer, sorted bool) { if bc.diffLayerCache.Len() >= diffLayerCacheLimit { bc.diffLayerCache.RemoveOldest() } + + //json.MarshalIndent() bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer) + if cached, ok := bc.diffLayerChanCache.Get(diffLayer.BlockHash); ok { + diffLayerCh := cached.(chan struct{}) + close(diffLayerCh) + } if bc.db.DiffStore() != nil { // push to priority queue before persisting bc.diffQueueBuffer <- diffLayer @@ -1816,6 +1826,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. diffLayer.BlockHash = block.Hash() diffLayer.Number = block.NumberU64() + diffLayerCh := make(chan struct{}) + bc.diffLayerChanCache.Add(diffLayer.BlockHash, diffLayerCh) + go bc.cacheDiffLayer(diffLayer, false) } @@ -2072,6 +2085,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er }() for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { + if bc.validator.RemoteVerifyManager() != nil { + for !bc.Validator().RemoteVerifyManager().AncestorVerified(block.Header()) { + if bc.insertStopped() { + break + } + log.Info("block ancestor has not been verified", "number", block.Number(), "hash", block.Hash()) + time.Sleep(100 * time.Millisecond) + } + } // If the chain is terminating, stop processing blocks if bc.insertStopped() { log.Debug("Abort during block processing") @@ -2231,6 +2253,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er stats.processed++ stats.usedGas += usedGas + bc.chainBlockFeed.Send(ChainHeadEvent{block}) dirty, _ := bc.stateCache.TrieDB().Size() stats.report(chain, it.index, dirty) } @@ -3101,6 +3124,10 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +func (bc *BlockChain) SubscribeChainBlockEvent(ch chan<- ChainHeadEvent) event.Subscription { + return bc.scope.Track(bc.chainBlockFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) @@ -3278,7 +3305,7 @@ func CalculateDiffHash(d *types.DiffLayer) (common.Hash, error) { BlockHash: d.BlockHash, Receipts: make([]*types.ReceiptForStorage, 0), Number: d.Number, - Codes: d.Codes, + //Codes: d.Codes, Destructs: d.Destructs, Accounts: d.Accounts, Storages: d.Storages, diff --git a/core/remote_state_verifier.go b/core/remote_state_verifier.go index a61bb79944..3481113112 100644 --- a/core/remote_state_verifier.go +++ b/core/remote_state_verifier.go @@ -45,7 +45,7 @@ type remoteVerifyManager struct { allowInsecure bool // Subscription - chainHeadCh chan ChainHeadEvent + chainBlockCh chan ChainHeadEvent chainHeadSub event.Subscription // Channels @@ -62,11 +62,11 @@ func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowInsecure b verifiedCache: verifiedCache, allowInsecure: allowInsecure, - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + chainBlockCh: make(chan ChainHeadEvent, chainHeadChanSize), verifyCh: make(chan common.Hash, maxForkHeight), messageCh: make(chan verifyMessage), } - vm.chainHeadSub = blockchain.SubscribeChainHeadEvent(vm.chainHeadCh) + vm.chainHeadSub = blockchain.SubscribeChainBlockEvent(vm.chainBlockCh) return vm } @@ -81,7 +81,7 @@ func (vm *remoteVerifyManager) mainLoop() { defer pruneTicker.Stop() for { select { - case h := <-vm.chainHeadCh: + case h := <-vm.chainBlockCh: vm.NewBlockVerifyTask(h.Block.Header()) case hash := <-vm.verifyCh: vm.cacheBlockVerified(hash) @@ -121,6 +121,11 @@ func (vm *remoteVerifyManager) mainLoop() { func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { for i := 0; header != nil && i <= maxForkHeight; i++ { + // if is genesis block, mark it as verified and break. + if header.Number.Uint64() == 0 { + vm.cacheBlockVerified(header.Hash()) + break + } func(hash common.Hash) { // if verified cache record that this block has been verified, skip. if _, ok := vm.verifiedCache.Get(hash); ok { @@ -130,17 +135,32 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { if _, ok := vm.tasks[hash]; ok { return } - diffLayer := vm.bc.GetTrustedDiffLayer(hash) + + if header.TxHash == types.EmptyRootHash { + log.Debug("this is an empty block:", "block", hash, "number", header.Number) + vm.cacheBlockVerified(hash) + return + } + + var diffLayer *types.DiffLayer + if cached, ok := vm.bc.diffLayerChanCache.Get(hash); ok { + diffLayerCh := cached.(chan struct{}) + <-diffLayerCh + vm.bc.diffLayerChanCache.Remove(hash) + diffLayer = vm.bc.GetTrustedDiffLayer(hash) + } // if this block has no diff, there is no need to verify it. var err error if diffLayer == nil { - if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { - log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) - return - } else if diffLayer == nil { - log.Info("this is an empty block:", "block", hash, "number", header.Number) - return - } + log.Info("block's trusted diffLayer is nil", "hash", hash, "number", header.Number) + //if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { + // log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) + // return + //} else if diffLayer == nil { + // log.Info("this is an empty block:", "block", hash, "number", header.Number) + // vm.cacheBlockVerified(hash) + // return + //} } diffHash, err := CalculateDiffHash(diffLayer) if err != nil { @@ -170,11 +190,7 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool { if header == nil { return true } - // check whether H-11 block is a empty block. - if header.TxHash == types.EmptyRootHash { - parent := vm.bc.GetHeaderByHash(header.ParentHash) - return parent == nil || header.Root == parent.Root - } + hash := header.Hash() _, exist := vm.verifiedCache.Get(hash) return exist @@ -203,7 +219,7 @@ type verifyTask struct { candidatePeers verifyPeers badPeers map[string]struct{} startAt time.Time - allowInsecure bool + allowInsecure bool messageCh chan verifyMessage terminalCh chan struct{} @@ -236,13 +252,13 @@ func (vt *verifyTask) Start(verifyCh chan common.Hash) { case types.StatusFullVerified: vt.compareRootHashAndMark(msg, verifyCh) case types.StatusPartiallyVerified: - log.Warn("block %s , num= %s is insecure verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) + log.Warn("block is insecure verified", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber) if vt.allowInsecure { vt.compareRootHashAndMark(msg, verifyCh) } case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: vt.badPeers[msg.peerId] = struct{}{} - log.Info("peer %s is not available: code %d, msg %s,", msg.peerId, msg.verifyResult.Status.Code, msg.verifyResult.Status.Msg) + log.Info("peer is not available", "hash", msg.verifyResult.BlockHash, "number", msg.verifyResult.BlockNumber, "peer", msg.peerId, "reason", msg.verifyResult.Status.Msg) case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg) }