diff --git a/txpool/lookup_map.go b/txpool/lookup_map.go index 2a5347fcbc..ebf49fba91 100644 --- a/txpool/lookup_map.go +++ b/txpool/lookup_map.go @@ -12,14 +12,19 @@ type lookupMap struct { all map[types.Hash]*types.Transaction } -// add inserts the given transaction into the map. [thread-safe] -func (m *lookupMap) add(txs ...*types.Transaction) { +// add inserts the given transaction into the map. Returns false +// if it already exists. [thread-safe] +func (m *lookupMap) add(tx *types.Transaction) bool { m.Lock() defer m.Unlock() - for _, tx := range txs { - m.all[tx.Hash] = tx + if _, exists := m.all[tx.Hash]; exists { + return false } + + m.all[tx.Hash] = tx + + return true } // remove removes the given transactions from the map. [thread-safe] diff --git a/txpool/txpool.go b/txpool/txpool.go index 1a89d02725..5c5429bf83 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -589,20 +589,9 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error { tx.ComputeHash() - // check if already known - if _, ok := p.index.get(tx.Hash); ok { - if origin == gossip { - // silently drop known tx - // that is gossiped back - p.logger.Debug( - "dropping known gossiped transaction", - "hash", tx.Hash.String(), - ) - - return nil - } else { - return ErrAlreadyKnown - } + // add to index + if ok := p.index.add(tx); !ok { + return ErrAlreadyKnown } // initialize account for this address once @@ -632,13 +621,13 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) { if err := account.enqueue(tx); err != nil { p.logger.Error("enqueue request", "err", err) + p.index.remove(tx) + return } p.logger.Debug("enqueue request", "hash", tx.Hash.String()) - // update state - p.index.add(tx) p.gauge.increase(slotsRequired(tx)) p.eventManager.signalEvent(proto.EventType_ENQUEUED, tx.Hash) @@ -693,14 +682,20 @@ func (p *TxPool) addGossipTx(obj interface{}) { // decode tx if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil { - p.logger.Error("failed to decode broadcasted tx", "err", err) + p.logger.Error("failed to decode broadcast tx", "err", err) return } // add tx if err := p.addTx(gossip, tx); err != nil { - p.logger.Error("failed to add broadcasted txn", "err", err) + if errors.Is(err, ErrAlreadyKnown) { + p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash.String()) + + return + } + + p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash.String()) } } diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index b698227b57..a287d6bdbc 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -389,6 +389,8 @@ func TestAddGossipTx(t *testing.T) { } func TestDropKnownGossipTx(t *testing.T) { + t.Parallel() + pool, err := newTestPool() assert.NoError(t, err) pool.SetSigner(&mockSigner{}) @@ -397,18 +399,18 @@ func TestDropKnownGossipTx(t *testing.T) { // send tx as local go func() { - err := pool.addTx(local, tx) - assert.NoError(t, err) + assert.NoError(t, pool.addTx(local, tx)) }() - pool.handleEnqueueRequest(<-pool.enqueueReqCh) + <-pool.enqueueReqCh - assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length()) + _, exists := pool.index.get(tx.Hash) + assert.True(t, exists) // send tx as gossip (will be discarded) - err = pool.addTx(gossip, tx) - assert.Nil(t, err) - - assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length()) + assert.ErrorIs(t, + pool.addTx(gossip, tx), + ErrAlreadyKnown, + ) } func TestAddHandler(t *testing.T) {