diff --git a/miner/worker.go b/miner/worker.go index dca827f6d6..0e0ca03ee2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -53,9 +53,6 @@ const ( // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 - // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. - resubmitAdjustChanSize = 10 - // sealingLogAtDepth is the number of confirmations before logging successful mining. sealingLogAtDepth = 11 @@ -63,18 +60,6 @@ const ( // any newly arrived transactions. minRecommitInterval = 1 * time.Second - // maxRecommitInterval is the maximum time interval to recreate the sealing block with - // any newly arrived transactions. - maxRecommitInterval = 15 * time.Second - - // intervalAdjustRatio is the impact a single interval adjustment has on sealing work - // resubmitting interval. - intervalAdjustRatio = 0.1 - - // intervalAdjustBias is applied during the new resubmit interval calculation in favor of - // increasing upper limit or decreasing lower limit so that the limit can be reachable. - intervalAdjustBias = 200 * 1000.0 * 1000.0 - // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 ) @@ -158,16 +143,15 @@ type task struct { } const ( - commitInterruptNone int32 = iota - commitInterruptNewHead - commitInterruptResubmit + commitInterruptNewHead int32 = 1 + commitInterruptResubmit int32 = 2 ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 - noempty bool - timestamp int64 + interruptCh chan int32 + noempty bool + timestamp int64 } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -177,12 +161,6 @@ type getWorkReq struct { result chan *types.Block } -// intervalAdjust represents a resubmitting interval adjustment. -type intervalAdjust struct { - ratio float64 - inc bool -} - // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { @@ -213,7 +191,6 @@ type worker struct { startCh chan struct{} exitCh chan struct{} resubmitIntervalCh chan time.Duration - resubmitAdjustCh chan *intervalAdjust wg sync.WaitGroup @@ -279,7 +256,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -396,33 +372,11 @@ func (w *worker) close() { w.wg.Wait() } -// recalcRecommit recalculates the resubmitting interval upon feedback. -func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration { - var ( - prevF = float64(prev.Nanoseconds()) - next float64 - ) - if inc { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) - max := float64(maxRecommitInterval.Nanoseconds()) - if next > max { - next = max - } - } else { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) - min := float64(minRecommit.Nanoseconds()) - if next < min { - next = min - } - } - return time.Duration(int64(next)) -} - // newWorkLoop is a standalone goroutine to submit new sealing work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 + interruptCh chan int32 minRecommit = recommit // minimal resubmit interval specified by user. timestamp int64 // timestamp for each round of sealing. ) @@ -432,13 +386,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) { <-timer.C // discard the initial tick // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) + commit := func(noempty bool, reason int32) { + if interruptCh != nil { + // each commit work will have its own interruptCh to stop work with a reason + interruptCh <- reason + close(interruptCh) } - interrupt = new(int32) + interruptCh = make(chan int32, 1) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: + case w.newWorkCh <- &newWorkReq{interruptCh: interruptCh, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -508,23 +464,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { w.resubmitHook(minRecommit, recommit) } - case adjust := <-w.resubmitAdjustCh: - // Adjust resubmit interval by feedback. - if adjust.inc { - before := recommit - target := float64(recommit.Nanoseconds()) / adjust.ratio - recommit = recalcRecommit(minRecommit, recommit, target, true) - log.Trace("Increase miner recommit interval", "from", before, "to", recommit) - } else { - before := recommit - recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false) - log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) - } - - if w.resubmitHook != nil { - w.resubmitHook(minRecommit, recommit) - } - case <-w.exitCh: return } @@ -551,7 +490,7 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interruptCh, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -858,7 +797,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -884,35 +823,32 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) - interruptCh := make(chan struct{}) - defer close(interruptCh) + stopPrefetchCh := make(chan struct{}) + defer close(stopPrefetchCh) //prefetch txs from all pending txs txsPrefetch := txs.Copy() tx := txsPrefetch.Peek() txCurr := &tx - w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) + w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. + // (1) new head block event arrival, the reason is 1 + // (2) worker start or restart, the reason is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the reason is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, + if interruptCh != nil { + select { + case reason, ok := <-interruptCh: + if !ok { + // should never be here, since interruptCh should not be read before + log.Warn("commit transactions stopped unknown") } + return reason == commitInterruptNewHead + default: } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -998,11 +934,6 @@ LOOP: } w.pendingLogsFeed.Send(cpy) } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } return false } @@ -1106,7 +1037,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1119,13 +1050,13 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } @@ -1146,7 +1077,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interruptCh chan int32, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1172,7 +1103,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { w.commit(work, nil, false, start) } // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + w.fillTransactions(interruptCh, work) w.commit(work, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover diff --git a/miner/worker_test.go b/miner/worker_test.go index 2cdd4284e6..e405788c61 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -20,7 +20,6 @@ import ( "errors" "math/big" "math/rand" - "sync/atomic" "testing" "time" @@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { } } -func TestAdjustIntervalEthash(t *testing.T) { - testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestAdjustIntervalClique(t *testing.T) { - testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - var ( - progress = make(chan struct{}, 10) - result = make([]float64, 0, 10) - index = 0 - start uint32 - ) - w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { - // Short circuit if interval checking hasn't started. - if atomic.LoadUint32(&start) == 0 { - return - } - var wantMinInterval, wantRecommitInterval time.Duration - - switch index { - case 0: - wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second - case 1: - origin := float64(3 * time.Second.Nanoseconds()) - estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 2: - estimate := result[index-1] - min := float64(3 * time.Second.Nanoseconds()) - estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 3: - wantMinInterval, wantRecommitInterval = time.Second, time.Second - } - - // Check interval - if minInterval != wantMinInterval { - t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval) - } - if recommitInterval != wantRecommitInterval { - t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval) - } - result = append(result, float64(recommitInterval.Nanoseconds())) - index += 1 - progress <- struct{}{} - } - w.start() - - time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt - atomic.StoreUint32(&start, 1) - - w.setRecommitInterval(3 * time.Second) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.setRecommitInterval(500 * time.Millisecond) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } -} - func TestGetSealingWorkEthash(t *testing.T) { testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false) }