diff --git a/consensus/dev/dev.go b/consensus/dev/dev.go index 985fe5c490..c24df66a00 100644 --- a/consensus/dev/dev.go +++ b/consensus/dev/dev.go @@ -101,8 +101,11 @@ func (d *Dev) run() { // There are new transactions in the pool, try to seal them header := d.blockchain.Header() if err := d.writeNewBlock(header); err != nil { + d.txpool.ReinsertProposed() d.logger.Error("failed to mine block", "err", err) } + + d.txpool.ClearProposed() } } diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go index 805970ec0f..93bcbcbb30 100644 --- a/consensus/polybft/consensus_runtime.go +++ b/consensus/polybft/consensus_runtime.go @@ -48,6 +48,8 @@ type txPoolInterface interface { Demote(*types.Transaction) SetSealing(bool) ResetWithBlock(*types.Block) + ReinsertProposed() + ClearProposed() } // epochMetadata is the static info for epoch currently being processed @@ -1007,8 +1009,14 @@ func (c *consensusRuntime) BuildCommitMessage(proposalHash []byte, view *proto.V return message } -// StartRound starts a new round with the specified view +// StartRound represents round start callback func (c *consensusRuntime) StartRound(view *proto.View) error { + if view.Round > 0 { + c.config.txPool.ReinsertProposed() + } else { + c.config.txPool.ClearProposed() + } + return nil } diff --git a/consensus/polybft/consensus_runtime_test.go b/consensus/polybft/consensus_runtime_test.go index fc74f6a8c8..d58616eaca 100644 --- a/consensus/polybft/consensus_runtime_test.go +++ b/consensus/polybft/consensus_runtime_test.go @@ -1002,6 +1002,41 @@ func TestConsensusRuntime_BuildPrepareMessage(t *testing.T) { assert.Equal(t, signedMsg, runtime.BuildPrepareMessage(proposalHash, view)) } +func TestConsensusRuntime_StartRound(t *testing.T) { + cases := []struct { + funcName string + round uint64 + }{ + { + funcName: "ClearProposed", + round: 0, + }, + { + funcName: "ReinsertProposed", + round: 1, + }, + } + + for _, c := range cases { + c := c + t.Run(c.funcName, func(t *testing.T) { + txPool := new(txPoolMock) + txPool.On(c.funcName).Once() + + runtime := &consensusRuntime{ + config: &runtimeConfig{ + txPool: txPool, + }, + logger: hclog.NewNullLogger(), + } + + view := &proto.View{Round: c.round} + require.NoError(t, runtime.StartRound(view)) + txPool.AssertExpectations(t) + }) + } +} + func createTestBlocks(t *testing.T, numberOfBlocks, defaultEpochSize uint64, validatorSet validator.AccountSet) (*types.Header, *testHeadersMap) { t.Helper() diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index cb53583c2b..1a0f6297ca 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -364,6 +364,14 @@ func (tp *txPoolMock) ResetWithBlock(fullBlock *types.Block) { tp.Called(fullBlock) } +func (tp *txPoolMock) ReinsertProposed() { + tp.Called() +} + +func (tp *txPoolMock) ClearProposed() { + tp.Called() +} + var _ syncer.Syncer = (*syncerMock)(nil) type syncerMock struct { diff --git a/txpool/account.go b/txpool/account.go index 408ea62928..735cf2a8ab 100644 --- a/txpool/account.go +++ b/txpool/account.go @@ -1,6 +1,7 @@ package txpool import ( + "maps" "sync" "sync/atomic" @@ -19,11 +20,13 @@ type accountsMap struct { // Initializes an account for the given address. func (m *accountsMap) initOnce(addr types.Address, nonce uint64) *account { a, loaded := m.LoadOrStore(addr, &account{ - enqueued: newAccountQueue(), - promoted: newAccountQueue(), - nonceToTx: newNonceToTxLookup(), - maxEnqueued: m.maxEnqueuedLimit, - nextNonce: nonce, + enqueued: newAccountQueue(), + promoted: newAccountQueue(), + proposed: newAccountQueue(), + nonceToTx: newNonceToTxLookup(), + nonceProposed: newNonceToTxLookup(), + maxEnqueued: m.maxEnqueuedLimit, + nextNonce: nonce, }) newAccount := a.(*account) //nolint:forcetypeassert @@ -129,6 +132,78 @@ func (m *accountsMap) allTxs(includeEnqueued bool) ( return } +func (m *accountsMap) reinsertProposed() uint64 { + var count uint64 + + m.Range(func(key, value interface{}) bool { + accountKey, ok := key.(types.Address) + if !ok { + return false + } + + account := m.get(accountKey) + + account.promoted.lock(true) + account.proposed.lock(true) + account.nonceToTx.lock() + account.nonceProposed.lock() + + defer func() { + account.nonceProposed.unlock() + account.nonceToTx.unlock() + account.proposed.unlock() + account.promoted.unlock() + }() + + if account.proposed.length() > 0 { + for { + tx := account.proposed.peek() + if tx == nil { + break + } + + account.promoted.push(tx) + account.proposed.pop() + + count++ + } + + maps.Copy(account.nonceToTx.mapping, account.nonceProposed.mapping) + account.nonceProposed.reset() + } + + return true + }) + + return count +} + +func (m *accountsMap) clearProposed() { + m.Range(func(key, value interface{}) bool { + accountKey, ok := key.(types.Address) + if !ok { + return false + } + + account := m.get(accountKey) + + account.proposed.lock(true) + account.nonceProposed.lock() + + defer func() { + account.nonceProposed.unlock() + account.proposed.unlock() + }() + + if account.proposed.length() > 0 { + account.proposed.clear() + account.nonceProposed.reset() + } + + return true + }) +} + type nonceToTxLookup struct { mapping map[uint64]*types.Transaction mutex sync.Mutex @@ -179,8 +254,8 @@ func (m *nonceToTxLookup) remove(txs ...*types.Transaction) { // are ready to be moved to the promoted queue. // lock order is important! promoted.lock(true), enqueued.lock(true), nonceToTx.lock() type account struct { - enqueued, promoted *accountQueue - nonceToTx *nonceToTxLookup + enqueued, promoted, proposed *accountQueue + nonceToTx, nonceProposed *nonceToTxLookup nextNonce uint64 demotions uint64 @@ -226,14 +301,22 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) ( ) { a.promoted.lock(true) a.enqueued.lock(true) + a.proposed.lock(true) a.nonceToTx.lock() + a.nonceProposed.lock() defer func() { + a.nonceProposed.unlock() a.nonceToTx.unlock() + a.proposed.unlock() a.enqueued.unlock() a.promoted.unlock() }() + // prune the proposed txs + prunedProposed := a.proposed.prune(nonce) + a.nonceProposed.remove(prunedProposed...) + // prune the promoted txs prunedPromoted = a.promoted.prune(nonce) a.nonceToTx.remove(prunedPromoted...) diff --git a/txpool/txpool.go b/txpool/txpool.go index 98bd43e8f6..90292ee69f 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -357,19 +357,29 @@ func (p *TxPool) Pop(tx *types.Transaction) { account := p.accounts.get(tx.From()) account.promoted.lock(true) + account.proposed.lock(true) account.nonceToTx.lock() + account.nonceProposed.lock() defer func() { + account.nonceProposed.unlock() account.nonceToTx.unlock() + account.proposed.unlock() account.promoted.unlock() }() // pop the top most promoted tx account.promoted.pop() + // keep it till round completion + account.proposed.push(tx) + // update the account nonce -> *tx map account.nonceToTx.remove(tx) + // keep proposed tx nonce + account.nonceProposed.set(tx) + // successfully popping an account resets its demotions count to 0 account.resetDemotions() @@ -398,10 +408,14 @@ func (p *TxPool) Drop(tx *types.Transaction) { func (p *TxPool) dropAccount(account *account, nextNonce uint64, tx *types.Transaction) { account.promoted.lock(true) account.enqueued.lock(true) + account.proposed.lock(true) account.nonceToTx.lock() + account.nonceProposed.lock() defer func() { + account.nonceProposed.unlock() account.nonceToTx.unlock() + account.proposed.unlock() account.enqueued.unlock() account.promoted.unlock() }() @@ -424,8 +438,15 @@ func (p *TxPool) dropAccount(account *account, nextNonce uint64, tx *types.Trans // reset accounts nonce map account.nonceToTx.reset() + // reset proposed nonce map + account.nonceProposed.reset() + + // drop proposed + dropped := account.proposed.clear() + p.index.remove(dropped...) + // drop promoted - dropped := account.promoted.clear() + dropped = account.promoted.clear() clearAccountQueue(dropped) // update metrics @@ -523,6 +544,20 @@ func (p *TxPool) ResetWithBlock(block *types.Block) { } } +// ReinsertProposed returns all txs from the accounts proposed queue to the promoted queue +// it is called from consensus_runtime when new round > 0 starts or when current sequence is cancelled +func (p *TxPool) ReinsertProposed() { + count := p.accounts.reinsertProposed() + p.gauge.increase(count) + p.Prepare() +} + +// ClearProposed clears accounts proposed queue when round 0 starts +// it is called from consensus_runtime +func (p *TxPool) ClearProposed() { + p.accounts.clearProposed() +} + // validateTx ensures the transaction conforms to specific // constraints before entering the pool. func (p *TxPool) validateTx(tx *types.Transaction) error { diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index dc4376cc4a..be6d9f3a2d 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -109,6 +109,7 @@ func newTestPoolWithSlots(maxSlots uint64, mockStore ...store) (*TxPool, error) type accountState struct { enqueued, promoted, + proposed, nextNonce uint64 } @@ -3188,6 +3189,213 @@ func TestRecovery(t *testing.T) { } } +func TestProposed(t *testing.T) { + commonAssert := func(accounts map[types.Address]accountState, pool *TxPool) { + for addr := range accounts { + assert.Equal(t, // proposed + accounts[addr].proposed, + pool.accounts.get(addr).proposed.length()) + + assert.Equal(t, // promoted + accounts[addr].promoted, + pool.accounts.get(addr).promoted.length()) + } + } + + const ( + REINSERT = 1 + CLEAN = 2 + ) + + testCases := []struct { + name string + method int + doPop bool + allTxs map[types.Address][]*types.Transaction + beforeCall result + afterCall result + }{ + { + name: "reinsert with pop", + method: REINSERT, + doPop: true, + allTxs: map[types.Address][]*types.Transaction{ + addr1: { + newTx(addr1, 0, 1, types.LegacyTxType), + newTx(addr1, 1, 1, types.LegacyTxType), + }, + }, + beforeCall: result{ + slots: 1, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 1, + promoted: 1, + }, + }, + }, + afterCall: result{ + slots: 2, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 2, + }, + }, + }, + }, + { + name: "reinsert without pop", + method: REINSERT, + doPop: false, + allTxs: map[types.Address][]*types.Transaction{ + addr1: { + newTx(addr1, 0, 1, types.LegacyTxType), + newTx(addr1, 1, 1, types.LegacyTxType), + }, + }, + beforeCall: result{ + slots: 2, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 2, + }, + }, + }, + afterCall: result{ + slots: 2, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 2, + }, + }, + }, + }, + { + name: "clean with pop", + method: CLEAN, + doPop: true, + allTxs: map[types.Address][]*types.Transaction{ + addr1: { + newTx(addr1, 0, 1, types.LegacyTxType), + newTx(addr1, 1, 1, types.LegacyTxType), + }, + }, + beforeCall: result{ + slots: 1, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 1, + promoted: 1, + }, + }, + }, + afterCall: result{ + slots: 1, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 1, + }, + }, + }, + }, + { + name: "clean without pop", + method: CLEAN, + doPop: false, + allTxs: map[types.Address][]*types.Transaction{ + addr1: { + newTx(addr1, 0, 1, types.LegacyTxType), + newTx(addr1, 1, 1, types.LegacyTxType), + }, + }, + beforeCall: result{ + slots: 2, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 2, + }, + }, + }, + afterCall: result{ + slots: 2, + accounts: map[types.Address]accountState{ + addr1: { + proposed: 0, + promoted: 2, + }, + }, + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.name, func(t *testing.T) { + // create pool + pool, err := newTestPool() + assert.NoError(t, err) + pool.SetSigner(&mockSigner{}) + + pool.Start() + defer pool.Close() + + promoteSubscription := pool.eventManager.subscribe( + []proto.EventType{proto.EventType_PROMOTED}, + ) + + // setup prestate + totalTx := 0 + expectedEnqueued := uint64(0) + + for addr, txs := range test.allTxs { + // preset nonce so promotions can happen + acc := pool.getOrCreateAccount(addr) + acc.setNonce(txs[0].Nonce()) + + expectedEnqueued += test.afterCall.accounts[addr].enqueued + + // send txs + for _, tx := range txs { + totalTx++ + + assert.NoError(t, pool.addTx(local, tx)) + } + } + + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second*10) + defer cancelFn() + + // All txns should get added + assert.Len(t, waitForEvents(ctx, promoteSubscription, totalTx), totalTx) + + pool.Prepare() + tx := pool.Peek() + assert.NotNil(t, tx) + + if test.doPop { + pool.Pop(tx) + } + + assert.Equal(t, test.beforeCall.slots, pool.gauge.read()) + commonAssert(test.beforeCall.accounts, pool) + + if test.method == REINSERT { + pool.ReinsertProposed() + } else { + pool.ClearProposed() + } + + assert.Equal(t, test.afterCall.slots, pool.gauge.read()) + commonAssert(test.afterCall.accounts, pool) + }) + } +} + func TestGetTxs(t *testing.T) { t.Parallel()