diff --git a/core/tx_pool.go b/core/tx_pool.go index 75b5ac101949..ee507a5b7055 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -253,13 +253,14 @@ type TxPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - mevBundles []types.MevBundle - megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay - all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + mevBundles []types.MevBundle + megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay + NewMegabundleHooks []func(common.Address, *types.MevBundle) + all *txLookup // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -630,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio return errors.New("megabundle from non-trusted address") } - pool.megabundles[relayAddr] = types.MevBundle{ + megabundle := types.MevBundle{ Txs: txs, BlockNumber: blockNumber, MinTimestamp: minTimestamp, MaxTimestamp: maxTimestamp, RevertingTxHashes: revertingTxHashes, } + + pool.megabundles[relayAddr] = megabundle + + for _, hook := range pool.NewMegabundleHooks { + go hook(relayAddr, &megabundle) + } + return nil } diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 3452fb0f214b..c1d716c8d7ad 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons })) } + relayWorkerMap := make(map[common.Address]*worker) + for i := 0; i < len(config.TrustedRelays); i++ { - workers = append(workers, - newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{ - isFlashbots: true, - isMegabundleWorker: true, - queue: queue, - relayAddr: config.TrustedRelays[i], - })) + relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + }) + workers = append(workers, relayWorker) + relayWorkerMap[config.TrustedRelays[i]] = relayWorker } + eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) { + worker, found := relayWorkerMap[relayAddr] + if !found { + return + } + + select { + case worker.newMegabundleCh <- megabundle: + default: + } + }) + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, diff --git a/miner/worker.go b/miner/worker.go index aa378f114477..241eb1631449 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -157,6 +157,7 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust + newMegabundleCh chan *types.MevBundle wg sync.WaitGroup @@ -240,15 +241,17 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), + newWorkCh: make(chan *newWorkReq, 1), taskCh: taskCh, resultCh: make(chan *types.Block, resultQueueSize), exitCh: exitCh, startCh: make(chan struct{}, 1), + newMegabundleCh: make(chan *types.MevBundle), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), flashbots: flashbots, } + // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain @@ -391,26 +394,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of mining. + runningInterrupt *int32 // Running task interrupt + queuedInterrupt *int32 // Queued task interrupt + minRecommit = recommit // minimal resubmit interval specified by user. + timestamp int64 // timestamp for each round of mining. ) timer := time.NewTimer(0) defer timer.Stop() <-timer.C // discard the initial tick - // commit aborts in-flight transaction execution with given signal and resubmits a new one. + // commit aborts in-flight transaction execution with highest seen signal and resubmits a new one commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) - } - interrupt = new(int32) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return + case queuedRequest := <-w.newWorkCh: + // Previously queued request wasn't started yet, update the request and resubmit + queuedRequest.noempty = queuedRequest.noempty || noempty + queuedRequest.timestamp = timestamp + w.newWorkCh <- queuedRequest // guaranteed to be nonblocking + default: + // Previously queued request has already started, cycle interrupt pointer and submit new work + runningInterrupt = queuedInterrupt + queuedInterrupt = new(int32) + + w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking } + + if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) { + atomic.StoreInt32(runningInterrupt, s) + } + timer.Reset(recommit) atomic.StoreInt32(&w.newTxs, 0) } @@ -437,6 +452,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timestamp = time.Now().Unix() commit(false, commitInterruptNewHead) + case <-w.newMegabundleCh: + if w.isRunning() { + commit(true, commitInterruptNone) + } + case <-timer.C: // If mining is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. @@ -500,7 +520,10 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitNewWork(req.interrupt, req.noempty, req.timestamp) + // Don't start if the work has already been interrupted + if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone { + w.commitNewWork(req.interrupt, req.noempty, req.timestamp) + } case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks @@ -761,10 +784,10 @@ func (w *worker) generateEnv(parent *types.Block, header *types.Header) (*enviro // makeCurrent creates a new environment for the current cycle. func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { env, err := w.generateEnv(parent, header) - env.state.StartPrefetcher("miner") if err != nil { return err } + env.state.StartPrefetcher("miner") // Swap out the old work with the new one, terminating any leftover prefetcher // processes in the mean time and starting a new one. @@ -869,7 +892,6 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i // (2) worker start or restart, the interrupt signal is 1 // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { @@ -881,8 +903,11 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i ratio: ratio, inc: true, } + return false } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + + // Discard the work as new head is present + return true } // If we don't have enough gas for any further transactions then we're done if w.current.gasPool.Gas() < params.TxGas { @@ -982,7 +1007,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin // (2) worker start or restart, the interrupt signal is 1 // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { @@ -994,8 +1018,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin ratio: ratio, inc: true, } + return false } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + + // Discard the work as new head is present + return true } // If we don't have enough gas for any further transactions then we're done if w.current.gasPool.Gas() < params.TxGas { @@ -1223,6 +1250,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) if err != nil { return // no valid megabundle for this relay, nothing to do } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. // Megabundles API focuses on speed and runs everything in one cycle. coinbaseBalanceBefore := w.current.state.GetBalance(w.coinbase)