Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

miner: use atomic type #27013

Merged
merged 2 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ const (

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
interrupt *int32
interrupt *atomic.Int32
noempty bool
timestamp int64
}
Expand Down Expand Up @@ -239,15 +239,15 @@ type worker struct {
snapshotState *state.StateDB

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
running atomic.Bool // The indicator whether the consensus engine is running or not.
newTxs atomic.Int32 // New arrival transaction count since last sealing work submitting.

// noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default).
// But in some special scenario the consensus engine will seal blocks instantaneously,
// in this case this feature will add all empty blocks into canonical chain
// non-stop and no real transaction will be included.
noempty uint32
noempty atomic.Bool

// newpayloadTimeout is the maximum timeout allowance for creating payload.
// The default value is 2 seconds but node operator can set it to arbitrary
Expand Down Expand Up @@ -372,12 +372,12 @@ func (w *worker) setRecommitInterval(interval time.Duration) {

// disablePreseal disables pre-sealing feature
func (w *worker) disablePreseal() {
atomic.StoreUint32(&w.noempty, 1)
w.noempty.Store(true)
}

// enablePreseal enables pre-sealing feature
func (w *worker) enablePreseal() {
atomic.StoreUint32(&w.noempty, 0)
w.noempty.Store(false)
}

// pending returns the pending state and corresponding block.
Expand Down Expand Up @@ -409,24 +409,24 @@ func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) {

// start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() {
atomic.StoreInt32(&w.running, 1)
w.running.Store(true)
w.startCh <- struct{}{}
}

// stop sets the running status as 0.
func (w *worker) stop() {
atomic.StoreInt32(&w.running, 0)
w.running.Store(false)
}

// isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool {
return atomic.LoadInt32(&w.running) == 1
return w.running.Load()
}

// close terminates all background threads maintained by the worker.
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
atomic.StoreInt32(&w.running, 0)
w.running.Store(false)
close(w.exitCh)
w.wg.Wait()
}
Expand Down Expand Up @@ -457,7 +457,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
interrupt *atomic.Int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of sealing.
)
Expand All @@ -469,16 +469,16 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
interrupt.Store(s)
}
interrupt = new(int32)
interrupt = new(atomic.Int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
w.newTxs.Store(0)
}
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
Expand Down Expand Up @@ -508,7 +508,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
// Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 {
if w.newTxs.Load() == 0 {
timer.Reset(recommit)
continue
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func (w *worker) mainLoop() {
w.commitWork(nil, true, time.Now().Unix())
}
}
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
w.newTxs.Add(int32(len(ev.Txs)))

// System stopped
case <-w.exitCh:
Expand Down Expand Up @@ -877,7 +877,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -887,7 +887,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
for {
// Check interruption signal and abort building if it's fired.
if interrupt != nil {
if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
if signal := interrupt.Load(); signal != commitInterruptNone {
return signalToErr(signal)
}
}
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
Expand Down Expand Up @@ -1102,9 +1102,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e
defer work.discard()

if !params.noTxs {
interrupt := new(int32)
interrupt := new(atomic.Int32)
timer := time.AfterFunc(w.newpayloadTimeout, func() {
atomic.StoreInt32(interrupt, commitInterruptTimeout)
interrupt.Store(commitInterruptTimeout)
})
defer timer.Stop()

Expand All @@ -1122,7 +1122,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, *big.Int, e

// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int64) {
start := time.Now()

// Set the coinbase if the worker is running or it's required
Expand All @@ -1143,7 +1143,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
}
// Create an empty block based on temporary copied state for
// sealing in advance without waiting block execution finished.
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
if !noempty && !w.noempty.Load() {
w.commit(work.copy(), nil, false, start)
}
// Fill pending transactions from the txpool into the block.
Expand Down
6 changes: 3 additions & 3 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,11 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
progress = make(chan struct{}, 10)
result = make([]float64, 0, 10)
index = 0
start uint32
start atomic.Bool
)
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started.
if atomic.LoadUint32(&start) == 0 {
if !start.Load() {
return
}
var wantMinInterval, wantRecommitInterval time.Duration
Expand Down Expand Up @@ -493,7 +493,7 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
w.start()

time.Sleep(time.Second) // Ensure two tasks have been submitted due to start opt
atomic.StoreUint32(&start, 1)
start.Store(true)

w.setRecommitInterval(3 * time.Second)
select {
Expand Down