diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9e92f00950..15db979823 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -123,7 +123,7 @@ var ( } DiffSyncFlag = cli.BoolFlag{ Name: "diffsync", - Usage: "Enable difflayer sync, Please note that enable diffsync will improve the syncing speed, " + + Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " + "but will degrade the security to light client level", } RangeLimitFlag = cli.BoolFlag{ diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 275ad2b913..9e599cdbd8 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -55,8 +55,7 @@ const ( validatorBytesLength = common.AddressLength wiggleTime = uint64(1) // second, Random delay (per signer) to allow concurrent signers - // TODO this is a hardfork change, just for tuning so far, recover it late - initialBackOffTime = uint64(2) // second + initialBackOffTime = uint64(1) // second systemRewardPercent = 4 // it means 1/2^4 = 1/16 percentage of gas fee incoming will be distributed to system @@ -800,9 +799,9 @@ func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time. return nil } delay := p.delayForRamanujanFork(snap, header) - // The blocking time should be no more than half of epoch - if delay > time.Duration(p.config.Period)*time.Second*4/5 { - delay = time.Duration(p.config.Period) * time.Second * 4 / 5 + // The blocking time should be no more than half of period + if delay > time.Duration(p.config.Period)*time.Second/2 { + delay = time.Duration(p.config.Period) * time.Second / 2 } return &delay } @@ -894,19 +893,9 @@ func (p *Parlia) AllowLightProcess(chain consensus.ChainReader, currentHeader *t } idx := snap.indexOfVal(p.val) - if idx < 0 { - return true - } - validators := snap.validators() - - validatorNum := int64(len(validators)) - // It is not allowed if only two validators - if validatorNum <= 2 { - return false - } + // validator is not allowed to diff sync + return idx < 0 - offset := (int64(snap.Number) + 2) % validatorNum - return validators[offset] == p.val } func (p *Parlia) IsLocalBlock(header *types.Header) bool { diff --git a/core/blockchain.go b/core/blockchain.go index 84a229f629..fbf7fc00f6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2654,14 +2654,12 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu if _, alreadyHas := bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash]; alreadyHas { return nil } + } else { + bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{}) } - bc.diffPeersToDiffHashes[pid] = make(map[common.Hash]struct{}) bc.diffPeersToDiffHashes[pid][diffLayer.DiffHash] = struct{}{} if _, exist := bc.diffNumToBlockHashes[diffLayer.Number]; !exist { bc.diffNumToBlockHashes[diffLayer.Number] = make(map[common.Hash]struct{}) - } - if len(bc.diffNumToBlockHashes[diffLayer.Number]) > 4 { - } bc.diffNumToBlockHashes[diffLayer.Number][diffLayer.BlockHash] = struct{}{} @@ -2929,7 +2927,7 @@ func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscr return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } -//Options +// Options func EnableLightProcessor(bc *BlockChain) *BlockChain { bc.processor = NewLightStateProcessor(bc.Config(), bc, bc.engine) return bc diff --git a/core/state/statedb.go b/core/state/statedb.go index 886f181bf5..b55171e324 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1109,9 +1109,11 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, tasksNum := 0 finishCh := make(chan struct{}) defer close(finishCh) - threads := 1 - if len(s.diffTries)/runtime.NumCPU() > minNumberOfAccountPerTask { + threads := len(s.diffTries) / minNumberOfAccountPerTask + if threads > runtime.NumCPU() { threads = runtime.NumCPU() + } else if threads == 0 { + threads = 1 } for i := 0; i < threads; i++ { go func() { @@ -1230,7 +1232,15 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer tasksNum := 0 finishCh := make(chan struct{}) defer close(finishCh) - for i := 0; i < runtime.NumCPU(); i++ { + + threads := len(s.stateObjectsDirty) / minNumberOfAccountPerTask + if threads > runtime.NumCPU() { + threads = runtime.NumCPU() + } else if threads == 0 { + threads = 1 + } + + for i := 0; i < threads; i++ { go func() { codeWriter := s.db.TrieDB().DiskDB().NewBatch() for { diff --git a/core/state_processor.go b/core/state_processor.go index aca43b4609..fb82727d0c 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -44,7 +44,9 @@ import ( const ( fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly minNumberOfAccountPerTask = 5 - diffLayerTimeout = 50 + recentTime = 2048 * 3 + recentDiffLayerTimeout = 20 + farDiffLayerTimeout = 2 ) // StateProcessor is a basic Processor, which takes care of transitioning @@ -83,7 +85,6 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB allowLightProcess := true if posa, ok := p.engine.(consensus.PoSA); ok { allowLightProcess = posa.AllowLightProcess(p.bc, block.Header()) - log.Error("===debug, allow to light process?", "allow", allowLightProcess) } // random fallback to full process if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 { @@ -92,28 +93,25 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB pid = peer.ID() } var diffLayer *types.DiffLayer - //TODO This is just for debug + var diffLayerTimeout = recentDiffLayerTimeout + if time.Now().Unix()-int64(block.Time()) > recentTime { + diffLayerTimeout = farDiffLayerTimeout + } for tried := 0; tried < diffLayerTimeout; tried++ { // wait a bit for the diff layer diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid) if diffLayer != nil { - log.Error("===debug find it", "idx", tried) break } time.Sleep(time.Millisecond) } if diffLayer != nil { - if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { - log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err) - // fallback to full process - return p.StateProcessor.Process(block, statedb, cfg) - } - receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb, cfg) + receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb) if err == nil { log.Info("do light process success at block", "num", block.NumberU64()) return statedb, receipts, logs, gasUsed, nil } else { - log.Error("do light process err at block\n", "num", block.NumberU64(), "err", err) + log.Error("do light process err at block", "num", block.NumberU64(), "err", err) p.bc.removeDiffLayers(diffLayer.DiffHash) // prepare new statedb statedb.StopPrefetcher() @@ -131,7 +129,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB return p.StateProcessor.Process(block, statedb, cfg) } -func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { +func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) { statedb.MarkLightProcessed() fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes)) diffTries := make(map[common.Address]state.Trie) @@ -149,9 +147,11 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty for des := range snapDestructs { statedb.Trie().TryDelete(des[:]) } - threads := 1 - if len(snapAccounts)/runtime.NumCPU() > minNumberOfAccountPerTask { + threads := len(snapAccounts) / minNumberOfAccountPerTask + if threads > runtime.NumCPU() { threads = runtime.NumCPU() + } else if threads == 0 { + threads = 1 } iteAccounts := make([]common.Address, 0, len(snapAccounts)) @@ -236,7 +236,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) { if code, exist := fullDiffCode[codeHash]; exist { if crypto.Keccak256Hash(code) != codeHash { - errChan <- err + errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String()) return } diffMux.Lock() @@ -245,7 +245,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } else { rawCode := rawdb.ReadCode(p.bc.db, codeHash) if len(rawCode) == 0 { - errChan <- err + errChan <- fmt.Errorf("missing code, account %s", diffAccount.String()) return } } diff --git a/core/tx_pool.go b/core/tx_pool.go index d0304857c3..cff0811870 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1019,6 +1019,7 @@ func (pool *TxPool) scheduleReorgLoop() { pool.reorgDoneCh <- nextDone case req := <-pool.reqPromoteCh: + log.Error("=== debug receive reqPromote notice") // Promote request: update address set if request is already pending. if dirtyAccounts == nil { dirtyAccounts = req diff --git a/eth/handler.go b/eth/handler.go index e4980b939b..0fd0e5d49f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -486,7 +486,6 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { for _, peer := range transfer { if len(diff) != 0 && peer.diffExt != nil { // difflayer should send before block - log.Error("===debug Broadcast block", "number", block.Number(), "hash", hash) peer.diffExt.SendDiffLayers([]rlp.RawValue{diff}) } peer.AsyncSendNewBlock(block, td) diff --git a/eth/handler_diff.go b/eth/handler_diff.go index ab9363e6e5..97474f5f18 100644 --- a/eth/handler_diff.go +++ b/eth/handler_diff.go @@ -19,8 +19,6 @@ package eth import ( "fmt" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/p2p/enode" @@ -73,15 +71,8 @@ func (h *diffHandler) handleDiffLayerPackage(packet *diff.DiffLayersPacket, pid diffs, err := packet.Unpack() if err != nil { - log.Error("====unpack err", "number", diffs[0].Number, "hash", diffs[0].BlockHash, "err", err) return err } - if len(diffs) > 0 { - log.Error("====debug receive difflayer", "number", diffs[0].Number, "hash", diffs[0].BlockHash) - - } else { - log.Error("====debug receive difflayer length 0") - } for _, d := range diffs { if d != nil { if err := d.Validate(); err != nil { diff --git a/eth/protocols/diff/protocol.go b/eth/protocols/diff/protocol.go index 02474632a5..7506496234 100644 --- a/eth/protocols/diff/protocol.go +++ b/eth/protocols/diff/protocol.go @@ -98,6 +98,7 @@ func (p *DiffLayersPacket) Unpack() ([]*types.DiffLayer, error) { type DiffCapPacket struct { DiffSync bool + Extra rlp.RawValue // for extension } type DiffLayersPacket []rlp.RawValue