Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for txpool loosing txs #210

Merged
merged 8 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ func (d *Dev) run() {
// There are new transactions in the pool, try to seal them
header := d.blockchain.Header()
if err := d.writeNewBlock(header); err != nil {
d.txpool.ReinsertProposed()
d.logger.Error("failed to mine block", "err", err)
}

d.txpool.ClearProposed()
}
}

Expand Down
8 changes: 8 additions & 0 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type txPoolInterface interface {
Demote(*types.Transaction)
SetSealing(bool)
ResetWithBlock(*types.Block)
ReinsertProposed()
ClearProposed()
}

// epochMetadata is the static info for epoch currently being processed
Expand Down Expand Up @@ -1007,6 +1009,12 @@ func (c *consensusRuntime) BuildCommitMessage(proposalHash []byte, view *proto.V

// StartRound starts a new round with the specified view
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved
func (c *consensusRuntime) StartRound(view *proto.View) error {
if view.Round > 0 {
c.config.txPool.ReinsertProposed()
} else {
c.config.txPool.ClearProposed()
}
Stefan-Ethernal marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand Down
35 changes: 35 additions & 0 deletions consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,41 @@ func TestConsensusRuntime_BuildPrepareMessage(t *testing.T) {
assert.Equal(t, signedMsg, runtime.BuildPrepareMessage(proposalHash, view))
}

func TestConsensusRuntime_StartRound(t *testing.T) {
cases := []struct {
name string
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved
round uint64
}{
{
name: "ClearProposed",
round: 0,
},
{
name: "ReinsertProposed",
round: 1,
},
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
txPool := new(txPoolMock)
txPool.On(c.name).Once()

runtime := &consensusRuntime{
config: &runtimeConfig{
txPool: txPool,
},
logger: hclog.NewNullLogger(),
}

view := &proto.View{Round: c.round}
require.NoError(t, runtime.StartRound(view))
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved
txPool.AssertExpectations(t)
})
}
}

