Skip to content

Commit

Permalink
add block proccess backoff time when validator is not in turn
Browse files Browse the repository at this point in the history
  • Loading branch information
yutianwu committed Sep 23, 2021
1 parent bca9678 commit 157c22c
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 5 deletions.
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type ChainHeaderReader interface {

// GetHeaderByHash retrieves a block header from the database by its hash.
GetHeaderByHash(hash common.Hash) *types.Header

// GetHighestVerifiedHeader retrieves the highest header verified.
GetHighestVerifiedHeader() *types.Header
}

// ChainReader defines a small collection of methods needed to access the local
Expand Down
23 changes: 23 additions & 0 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
validatorBytesLength = common.AddressLength
wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers
initialBackOffTime = uint64(1) // second
processBackOffTime = uint64(1) // second

systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system

Expand Down Expand Up @@ -864,6 +865,15 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
case <-time.After(delay):
}

if p.shouldWaitForCurrentBlockProcess(chain, header, snap) {
log.Info("not in turn and received current block, wait for current block process")
select {
case <-stop:
return
case <-time.After(time.Duration(processBackOffTime) * time.Second):
}
}

select {
case results <- block.WithSeal(header):
default:
Expand All @@ -874,6 +884,19 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
return nil
}

func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderReader, header *types.Header, snap *Snapshot) bool {
highestVerifiedHeader := chain.GetHighestVerifiedHeader()
if highestVerifiedHeader == nil || highestVerifiedHeader.Number == nil {
return false
}

if !snap.inturn(p.val) && header.Number.Cmp(highestVerifiedHeader.Number) == 0 {
return true
}

return false
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil)
if err != nil {
Expand Down
28 changes: 25 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
Expand All @@ -44,7 +46,6 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
lru "github.com/hashicorp/golang-lru"
)

var (
Expand Down Expand Up @@ -185,8 +186,9 @@ type BlockChain struct {

chainmu sync.RWMutex // blockchain insertion lock

currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
highestVerifiedHeader atomic.Value

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
Expand Down Expand Up @@ -266,6 +268,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.currentBlock.Store(nilBlock)
bc.currentFastBlock.Store(nilBlock)

var nilHeader *types.Header
bc.highestVerifiedHeader.Store(nilHeader)

// Initialize the chain with ancient data if it isn't empty.
var txIndexBlock uint64

Expand Down Expand Up @@ -1883,6 +1888,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err != nil {
return it.index, err
}
bc.updateHighestVerifiedHeader(block.Header())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb
Expand Down Expand Up @@ -1989,6 +1996,21 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}

func (bc *BlockChain) updateHighestVerifiedHeader(header *types.Header) {
if header == nil || header.Number == nil {
return
}
currentHeader := bc.highestVerifiedHeader.Load().(*types.Header)
if currentHeader == nil || currentHeader.Number == nil || currentHeader.Number.Cmp(header.Number) > 0 {
bc.highestVerifiedHeader.Store(types.CopyHeader(header))
return
}
}

func (bc *BlockChain) GetHighestVerifiedHeader() *types.Header {
return bc.highestVerifiedHeader.Load().(*types.Header)
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
// error, which happens when a sidechain with a sufficiently old fork-block is
// found.
Expand Down
1 change: 1 addition & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header
func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil }
func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil }
func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil }
func (cr *fakeChainReader) GetHighestVerifiedHeader() *types.Header { return nil }
7 changes: 6 additions & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
lru "github.com/hashicorp/golang-lru"
)

const (
Expand Down Expand Up @@ -413,6 +414,10 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co
return chain
}

func (hc *HeaderChain) GetHighestVerifiedHeader() *types.Header {
return nil
}

// GetAncestor retrieves the Nth ancestor of a given block. It assumes that either the given block or
// a close ancestor of it is canonical. maxNonCanonical points to a downwards counter limiting the
// number of blocks to be individually checked before we reach the canonical chain.
Expand Down
7 changes: 6 additions & 1 deletion light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -37,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
lru "github.com/hashicorp/golang-lru"
)

var (
Expand Down Expand Up @@ -148,6 +149,10 @@ func (lc *LightChain) HeaderChain() *core.HeaderChain {
return lc.hc
}

func (lc *LightChain) GetHighestVerifiedHeader() *types.Header {
return nil
}

// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (lc *LightChain) loadLastState() error {
Expand Down

0 comments on commit 157c22c

Please sign in to comment.