Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

[TxPool] Fix duplicate transactions in account #575

Merged
merged 7 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions txpool/lookup_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@ type lookupMap struct {
all map[types.Hash]*types.Transaction
}

// add inserts the given transaction into the map. [thread-safe]
func (m *lookupMap) add(txs ...*types.Transaction) {
// add inserts the given transaction into the map. Returns false
// if it already exists. [thread-safe]
func (m *lookupMap) add(tx *types.Transaction) bool {
m.Lock()
defer m.Unlock()

for _, tx := range txs {
m.all[tx.Hash] = tx
if _, exists := m.all[tx.Hash]; exists {
return false
}

m.all[tx.Hash] = tx

return true
}

// remove removes the given transactions from the map. [thread-safe]
Expand Down
31 changes: 13 additions & 18 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,20 +589,9 @@ func (p *TxPool) addTx(origin txOrigin, tx *types.Transaction) error {

tx.ComputeHash()

// check if already known
if _, ok := p.index.get(tx.Hash); ok {
if origin == gossip {
// silently drop known tx
// that is gossiped back
p.logger.Debug(
"dropping known gossiped transaction",
"hash", tx.Hash.String(),
)

return nil
} else {
return ErrAlreadyKnown
}
// add to index
if ok := p.index.add(tx); !ok {
return ErrAlreadyKnown
}
Kourin1996 marked this conversation as resolved.
Show resolved Hide resolved

// initialize account for this address once
Expand Down Expand Up @@ -632,13 +621,13 @@ func (p *TxPool) handleEnqueueRequest(req enqueueRequest) {
if err := account.enqueue(tx); err != nil {
p.logger.Error("enqueue request", "err", err)

p.index.remove(tx)

return
}

p.logger.Debug("enqueue request", "hash", tx.Hash.String())

// update state
p.index.add(tx)
p.gauge.increase(slotsRequired(tx))

p.eventManager.signalEvent(proto.EventType_ENQUEUED, tx.Hash)
Expand Down Expand Up @@ -693,14 +682,20 @@ func (p *TxPool) addGossipTx(obj interface{}) {

// decode tx
if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil {
p.logger.Error("failed to decode broadcasted tx", "err", err)
p.logger.Error("failed to decode broadcast tx", "err", err)
zivkovicmilos marked this conversation as resolved.
Show resolved Hide resolved

return
}

// add tx
if err := p.addTx(gossip, tx); err != nil {
p.logger.Error("failed to add broadcasted txn", "err", err)
if errors.Is(err, ErrAlreadyKnown) {
p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash.String())

return
}

p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash.String())
}
}

Expand Down
18 changes: 10 additions & 8 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func TestAddGossipTx(t *testing.T) {
}

func TestDropKnownGossipTx(t *testing.T) {
t.Parallel()

pool, err := newTestPool()
assert.NoError(t, err)
pool.SetSigner(&mockSigner{})
Expand All @@ -397,18 +399,18 @@ func TestDropKnownGossipTx(t *testing.T) {

// send tx as local
go func() {
err := pool.addTx(local, tx)
assert.NoError(t, err)
assert.NoError(t, pool.addTx(local, tx))
}()
pool.handleEnqueueRequest(<-pool.enqueueReqCh)
<-pool.enqueueReqCh

assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())
_, exists := pool.index.get(tx.Hash)
assert.True(t, exists)

// send tx as gossip (will be discarded)
err = pool.addTx(gossip, tx)
assert.Nil(t, err)

assert.Equal(t, uint64(1), pool.accounts.get(addr1).enqueued.length())
assert.ErrorIs(t,
pool.addTx(gossip, tx),
ErrAlreadyKnown,
)
}

func TestAddHandler(t *testing.T) {
Expand Down