Skip to content

Commit

Permalink
add test code
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Sep 10, 2021
1 parent c6e62e1 commit 80be4ed
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
}
DiffSyncFlag = cli.BoolFlag{
Name: "diffsync",
Usage: "Enable difflayer sync, Please note that enable diffsync will improve the syncing speed, " +
Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " +
"but will degrade the security to light client level",
}
RangeLimitFlag = cli.BoolFlag{
Expand Down
23 changes: 6 additions & 17 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ const (

validatorBytesLength = common.AddressLength
wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers
// TODO this is a hardfork change, just for tuning so far, recover it late
initialBackOffTime = uint64(2) // second
initialBackOffTime = uint64(1) // second

systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system

Expand Down Expand Up @@ -800,9 +799,9 @@ func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.
return nil
}
delay := p.delayForRamanujanFork(snap, header)
// The blocking time should be no more than half of epoch
if delay > time.Duration(p.config.Period)*time.Second*4/5 {
delay = time.Duration(p.config.Period) * time.Second * 4 / 5
// The blocking time should be no more than half of period
if delay > time.Duration(p.config.Period)*time.Second/2 {
delay = time.Duration(p.config.Period) * time.Second / 2
}
return &delay
}
Expand Down Expand Up @@ -894,19 +893,9 @@ func (p *Parlia) AllowLightProcess(chain consensus.ChainReader, currentHeader *t
}

idx := snap.indexOfVal(p.val)
if idx < 0 {
return true
}
validators := snap.validators()

validatorNum := int64(len(validators))
// It is not allowed if only two validators
if validatorNum <= 2 {
return false
}
// validator is not allowed to diff sync
return idx < 0

offset := (int64(snap.Number) + 2) % validatorNum
return validators[offset] == p.val
}

func (p *Parlia) IsLocalBlock(header *types.Header) bool {
Expand Down
8 changes: 3 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2654,14 +2654,12 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu
if _, alreadyHas := bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash]; alreadyHas {
return nil
}
} else {
bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{})
}
bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{})
bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash] = struct{}{}
if _, exist := bc.diffNumToBlockHashes[diffLayer.Number]; !exist {
bc.diffNumToBlockHashes[diffLayer.Number] = make(map[common.Hash]struct{})
}
if len(bc.diffNumToBlockHashes[diffLayer.Number]) > 4 {

}
bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.BlockHash] = struct{}{}

Expand Down Expand Up @@ -2929,7 +2927,7 @@ func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscr
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}

