Skip to content

Commit

Permalink
another approach
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jun 26, 2023
1 parent e5a3060 commit e0c3d16
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 66 deletions.
81 changes: 47 additions & 34 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,6 @@ func (m *NonceToTxLookup) reset() {
m.mapping = make(map[uint64]*types.Transaction)
}

func (m *NonceToTxLookup) replace(tx *types.Transaction) *types.Transaction {
if oldTx, ok := m.mapping[tx.Nonce]; ok {
toReturn := new(types.Transaction)

oldTx.CopyTo(toReturn)
tx.CopyTo(oldTx)

return toReturn
}

return nil
}

func (m *NonceToTxLookup) remove(txs ...*types.Transaction) {
for _, tx := range txs {
delete(m.mapping, tx.Nonce)
Expand Down Expand Up @@ -281,39 +268,65 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
return
}

// enqueue attempts tp push the transaction onto the enqueued queue.
// first returning value is transaction that is being replaced
func (a *account) enqueue(tx *types.Transaction) (*types.Transaction, error) {
func (a *account) addTxWithNonce(tx *types.Transaction) error {
a.nonceToTx.lock()
defer a.nonceToTx.lock()

oldTx := a.nonceToTx.get(tx.Nonce)
if oldTx == nil {
// if transaction with same nonce does not exist -> return immediately
return nil
} else if oldTx.GasPrice.Cmp(tx.GasPrice) >= 0 {
// if transaction with same nonce does exist and has >= gas price -> return error
return ErrUnderpriced
}

a.enqueued.lock(true)
defer a.enqueued.unlock()

a.promoted.lock(true)
a.nonceToTx.lock()
defer a.promoted.unlock()

defer func() {
a.nonceToTx.unlock()
a.promoted.unlock()
a.enqueued.unlock()
}()
replaceInQueue := func(txs []*types.Transaction) bool {
for i, x := range txs {
if x.Nonce == tx.Nonce {
txs[i] = tx

return true
}
}

return false
}

// add to nonce map
a.nonceToTx.set(tx)
// first -> try to replace in enqueued
if !replaceInQueue(a.enqueued.queue) {
replaceInQueue(a.promoted.queue) // .. then try to replace in promoted
}

return nil
}

// enqueue attempts tp push the transaction onto the enqueued queue.
func (a *account) enqueue(tx *types.Transaction) error {
a.enqueued.lock(true)
defer a.enqueued.unlock()

if a.enqueued.length() == a.maxEnqueued {
return nil, ErrMaxEnqueuedLimitReached
return ErrMaxEnqueuedLimitReached
}

// the only case when the tx should be replaced is when the nonces are the same and the new one is pricier
if oldTx := a.nonceToTx.get(tx.Nonce); oldTx != nil {
if oldTx.GasPrice.Cmp(tx.GasPrice) < 0 {
return a.nonceToTx.replace(tx), nil
} else {
return nil, ErrUnderpriced
}
} else if tx.Nonce < a.getNonce() {
return nil, ErrNonceTooLow
// reject low nonce tx
if tx.Nonce < a.getNonce() {
return ErrNonceTooLow
}

// enqueue tx
a.enqueued.push(tx)
a.nonceToTx.set(tx)

return nil, nil
return nil
}

// Promote moves eligible transactions from enqueued to promoted.
Expand Down
55 changes: 31 additions & 24 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ through their designated channels. */
// This request is created for (new) transactions
// that passed validation in addTx.
type enqueueRequest struct {
tx *types.Transaction
errchan chan<- error
tx *types.Transaction
}

// A promoteRequest is created each time some account
Expand Down Expand Up @@ -770,20 +769,34 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error {
return ErrAlreadyKnown
}

// initialize account for this address once
p.createAccountOnce(tx.From)
if account := p.accounts.get(tx.From); account != nil {
// reject low nonce tx
if tx.Nonce < account.getNonce() {
return ErrNonceTooLow
}

// send request [BLOCKING]
errchan := make(chan error, 1)
defer close(errchan)
// check if nonce for tx.From account already exists. do tx replacement if needed
if err := account.addTxWithNonce(tx); err != nil {
return err
}

p.enqueueReqCh <- enqueueRequest{tx: tx, errchan: errchan}
// check if max items in enqueue reached
account.enqueued.lock(true)
maxItemsReached := account.enqueued.length() == account.maxEnqueued
account.enqueued.unlock()

if err := <-errchan; err != nil {
return err
if maxItemsReached {
return ErrMaxEnqueuedLimitReached
}
}

// initialize account for this address once
p.createAccountOnce(tx.From)

// send request [BLOCKING]
p.enqueueReqCh <- enqueueRequest{tx: tx}
p.eventManager.signalEvent(proto.EventType_ADDED, tx.Hash)

metrics.SetGauge([]string{txPoolMetrics, "added_tx"}, 1)

return nil
Expand All @@ -801,23 +814,16 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) {
account := p.accounts.get(addr)

// enqueue tx
replacedTx, err := account.enqueue(tx)
if err != nil {
if err := account.enqueue(tx); err != nil {
p.logger.Error("enqueue request", "err", err)
p.index.remove(tx)
req.errchan <- err // notify observer about enqueue error

return
}
account.nonceToTx.lock()
account.nonceToTx.remove(tx) // remove nonce from map
account.nonceToTx.unlock()

req.errchan <- nil // notify observer that there is no enqueue error
p.index.remove(tx)

if replacedTx != nil {
// the old transaction should be evicted from the lookup
// but because we are copying contents of the new tx into pointer of the old tx
// we need to decrease slots by the size of the old tx
p.index.remove(replacedTx)
p.gauge.decrease(slotsRequired(replacedTx))
return
}

if p.logger.IsDebug() {
Expand All @@ -829,7 +835,8 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) {
p.eventManager.signalEvent(proto.EventType_ENQUEUED, tx.Hash)

if tx.Nonce > account.getNonce() {
// don't signal promotion for higher nonce txs
// don't signal promotion for
// higher nonce txs
return
}

Expand Down
10 changes: 2 additions & 8 deletions types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,8 @@ func (t *Transaction) ComputeHash() *Transaction {

func (t *Transaction) Copy() *Transaction {
tt := new(Transaction)
t.CopyTo(tt)

return tt
}

func (t *Transaction) CopyTo(tt *Transaction) {
*tt = *t

tt.size = atomic.Pointer[uint64]{} // clear size pointer

tt.GasPrice = new(big.Int)
if t.GasPrice != nil {
tt.GasPrice.Set(t.GasPrice)
Expand Down Expand Up @@ -133,6 +125,8 @@ func (t *Transaction) CopyTo(tt *Transaction) {

tt.Input = make([]byte, len(t.Input))
copy(tt.Input[:], t.Input[:])

return tt
}

// Cost returns gas * gasPrice + value
Expand Down

0 comments on commit e0c3d16

Please sign in to comment.