From 88ba737a46163330cfce1fa0072227ffc6450b13 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Thu, 23 Sep 2021 16:05:09 +0800 Subject: [PATCH] add block proccess backoff time when validator is not in turn --- consensus/consensus.go | 3 +++ consensus/parlia/parlia.go | 22 ++++++++++++++++++++++ core/blockchain.go | 28 +++++++++++++++++++++++++--- core/chain_makers.go | 1 + core/headerchain.go | 7 ++++++- light/lightchain.go | 7 ++++++- 6 files changed, 63 insertions(+), 5 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index fc161390f9..a9e79359f9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -49,6 +49,9 @@ type ChainHeaderReader interface { // GetHeaderByHash retrieves a block header from the database by its hash. GetHeaderByHash(hash common.Hash) *types.Header + + // GetHighestVerifiedHeader retrieves the highest header verified. + GetHighestVerifiedHeader() *types.Header } // ChainReader defines a small collection of methods needed to access the local diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 14304fe2bc..004f872716 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -56,6 +56,7 @@ const ( validatorBytesLength = common.AddressLength wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers initialBackOffTime = uint64(1) // second + processBackOffTime = uint64(1) // second systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system @@ -863,6 +864,14 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return case <-time.After(delay): } + if p.shouldWaitForCurrentBlockProcess(chain, header, snap) { + log.Info("not in turn and received current block, wait for current block process") + select { + case <-stop: + return + case <-time.After(time.Duration(processBackOffTime) * time.Second): + } + } select { case results <- block.WithSeal(header): @@ -874,6 +883,19 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return nil } +func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool { + highestVerifiedHeader := chain.GetHighestVerifiedHeader() + if highestVerifiedHeader == nil || highestVerifiedHeader.Number == nil { + return false + } + + if !snap.inturn(p.val) && header.Number.Cmp(highestVerifiedHeader.Number) == 0 { + return true + } + + return false +} + func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool { snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil) if err != nil { diff --git a/core/blockchain.go b/core/blockchain.go index be0b0f04ae..d4ebbbfcbc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -44,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -185,8 +186,9 @@ type BlockChain struct { chainmu sync.RWMutex // blockchain insertion lock - currentBlock atomic.Value // Current head of the block chain - currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + currentBlock atomic.Value // Current head of the block chain + currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + highestVerifiedHeader atomic.Value stateCache state.Database // State database to reuse between imports (contains state cache) bodyCache *lru.Cache // Cache for the most recent block bodies @@ -266,6 +268,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.currentBlock.Store(nilBlock) bc.currentFastBlock.Store(nilBlock) + var nilHeader *types.Header + bc.highestVerifiedHeader.Store(nilHeader) + // Initialize the chain with ancient data if it isn't empty. var txIndexBlock uint64 @@ -1883,6 +1888,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } + bc.updateHighestVerifiedHeader(block.Header()) + // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1989,6 +1996,21 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er return it.index, err } +func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) { + if header == nil || header.Number == nil { + return + } + currentHeader := bc.highestVerifiedHeader.Load().(*types.Header) + if currentHeader == nil || currentHeader.Number == nil || currentHeader.Number.Cmp(header.Number) < 0 { + bc.highestVerifiedHeader.Store(types.CopyHeader(header)) + return + } +} + +func (bc *BlockChain) GetHighestVerifiedHeader() *types.Header { + return bc.highestVerifiedHeader.Load().(*types.Header) +} + // insertSideChain is called when an import batch hits upon a pruned ancestor // error, which happens when a sidechain with a sufficiently old fork-block is // found. diff --git a/core/chain_makers.go b/core/chain_makers.go index 6cb74d51be..7f62e41ad8 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -303,3 +303,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil } func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } +func (cr *fakeChainReader) GetHighestVerifiedHeader() *types.Header { return nil } diff --git a/core/headerchain.go b/core/headerchain.go index 1dbf958786..fe4770a469 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" @@ -33,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - lru "github.com/hashicorp/golang-lru" ) const ( @@ -413,6 +414,10 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co return chain } +func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or // a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the // number of blocks to be individually checked before we reach the canonical chain. diff --git a/light/lightchain.go b/light/lightchain.go index ca6fbfac49..99644355ac 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" @@ -37,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -148,6 +149,10 @@ func (lc *LightChain) HeaderChain() *core.HeaderChain { return lc.hc } +func (lc *LightChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // loadLastState loads the last known chain state from the database. This method // assumes that the chain manager mutex is held. func (lc *LightChain) loadLastState() error {