diff --git a/consensus/consensus.go b/consensus/consensus.go index fc161390f9..a9e79359f9 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -49,6 +49,9 @@ type ChainHeaderReader interface { // GetHeaderByHash retrieves a block header from the database by its hash. GetHeaderByHash(hash common.Hash) *types.Header + + // GetHighestVerifiedHeader retrieves the highest header verified. + GetHighestVerifiedHeader() *types.Header } // ChainReader defines a small collection of methods needed to access the local diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 14304fe2bc..7cb352d1f2 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -56,6 +56,7 @@ const ( validatorBytesLength = common.AddressLength wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers initialBackOffTime = uint64(1) // second + processBackOffTime = uint64(1) // second systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system @@ -863,6 +864,16 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return case <-time.After(delay): } + if p.shouldWaitForCurrentBlockProcess(chain, header, snap) { + log.Info("Waiting for received in turn block to process") + select { + case <-stop: + log.Info("Received block process finished, abort block seal") + return + case <-time.After(time.Duration(processBackOffTime) * time.Second): + log.Info("Process backoff time exhausted, start to seal block") + } + } select { case results <- block.WithSeal(header): @@ -874,6 +885,22 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res return nil } +func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool { + if header.Difficulty.Cmp(diffInTurn) == 0 { + return false + } + + highestVerifiedHeader := chain.GetHighestVerifiedHeader() + if highestVerifiedHeader == nil { + return false + } + + if header.ParentHash == highestVerifiedHeader.ParentHash { + return true + } + return false +} + func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool { snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil) if err != nil { diff --git a/core/blockchain.go b/core/blockchain.go index be0b0f04ae..a09cc5ff62 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -44,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -185,8 +186,9 @@ type BlockChain struct { chainmu sync.RWMutex // blockchain insertion lock - currentBlock atomic.Value // Current head of the block chain - currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + currentBlock atomic.Value // Current head of the block chain + currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) + highestVerifiedHeader atomic.Value stateCache state.Database // State database to reuse between imports (contains state cache) bodyCache *lru.Cache // Cache for the most recent block bodies @@ -266,6 +268,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.currentBlock.Store(nilBlock) bc.currentFastBlock.Store(nilBlock) + var nilHeader *types.Header + bc.highestVerifiedHeader.Store(nilHeader) + // Initialize the chain with ancient data if it isn't empty. var txIndexBlock uint64 @@ -1883,6 +1888,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } + bc.updateHighestVerifiedHeader(block.Header()) + // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1989,6 +1996,29 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er return it.index, err } +func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) { + if header == nil || header.Number == nil { + return + } + currentHeader := bc.highestVerifiedHeader.Load().(*types.Header) + if currentHeader == nil { + bc.highestVerifiedHeader.Store(types.CopyHeader(header)) + return + } + + newTD := big.NewInt(0).Add(bc.GetTdByHash(header.ParentHash), header.Difficulty) + oldTD := big.NewInt(0).Add(bc.GetTdByHash(currentHeader.ParentHash), currentHeader.Difficulty) + + if newTD.Cmp(oldTD) > 0 { + bc.highestVerifiedHeader.Store(types.CopyHeader(header)) + return + } +} + +func (bc *BlockChain) GetHighestVerifiedHeader() *types.Header { + return bc.highestVerifiedHeader.Load().(*types.Header) +} + // insertSideChain is called when an import batch hits upon a pruned ancestor // error, which happens when a sidechain with a sufficiently old fork-block is // found. diff --git a/core/chain_makers.go b/core/chain_makers.go index 6cb74d51be..7f62e41ad8 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -303,3 +303,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil } func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } +func (cr *fakeChainReader) GetHighestVerifiedHeader() *types.Header { return nil } diff --git a/core/headerchain.go b/core/headerchain.go index 1dbf958786..fe4770a469 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/rawdb" @@ -33,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - lru "github.com/hashicorp/golang-lru" ) const ( @@ -413,6 +414,10 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co return chain } +func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or // a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the // number of blocks to be individually checked before we reach the canonical chain. diff --git a/light/lightchain.go b/light/lightchain.go index ca6fbfac49..99644355ac 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" @@ -37,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - lru "github.com/hashicorp/golang-lru" ) var ( @@ -148,6 +149,10 @@ func (lc *LightChain) HeaderChain() *core.HeaderChain { return lc.hc } +func (lc *LightChain) GetHighestVerifiedHeader() *types.Header { + return nil +} + // loadLastState loads the last known chain state from the database. This method // assumes that the chain manager mutex is held. func (lc *LightChain) loadLastState() error {