diff --git a/txpool/account.go b/txpool/account.go index 8a989e4e6d..8a24657480 100644 --- a/txpool/account.go +++ b/txpool/account.go @@ -167,19 +167,6 @@ func (m *NonceToTxLookup) reset() { m.mapping = make(map[uint64]*types.Transaction) } -func (m *NonceToTxLookup) replace(tx *types.Transaction) *types.Transaction { - if oldTx, ok := m.mapping[tx.Nonce]; ok { - toReturn := new(types.Transaction) - - oldTx.CopyTo(toReturn) - tx.CopyTo(oldTx) - - return toReturn - } - - return nil -} - func (m *NonceToTxLookup) remove(txs ...*types.Transaction) { for _, tx := range txs { delete(m.mapping, tx.Nonce) @@ -281,39 +268,65 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) ( return } -// enqueue attempts tp push the transaction onto the enqueued queue. -// first returning value is transaction that is being replaced -func (a *account) enqueue(tx *types.Transaction) (*types.Transaction, error) { +func (a *account) addTxWithNonce(tx *types.Transaction) error { + a.nonceToTx.lock() + defer a.nonceToTx.lock() + + oldTx := a.nonceToTx.get(tx.Nonce) + if oldTx == nil { + // if transaction with same nonce does not exist -> return immediately + return nil + } else if oldTx.GasPrice.Cmp(tx.GasPrice) >= 0 { + // if transaction with same nonce does exist and has >= gas price -> return error + return ErrUnderpriced + } + a.enqueued.lock(true) + defer a.enqueued.unlock() + a.promoted.lock(true) - a.nonceToTx.lock() + defer a.promoted.unlock() - defer func() { - a.nonceToTx.unlock() - a.promoted.unlock() - a.enqueued.unlock() - }() + replaceInQueue := func(txs []*types.Transaction) bool { + for i, x := range txs { + if x.Nonce == tx.Nonce { + txs[i] = tx + + return true + } + } + + return false + } + + // add to nonce map + a.nonceToTx.set(tx) + // first -> try to replace in enqueued + if !replaceInQueue(a.enqueued.queue) { + replaceInQueue(a.promoted.queue) // .. then try to replace in promoted + } + + return nil +} + +// enqueue attempts tp push the transaction onto the enqueued queue. +func (a *account) enqueue(tx *types.Transaction) error { + a.enqueued.lock(true) + defer a.enqueued.unlock() if a.enqueued.length() == a.maxEnqueued { - return nil, ErrMaxEnqueuedLimitReached + return ErrMaxEnqueuedLimitReached } - // the only case when the tx should be replaced is when the nonces are the same and the new one is pricier - if oldTx := a.nonceToTx.get(tx.Nonce); oldTx != nil { - if oldTx.GasPrice.Cmp(tx.GasPrice) < 0 { - return a.nonceToTx.replace(tx), nil - } else { - return nil, ErrUnderpriced - } - } else if tx.Nonce < a.getNonce() { - return nil, ErrNonceTooLow + // reject low nonce tx + if tx.Nonce < a.getNonce() { + return ErrNonceTooLow } // enqueue tx a.enqueued.push(tx) - a.nonceToTx.set(tx) - return nil, nil + return nil } // Promote moves eligible transactions from enqueued to promoted. diff --git a/txpool/txpool.go b/txpool/txpool.go index 53d9a22c56..135c091d58 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -108,8 +108,7 @@ through their designated channels. */ // This request is created for (new) transactions // that passed validation in addTx. type enqueueRequest struct { - tx *types.Transaction - errchan chan<- error + tx *types.Transaction } // A promoteRequest is created each time some account @@ -770,20 +769,34 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error { return ErrAlreadyKnown } - // initialize account for this address once - p.createAccountOnce(tx.From) + if account := p.accounts.get(tx.From); account != nil { + // reject low nonce tx + if tx.Nonce < account.getNonce() { + return ErrNonceTooLow + } - // send request [BLOCKING] - errchan := make(chan error, 1) - defer close(errchan) + // check if nonce for tx.From account already exists. do tx replacement if needed + if err := account.addTxWithNonce(tx); err != nil { + return err + } - p.enqueueReqCh <- enqueueRequest{tx: tx, errchan: errchan} + // check if max items in enqueue reached + account.enqueued.lock(true) + maxItemsReached := account.enqueued.length() == account.maxEnqueued + account.enqueued.unlock() - if err := <-errchan; err != nil { - return err + if maxItemsReached { + return ErrMaxEnqueuedLimitReached + } } + // initialize account for this address once + p.createAccountOnce(tx.From) + + // send request [BLOCKING] + p.enqueueReqCh <- enqueueRequest{tx: tx} p.eventManager.signalEvent(proto.EventType_ADDED, tx.Hash) + metrics.SetGauge([]string{txPoolMetrics, "added_tx"}, 1) return nil @@ -801,23 +814,16 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) { account := p.accounts.get(addr) // enqueue tx - replacedTx, err := account.enqueue(tx) - if err != nil { + if err := account.enqueue(tx); err != nil { p.logger.Error("enqueue request", "err", err) - p.index.remove(tx) - req.errchan <- err // notify observer about enqueue error - return - } + account.nonceToTx.lock() + account.nonceToTx.remove(tx) // remove nonce from map + account.nonceToTx.unlock() - req.errchan <- nil // notify observer that there is no enqueue error + p.index.remove(tx) - if replacedTx != nil { - // the old transaction should be evicted from the lookup - // but because we are copying contents of the new tx into pointer of the old tx - // we need to decrease slots by the size of the old tx - p.index.remove(replacedTx) - p.gauge.decrease(slotsRequired(replacedTx)) + return } if p.logger.IsDebug() { @@ -829,7 +835,8 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) { p.eventManager.signalEvent(proto.EventType_ENQUEUED, tx.Hash) if tx.Nonce > account.getNonce() { - // don't signal promotion for higher nonce txs + // don't signal promotion for + // higher nonce txs return } diff --git a/types/transaction.go b/types/transaction.go index 4c2c8fd8da..cfa3266286 100644 --- a/types/transaction.go +++ b/types/transaction.go @@ -89,16 +89,8 @@ func (t *Transaction) ComputeHash() *Transaction { func (t *Transaction) Copy() *Transaction { tt := new(Transaction) - t.CopyTo(tt) - - return tt -} - -func (t *Transaction) CopyTo(tt *Transaction) { *tt = *t - tt.size = atomic.Pointer[uint64]{} // clear size pointer - tt.GasPrice = new(big.Int) if t.GasPrice != nil { tt.GasPrice.Set(t.GasPrice) @@ -133,6 +125,8 @@ func (t *Transaction) CopyTo(tt *Transaction) { tt.Input = make([]byte, len(t.Input)) copy(tt.Input[:], t.Input[:]) + + return tt } // Cost returns gas * gasPrice + value