func createTestBlocks(t *testing.T, numberOfBlocks, defaultEpochSize uint64,
validatorSet validator.AccountSet) (*types.Header, *testHeadersMap) {
t.Helper()
Expand Down
8 changes: 8 additions & 0 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ func (tp *txPoolMock) ResetWithBlock(fullBlock *types.Block) {
tp.Called(fullBlock)
}

func (tp *txPoolMock) ReinsertProposed() {
tp.Called()
}

func (tp *txPoolMock) ClearProposed() {
tp.Called()
}

var _ syncer.Syncer = (*syncerMock)(nil)

type syncerMock struct {
Expand Down
98 changes: 91 additions & 7 deletions txpool/account.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txpool

import (
"maps"
"sync"
"sync/atomic"

Expand All @@ -19,11 +20,13 @@ type accountsMap struct {
// Initializes an account for the given address.
func (m *accountsMap) initOnce(addr types.Address, nonce uint64) *account {
a, loaded := m.LoadOrStore(addr, &account{
enqueued: newAccountQueue(),
promoted: newAccountQueue(),
nonceToTx: newNonceToTxLookup(),
maxEnqueued: m.maxEnqueuedLimit,
nextNonce: nonce,
enqueued: newAccountQueue(),
promoted: newAccountQueue(),
proposed: newAccountQueue(),
nonceToTx: newNonceToTxLookup(),
nonceProposed: newNonceToTxLookup(),
maxEnqueued: m.maxEnqueuedLimit,
nextNonce: nonce,
})
newAccount := a.(*account) //nolint:forcetypeassert

Expand Down Expand Up @@ -129,6 +132,79 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (
return
}

func (m *accountsMap) reinsertProposed() uint64 {
var count uint64

m.Range(func(key, value interface{}) bool {
accountKey, ok := key.(types.Address)
if !ok {
return false
}

account := m.get(accountKey)

account.promoted.lock(true)
account.proposed.lock(true)
account.nonceToTx.lock()
account.nonceProposed.lock()

defer func() {
account.nonceProposed.unlock()
account.nonceToTx.unlock()
account.proposed.unlock()
account.promoted.unlock()
}()

if account.proposed.length() > 0 {
for {
tx := account.proposed.peek()
if tx == nil {
break
}

account.promoted.push(tx)
account.proposed.pop()

count++
}
account.proposed.clear()
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved

maps.Copy(account.nonceToTx.mapping, account.nonceProposed.mapping)
account.nonceProposed.reset()
}

return true
})

return count
}

func (m *accountsMap) clearProposed() {
m.Range(func(key, value interface{}) bool {
accountKey, ok := key.(types.Address)
if !ok {
return false
}

account := m.get(accountKey)

account.proposed.lock(true)
account.nonceProposed.lock()

defer func() {
account.nonceProposed.unlock()
account.proposed.unlock()
}()

if account.proposed.length() > 0 {
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
account.proposed.clear()
account.nonceProposed.reset()
}

return true
})
}

type nonceToTxLookup struct {
mapping map[uint64]*types.Transaction
mutex sync.Mutex
Expand Down Expand Up @@ -179,8 +255,8 @@ func (m *nonceToTxLookup) remove(txs ...*types.Transaction) {
// are ready to be moved to the promoted queue.
// lock order is important! promoted.lock(true), enqueued.lock(true), nonceToTx.lock()
type account struct {
enqueued, promoted *accountQueue
nonceToTx *nonceToTxLookup
enqueued, promoted, proposed *accountQueue
nonceToTx, nonceProposed *nonceToTxLookup

nextNonce uint64
demotions uint64
Expand Down Expand Up @@ -226,14 +302,22 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
) {
a.promoted.lock(true)
a.enqueued.lock(true)
a.proposed.lock(true)
a.nonceToTx.lock()
a.nonceProposed.lock()

defer func() {
a.nonceProposed.unlock()
a.nonceToTx.unlock()
a.proposed.unlock()
a.enqueued.unlock()
a.promoted.unlock()
}()

// prune the proposed txs
prunedProposed := a.proposed.prune(nonce)
a.nonceProposed.remove(prunedProposed...)

// prune the promoted txs
prunedPromoted = a.promoted.prune(nonce)
a.nonceToTx.remove(prunedPromoted...)
Expand Down
33 changes: 32 additions & 1 deletion txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,29 @@ func (p *TxPool) Pop(tx *types.Transaction) {
account := p.accounts.get(tx.From())

account.promoted.lock(true)
account.proposed.lock(true)
account.nonceToTx.lock()
account.nonceProposed.lock()

defer func() {
account.nonceProposed.unlock()
account.nonceToTx.unlock()
account.proposed.unlock()
account.promoted.unlock()
}()

// pop the top most promoted tx
account.promoted.pop()

// keep it till round completion
account.proposed.push(tx)

// update the account nonce -> *tx map
account.nonceToTx.remove(tx)

// keep proposed tx nonce
account.nonceProposed.set(tx)

// successfully popping an account resets its demotions count to 0
account.resetDemotions()

Expand Down Expand Up @@ -398,10 +408,14 @@ func (p *TxPool) Drop(tx *types.Transaction) {
func (p *TxPool) dropAccount(account *account, nextNonce uint64, tx *types.Transaction) {
account.promoted.lock(true)
account.enqueued.lock(true)
account.proposed.lock(true)
account.nonceToTx.lock()
account.nonceProposed.lock()

defer func() {
account.nonceProposed.unlock()
account.nonceToTx.unlock()
account.proposed.unlock()
account.enqueued.unlock()
account.promoted.unlock()
}()
Expand All @@ -424,8 +438,15 @@ func (p *TxPool) dropAccount(account *account, nextNonce uint64, tx *types.Trans
// reset accounts nonce map
account.nonceToTx.reset()

// reset proposed nonce map
account.nonceProposed.reset()

// drop proposed
dropped := account.proposed.clear()
p.index.remove(dropped...)

// drop promoted
dropped := account.promoted.clear()
dropped = account.promoted.clear()
clearAccountQueue(dropped)

// update metrics
Expand Down Expand Up @@ -523,6 +544,16 @@ func (p *TxPool) ResetWithBlock(block *types.Block) {
}
}

func (p *TxPool) ReinsertProposed() {
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved
count := p.accounts.reinsertProposed()
p.gauge.increase(count)
p.Prepare()
}

func (p *TxPool) ClearProposed() {
oliverbundalo marked this conversation as resolved.
Show resolved Hide resolved
p.accounts.clearProposed()
}

// validateTx ensures the transaction conforms to specific
// constraints before entering the pool.
func (p *TxPool) validateTx(tx *types.Transaction) error {
Expand Down
Loading
Loading