Skip to content

Commit

Permalink
[TxPool] Fix duplicate transactions in account (0xPolygon#575)
Browse files Browse the repository at this point in the history
* fixes gossip transaction ending up in enqueued next to the original 

* fixes "failed to apply tx: incorrect nonce" logs
  • Loading branch information
dbrajovic authored Jun 10, 2022
1 parent 6bb316b commit 5249831
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 30 deletions.
13 changes: 9 additions & 4 deletions txpool/lookup_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ type lookupMap struct {
all map[types.Hash]*types.Transaction
}

// add inserts the given transaction into the map. [thread-safe]
func (m *lookupMap) add(txs ...*types.Transaction) {
// add inserts the given transaction into the map. Returns false
// if it already exists. [thread-safe]
func (m *lookupMap) add(tx *types.Transaction) bool {
m.Lock()
defer m.Unlock()

for _, tx := range txs {
m.all[tx.Hash] = tx
if _, exists := m.all[tx.Hash]; exists {
return false
}

m.all[tx.Hash] = tx

return true
}

// remove removes the given transactions from the map. [thread-safe]
Expand Down
31 changes: 13 additions & 18 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,20 +589,9 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error {

tx.ComputeHash()

// check if already known
if _, ok := p.index.get(tx.Hash); ok {
if origin == gossip {
// silently drop known tx
// that is gossiped back
p.logger.Debug(
"dropping known gossiped transaction",
"hash", tx.Hash.String(),
)

return nil
} else {
return ErrAlreadyKnown
}
// add to index
if ok := p.index.add(tx); !ok {
return ErrAlreadyKnown
}

// initialize account for this address once
Expand Down Expand Up @@ -632,13 +621,13 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) {
if err := account.enqueue(tx); err != nil {
p.logger.Error("enqueue request", "err", err)

p.index.remove(tx)

return
}

p.logger.Debug("enqueue request", "hash", tx.Hash.String())

// update state
p.index.add(tx)
p.gauge.increase(slotsRequired(tx))

p.eventManager.signalEvent(proto.EventType_ENQUEUED, tx.Hash)
Expand Down Expand Up @@ -693,14 +682,20 @@ func (p *TxPool) addGossipTx(obj interface{}) {

// decode tx
if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil {
p.logger.Error("failed to decode broadcasted tx", "err", err)
p.logger.Error("failed to decode broadcast tx", "err", err)

return
}

// add tx
if err := p.addTx(gossip, tx); err != nil {
p.logger.Error("failed to add broadcasted txn", "err", err)
if errors.Is(err, ErrAlreadyKnown) {
p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash.String())

return
}

p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash.String())
}
}

Expand Down
18 changes: 10 additions & 8 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func TestAddGossipTx(t *testing.T) {
}

func TestDropKnownGossipTx(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})
Expand All @@ -397,18 +399,18 @@ func TestDropKnownGossipTx(t *testing.T) {

// send tx as local
go func() {
err := pool.addTx(local, tx)
assert.NoError(t, err)
assert.NoError(t, pool.addTx(local, tx))
}()
pool.handleEnqueueRequest(<-pool.enqueueReqCh)
<-pool.enqueueReqCh

assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())
_, exists := pool.index.get(tx.Hash)
assert.True(t, exists)

// send tx as gossip (will be discarded)
err = pool.addTx(gossip, tx)
assert.Nil(t, err)

assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())
assert.ErrorIs(t,
pool.addTx(gossip, tx),
ErrAlreadyKnown,
)
}

func TestAddHandler(t *testing.T) {
Expand Down

0 comments on commit 5249831

Please sign in to comment.