//Options
// Options
func EnableLightProcessor(bc *BlockChain) *BlockChain {
bc.processor = NewLightStateProcessor(bc.Config(), bc, bc.engine)
return bc
Expand Down
16 changes: 13 additions & 3 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,9 +1109,11 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer,
tasksNum := 0
finishCh := make(chan struct{})
defer close(finishCh)
threads := 1
if len(s.diffTries)/runtime.NumCPU() > minNumberOfAccountPerTask {
threads := len(s.diffTries) / minNumberOfAccountPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}
for i := 0; i < threads; i++ {
go func() {
Expand Down Expand Up @@ -1230,7 +1232,15 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer
tasksNum := 0
finishCh := make(chan struct{})
defer close(finishCh)
for i := 0; i < runtime.NumCPU(); i++ {

threads := len(s.stateObjectsDirty) / minNumberOfAccountPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}

for i := 0; i < threads; i++ {
go func() {
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for {
Expand Down
32 changes: 16 additions & 16 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (
const (
fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
minNumberOfAccountPerTask = 5
diffLayerTimeout = 50
recentTime = 2048 * 3
recentDiffLayerTimeout = 20
farDiffLayerTimeout = 2
)

// StateProcessor is a basic Processor, which takes care of transitioning
Expand Down Expand Up @@ -83,7 +85,6 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
allowLightProcess := true
if posa, ok := p.engine.(consensus.PoSA); ok {
allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
log.Error("===debug, allow to light process?", "allow", allowLightProcess)
}
// random fallback to full process
if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
Expand All @@ -92,28 +93,25 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
pid = peer.ID()
}
var diffLayer *types.DiffLayer
//TODO This is just for debug
var diffLayerTimeout = recentDiffLayerTimeout
if time.Now().Unix()-int64(block.Time()) > recentTime {
diffLayerTimeout = farDiffLayerTimeout
}
for tried := 0; tried < diffLayerTimeout; tried++ {
// wait a bit for the diff layer
diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid)
if diffLayer != nil {
log.Error("===debug find it", "idx", tried)
break
}
time.Sleep(time.Millisecond)
}
if diffLayer != nil {
if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
// fallback to full process
return p.StateProcessor.Process(block, statedb, cfg)
}
receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb, cfg)
receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
if err == nil {
log.Info("do light process success at block", "num", block.NumberU64())
return statedb, receipts, logs, gasUsed, nil
} else {
log.Error("do light process err at block\n", "num", block.NumberU64(), "err", err)
log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
p.bc.removeDiffLayers(diffLayer.DiffHash)
// prepare new statedb
statedb.StopPrefetcher()
Expand All @@ -131,7 +129,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
return p.StateProcessor.Process(block, statedb, cfg)
}

func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
statedb.MarkLightProcessed()
fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes))
diffTries := make(map[common.Address]state.Trie)
Expand All @@ -149,9 +147,11 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
for des := range snapDestructs {
statedb.Trie().TryDelete(des[:])
}
threads := 1
if len(snapAccounts)/runtime.NumCPU() > minNumberOfAccountPerTask {
threads := len(snapAccounts) / minNumberOfAccountPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}

iteAccounts := make([]common.Address, 0, len(snapAccounts))
Expand Down Expand Up @@ -236,7 +236,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
!bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) {
if code, exist := fullDiffCode[codeHash]; exist {
if crypto.Keccak256Hash(code) != codeHash {
errChan <- err
errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String())
return
}
diffMux.Lock()
Expand All @@ -245,7 +245,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
} else {
rawCode := rawdb.ReadCode(p.bc.db, codeHash)
if len(rawCode) == 0 {
errChan <- err
errChan <- fmt.Errorf("missing code, account %s", diffAccount.String())
return
}
}
Expand Down
1 change: 1 addition & 0 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ func (pool *TxPool) scheduleReorgLoop() {
pool.reorgDoneCh <- nextDone

case req := <-pool.reqPromoteCh:
log.Error("=== debug receive reqPromote notice")
// Promote request: update address set if request is already pending.
if dirtyAccounts == nil {
dirtyAccounts = req
Expand Down
1 change: 0 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
for _, peer := range transfer {
if len(diff) != 0 && peer.diffExt != nil {
// difflayer should send before block
log.Error("===debug Broadcast block", "number", block.Number(), "hash", hash)
peer.diffExt.SendDiffLayers([]rlp.RawValue{diff})
}
peer.AsyncSendNewBlock(block, td)
Expand Down
9 changes: 0 additions & 9 deletions eth/handler_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package eth
import (
"fmt"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/protocols/diff"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -73,15 +71,8 @@ func (h *diffHandler) handleDiffLayerPackage(packet *diff.DiffLayersPacket, pid
diffs, err := packet.Unpack()

if err != nil {
log.Error("====unpack err", "number", diffs[0].Number, "hash", diffs[0].BlockHash, "err", err)
return err
}
if len(diffs) > 0 {
log.Error("====debug receive difflayer", "number", diffs[0].Number, "hash", diffs[0].BlockHash)

} else {
log.Error("====debug receive difflayer length 0")
}
for _, d := range diffs {
if d != nil {
if err := d.Validate(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions eth/protocols/diff/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (p *DiffLayersPacket) Unpack() ([]*types.DiffLayer, error) {

type DiffCapPacket struct {
DiffSync bool
//Extra rlp.RawValue // for extension
}

type DiffLayersPacket []rlp.RawValue
Expand Down

0 comments on commit 80be4ed

Please sign in to comment.