From 88ba737a46163330cfce1fa0072227ffc6450b13 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Thu, 23 Sep 2021 16:05:09 +0800 Subject: [PATCH 1/6] add block proccess backoff time when validator is not in turn --- consensus/consensus.go | 3 +++ consensus/parlia/parlia.go | 22 ++++++++++++++++++++++ core/blockchain.go | 28 +++++++++++++++++++++++++--- core/chain_makers.go | 1 + core/headerchain.go | 7 ++++++- light/lightchain.go | 7 ++++++- 6 files changed, 63 insertions(+), 5 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index fc161390f9..a9e79359f9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -49,6 +49,9 @@ type ChainHeaderReader interface { // GetHeaderByHash retrieves a block header from the database by its hash. GetHeaderByHash(hash common.Hash) *types.Header + + // GetHighestVerifiedHeader retrieves the highest header verified. + GetHighestVerifiedHeader() *types.Header } // ChainReader defines a small collection of methods needed to access the local diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 14304fe2bc..004f872716 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -56,6 +56,7 @@ const ( validatorBytesLength = common.AddressLength wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers initialBackOffTime = uint64(1) // second + processBackOffTime = uint64(1) // second systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system @@ -863,6 +864,14 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return case <-time.After(delay): } + if p.shouldWaitForCurrentBlockProcess(chain, header, snap) { + log.Info("not in turn and received current block, wait for current block process") + select { + case <-stop: + return + case <-time.After(time.Duration(processBackOffTime) * time.Second): + } + } select { case results <- block.WithSeal(header): @@ -874,6 +883,19 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return nil } +func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool { + highestVerifiedHeader := chain.GetHighestVerifiedHeader() + if highestVerifiedHeader == nil || highestVerifiedHeader.Number == nil { + return false + } + + if !snap.inturn(p.val) && header.Number.Cmp(highestVerifiedHeader.Number) == 0 { + return true + } + + return false +} + func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool { snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil) if err != nil { diff --git a/core/blockchain.go b/core/blockchain.go index be0b0f04ae..d4ebbbfcbc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -44,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -185,8 +186,9 @@ type BlockChain struct { chainmu sync.RWMutex // blockchain insertion lock - currentBlock atomic.Value // Current head of the block chain - currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + currentBlock atomic.Value // Current head of the block chain + currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + highestVerifiedHeader atomic.Value stateCache state.Database // State database to reuse between imports (contains state cache) bodyCache *lru.Cache // Cache for the most recent block bodies @@ -266,6 +268,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.currentBlock.Store(nilBlock) bc.currentFastBlock.Store(nilBlock) + var nilHeader *types.Header + bc.highestVerifiedHeader.Store(nilHeader) + // Initialize the chain with ancient data if it isn't empty. var txIndexBlock uint64 @@ -1883,6 +1888,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } + bc.updateHighestVerifiedHeader(block.Header()) + // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1989,6 +1996,21 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er return it.index, err } +func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) { + if header == nil || header.Number == nil { + return + } + currentHeader := bc.highestVerifiedHeader.Load().(*types.Header) + if currentHeader == nil || currentHeader.Number == nil || currentHeader.Number.Cmp(header.Number) < 0 { + bc.highestVerifiedHeader.Store(types.CopyHeader(header)) + return + } +} + +func (bc *BlockChain) GetHighestVerifiedHeader() *types.Header { + return bc.highestVerifiedHeader.Load().(*types.Header) +} + // insertSideChain is called when an import batch hits upon a pruned ancestor // error, which happens when a sidechain with a sufficiently old fork-block is // found. diff --git a/core/chain_makers.go b/core/chain_makers.go index 6cb74d51be..7f62e41ad8 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -303,3 +303,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil } func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } +func (cr *fakeChainReader) GetHighestVerifiedHeader() *types.Header { return nil } diff --git a/core/headerchain.go b/core/headerchain.go index 1dbf958786..fe4770a469 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" @@ -33,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - lru "github.com/hashicorp/golang-lru" ) const ( @@ -413,6 +414,10 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co return chain } +func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or // a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the // number of blocks to be individually checked before we reach the canonical chain. diff --git a/light/lightchain.go b/light/lightchain.go index ca6fbfac49..99644355ac 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" @@ -37,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -148,6 +149,10 @@ func (lc *LightChain) HeaderChain() *core.HeaderChain { return lc.hc } +func (lc *LightChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // loadLastState loads the last known chain state from the database. This method // assumes that the chain manager mutex is held. func (lc *LightChain) loadLastState() error { From e0e58b23451eee6b0c8c29d6353ee641c2858e0c Mon Sep 17 00:00:00 2001 From: yutianwu Date: Thu, 23 Sep 2021 19:12:34 +0800 Subject: [PATCH 2/6] update comment --- consensus/parlia/parlia.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 004f872716..267be62889 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -865,7 +865,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res case <-time.After(delay): } if p.shouldWaitForCurrentBlockProcess(chain, header, snap) { - log.Info("not in turn and received current block, wait for current block process") + log.Info("Waiting for received in turn block to process") select { case <-stop: return From 4a84cfdf625976856138cd0635d2c223ab785a11 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Fri, 24 Sep 2021 14:29:16 +0800 Subject: [PATCH 3/6] add more log --- consensus/parlia/parlia.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 267be62889..789ca84ce6 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -868,8 +868,10 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res log.Info("Waiting for received in turn block to process") select { case <-stop: + log.Info("Received block process finished, abort block seal") return case <-time.After(time.Duration(processBackOffTime) * time.Second): + log.Info("Process backoff time exhausted, start to seal block") } } From 065e69e3205e3d66f4888221324b303564dde6b5 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Sun, 26 Sep 2021 18:23:45 +0800 Subject: [PATCH 4/6] fix comments --- consensus/parlia/parlia.go | 9 ++++++--- core/blockchain.go | 10 +++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 789ca84ce6..3f76082010 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -886,15 +886,18 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res } func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool { + if snap.inturn(p.val) { + return false + } + highestVerifiedHeader := chain.GetHighestVerifiedHeader() - if highestVerifiedHeader == nil || highestVerifiedHeader.Number == nil { + if highestVerifiedHeader == nil { return false } - if !snap.inturn(p.val) && header.Number.Cmp(highestVerifiedHeader.Number) == 0 { + if header.ParentHash == highestVerifiedHeader.ParentHash && header.Difficulty.Cmp(highestVerifiedHeader.Difficulty) < 0 { return true } - return false } diff --git a/core/blockchain.go b/core/blockchain.go index d4ebbbfcbc..a09cc5ff62 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2001,7 +2001,15 @@ func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) { return } currentHeader := bc.highestVerifiedHeader.Load().(*types.Header) - if currentHeader == nil || currentHeader.Number == nil || currentHeader.Number.Cmp(header.Number) < 0 { + if currentHeader == nil { + bc.highestVerifiedHeader.Store(types.CopyHeader(header)) + return + } + + newTD := big.NewInt(0).Add(bc.GetTdByHash(header.ParentHash), header.Difficulty) + oldTD := big.NewInt(0).Add(bc.GetTdByHash(currentHeader.ParentHash), currentHeader.Difficulty) + + if newTD.Cmp(oldTD) > 0 { bc.highestVerifiedHeader.Store(types.CopyHeader(header)) return } From 77f19d435dd6bef1ef7575428d3bac9625fa8e6e Mon Sep 17 00:00:00 2001 From: yutianwu Date: Sun, 26 Sep 2021 19:09:31 +0800 Subject: [PATCH 5/6] wait if td is the same --- consensus/parlia/parlia.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 3f76082010..68ade2e6ff 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -895,7 +895,7 @@ func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderRea return false } - if header.ParentHash == highestVerifiedHeader.ParentHash && header.Difficulty.Cmp(highestVerifiedHeader.Difficulty) < 0 { + if header.ParentHash == highestVerifiedHeader.ParentHash && header.Difficulty.Cmp(highestVerifiedHeader.Difficulty) <= 0 { return true } return false From 7f9cdc4ae5832d58e833b34cc7d43c35c77fb962 Mon Sep 17 00:00:00 2001 From: yutianwu Date: Mon, 27 Sep 2021 10:50:13 +0800 Subject: [PATCH 6/6] minor update --- consensus/parlia/parlia.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 68ade2e6ff..7cb352d1f2 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -886,7 +886,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res } func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool { - if snap.inturn(p.val) { + if header.Difficulty.Cmp(diffInTurn) == 0 { return false } @@ -895,7 +895,7 @@ func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderRea return false } - if header.ParentHash == highestVerifiedHeader.ParentHash && header.Difficulty.Cmp(highestVerifiedHeader.Difficulty) <= 0 { + if header.ParentHash == highestVerifiedHeader.ParentHash { return true } return false