diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs
index 610f3cfded..b808976e80 100644
--- a/base_layer/core/src/chain_storage/async_db.rs
+++ b/base_layer/core/src/chain_storage/async_db.rs
@@ -150,9 +150,9 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
     make_async_fn!(fetch_kernels_by_mmr_position(start: u64, end: u64) -> Vec<TransactionKernel>, "fetch_kernels_by_mmr_position");
 
     //---------------------------------- MMR --------------------------------------------//
-    make_async_fn!(prepare_block_merkle_roots(template: NewBlockTemplate) -> Block, "create_block");
+    make_async_fn!(prepare_block_merkle_roots(template: NewBlockTemplate) -> Block, "prepare_block_merkle_roots");
 
-    make_async_fn!(fetch_mmr_size(tree: MmrTree) -> u64, "fetch_mmr_node_count");
+    make_async_fn!(fetch_mmr_size(tree: MmrTree) -> u64, "fetch_mmr_size");
 
     make_async_fn!(rewind_to_height(height: u64) -> Vec<Arc<ChainBlock>>, "rewind_to_height");
diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs
index 518cac1ae7..463d069455 100644
--- a/base_layer/core/src/mempool/mempool_storage.rs
+++ b/base_layer/core/src/mempool/mempool_storage.rs
@@ -38,9 +38,9 @@ use log::*;
 use std::sync::Arc;
 use tari_crypto::tari_utilities::{hex::Hex, Hashable};
 
-pub const LOG_TARGET: &str = "c::mp::mempool";
+pub const LOG_TARGET: &str = "c::mp::mempool_storage";
 
-/// The Mempool consists of an Unconfirmed Transaction Pool, Pending Pool, Orphan Pool and Reorg Pool and is responsible
+/// The Mempool consists of an Unconfirmed Transaction Pool and Reorg Pool and is responsible
 /// for managing and maintaining all unconfirmed transactions have not yet been included in a block, and transactions
 /// that have recently been included in a block.
 pub struct MempoolStorage {
@@ -50,7 +50,7 @@ pub struct MempoolStorage {
 }
 
 impl MempoolStorage {
-    /// Create a new Mempool with an UnconfirmedPool, OrphanPool, PendingPool and ReOrgPool.
+    /// Create a new Mempool with an UnconfirmedPool and ReOrgPool.
     pub fn new(config: MempoolConfig, validators: Arc<Validators>) -> Self {
         Self {
             unconfirmed_pool: UnconfirmedPool::new(config.unconfirmed_pool),
@@ -115,14 +115,6 @@ impl MempoolStorage {
         Ok(())
     }
 
-    // Update the Mempool based on the received set of published blocks.
-    fn process_published_blocks(&mut self, published_blocks: Vec<Arc<Block>>) -> Result<(), MempoolError> {
-        for published_block in published_blocks {
-            self.process_published_block(published_block)?;
-        }
-        Ok(())
-    }
-
     /// In the event of a ReOrg, resubmit all ReOrged transactions into the Mempool and process each newly introduced
     /// block from the latest longest chain.
     pub fn process_reorg(
@@ -151,11 +143,20 @@ impl MempoolStorage {
         let previous_tip = removed_blocks.last().map(|block| block.header.height);
         let new_tip = new_blocks.last().map(|block| block.header.height);
 
+        // Clear out all transactions from the unconfirmed pool and re-submit them to the unconfirmed mempool for
+        // validation. This is important as invalid transactions that have not been mined yet may remain in the mempool
+        // after a reorg.
+        let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions();
+        self.insert_txs(removed_txs)?;
+        // Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool
         self.insert_txs(
             self.reorg_pool
                 .remove_reorged_txs_and_discard_double_spends(removed_blocks, &new_blocks)?,
         )?;
-        self.process_published_blocks(new_blocks)?;
+        // Update the Mempool based on the received set of new blocks.
+        for block in new_blocks {
+            self.process_published_block(block)?;
+        }
 
         if let (Some(previous_tip_height), Some(new_tip_height)) = (previous_tip, new_tip) {
             if new_tip_height < previous_tip_height {
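
// --- Editorial sketch (not part of the patch) -------------------------------
// The hunk above changes reorg handling from "replay only the reorged txs" to
// a full drain-and-revalidate pass over the unconfirmed pool. A minimal,
// self-contained sketch of that pattern follows; `SimpleMempool`, `Tx` and the
// `valid` flag are hypothetical stand-ins for the Tari types, where `valid`
// plays the role of "all inputs are still unspent after the reorg".

use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Tx {
    sig: String,
    valid: bool,
}

struct SimpleMempool {
    txs: HashMap<String, Tx>,
}

impl SimpleMempool {
    // Drain everything, then re-insert only what still passes validation.
    // This mirrors drain_all_mempool_transactions() followed by insert_txs().
    fn revalidate_after_reorg(&mut self) {
        let drained: Vec<Tx> = self.txs.drain().map(|(_, tx)| tx).collect();
        for tx in drained {
            if tx.valid {
                self.txs.insert(tx.sig.clone(), tx);
            }
        }
    }
}

fn main() {
    let mut pool = SimpleMempool { txs: HashMap::new() };
    pool.txs.insert("a".into(), Tx { sig: "a".into(), valid: true });
    pool.txs.insert("b".into(), Tx { sig: "b".into(), valid: false });
    pool.revalidate_after_reorg();
    // The invalidated transaction is gone; the valid one survives the reorg.
    assert!(pool.txs.contains_key("a") && !pool.txs.contains_key("b"));
}
// -----------------------------------------------------------------------------
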
diff --git a/base_layer/core/src/mempool/reorg_pool/reorg_pool_storage.rs b/base_layer/core/src/mempool/reorg_pool/reorg_pool_storage.rs
index e8442f9208..71fbb468ff 100644
--- a/base_layer/core/src/mempool/reorg_pool/reorg_pool_storage.rs
+++ b/base_layer/core/src/mempool/reorg_pool/reorg_pool_storage.rs
@@ -56,13 +56,15 @@ impl ReorgPoolStorage {
     /// the ReorgPoolStorage and will be discarded once the Time-to-live threshold has been reached.
     pub fn insert(&mut self, tx: Arc<Transaction>) {
         let tx_key = tx.body.kernels()[0].excess_sig.clone();
-        trace!(
+        let _ = self
+            .txs_by_signature
+            .insert(tx_key.clone(), tx.clone(), self.config.tx_ttl);
+        debug!(
             target: LOG_TARGET,
-            "Inserting tx into reorg pool: {}",
+            "Inserted transaction with signature {} into reorg pool:",
             tx_key.get_signature().to_hex()
         );
-        trace!(target: LOG_TARGET, "Transaction inserted: {}", tx);
-        let _ = self.txs_by_signature.insert(tx_key, tx, self.config.tx_ttl);
+        trace!(target: LOG_TARGET, "{}", tx);
     }
 
     /// Insert a set of new transactions into the ReorgPoolStorage
@@ -91,8 +93,12 @@ impl ReorgPoolStorage {
         }
 
         for tx_key in &removed_tx_keys {
-            trace!(target: LOG_TARGET, "Removed double spends: {:?}", tx_key);
            self.txs_by_signature.remove(&tx_key);
+            trace!(
+                target: LOG_TARGET,
+                "Removed double spend tx from reorg pool: {}",
+                tx_key.get_signature().to_hex()
+            );
         }
     }
 
@@ -111,7 +117,7 @@ impl ReorgPoolStorage {
         for block in &removed_blocks {
             for kernel in block.body.kernels() {
                 if let Some(removed_tx) = self.txs_by_signature.remove(&kernel.excess_sig) {
-                    trace!(target: LOG_TARGET, "Removing tx from reorg pool: {:?}", removed_tx);
+                    trace!(target: LOG_TARGET, "Removed tx from reorg pool: {:?}", removed_tx);
                     removed_txs.push(removed_tx);
                 }
             }
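
// --- Editorial sketch (not part of the patch) -------------------------------
// The reorg pool above keys transactions by their first kernel's excess
// signature and discards entries once their time-to-live lapses. The sketch
// below shows that insert/expire shape with a hypothetical std-only `TtlMap`;
// it is an illustration of the idea, not the Tari implementation.

use std::collections::HashMap;
use std::time::{Duration, Instant};

struct TtlMap<K, V> {
    // Each value is stored alongside its expiry deadline.
    inner: HashMap<K, (V, Instant)>,
}

impl<K: std::hash::Hash + Eq, V> TtlMap<K, V> {
    fn new() -> Self {
        Self { inner: HashMap::new() }
    }

    // Insert with a TTL; any previous entry for the key is returned,
    // analogous to the `let _ = self.txs_by_signature.insert(...)` above.
    fn insert(&mut self, key: K, value: V, ttl: Duration) -> Option<V> {
        self.inner
            .insert(key, (value, Instant::now() + ttl))
            .map(|(v, _)| v)
    }

    // Drop entries whose deadline has passed, like the pool's TTL sweep.
    fn purge_expired(&mut self) {
        let now = Instant::now();
        self.inner.retain(|_, (_, deadline)| *deadline > now);
    }

    fn remove(&mut self, key: &K) -> Option<V> {
        self.inner.remove(key).map(|(v, _)| v)
    }
}

fn main() {
    let mut pool: TtlMap<&str, &str> = TtlMap::new();
    pool.insert("excess_sig_hex", "tx", Duration::from_secs(300));
    pool.purge_expired();
    // Still within its TTL, so the entry can be removed and recovered.
    assert!(pool.remove(&"excess_sig_hex").is_some());
}
// -----------------------------------------------------------------------------
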
diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
index bbcf4e000c..2ca4756a00 100644
--- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
+++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
@@ -36,7 +36,7 @@ use std::{
     convert::TryFrom,
     sync::Arc,
 };
-use tari_crypto::tari_utilities::hex::Hex;
+use tari_crypto::tari_utilities::{hex::Hex, Hashable};
 
 pub const LOG_TARGET: &str = "c::mp::unconfirmed_pool::unconfirmed_pool_storage";
 
@@ -103,12 +103,6 @@ impl UnconfirmedPool {
             .first_kernel_excess_sig()
             .ok_or(UnconfirmedPoolError::TransactionNoKernels)?;
         if !self.txs_by_signature.contains_key(tx_key) {
-            debug!(
-                target: LOG_TARGET,
-                "Inserting tx into unconfirmed pool: {}",
-                tx_key.get_signature().to_hex()
-            );
-            trace!(target: LOG_TARGET, "Transaction inserted: {}", tx);
             let prioritized_tx = PrioritizedTransaction::try_from((*tx).clone())?;
             if self.txs_by_signature.len() >= self.config.storage_capacity {
                 if prioritized_tx.priority < *self.lowest_priority() {
@@ -119,6 +113,12 @@ impl UnconfirmedPool {
             self.txs_by_priority
                 .insert(prioritized_tx.priority.clone(), tx_key.clone());
             self.txs_by_signature.insert(tx_key.clone(), prioritized_tx);
+            debug!(
+                target: LOG_TARGET,
+                "Inserted transaction with signature {} into unconfirmed pool:",
+                tx_key.get_signature().to_hex()
+            );
+            trace!(target: LOG_TARGET, "{}", tx);
         }
         Ok(())
     }
@@ -176,36 +176,60 @@ impl UnconfirmedPool {
         false
     }
 
+    /// Remove all current mempool transactions from the UnconfirmedPoolStorage, returning those that were removed
+    pub fn drain_all_mempool_transactions(&mut self) -> Vec<Arc<Transaction>> {
+        let mempool_txs: Vec<Arc<Transaction>> = self
+            .txs_by_signature
+            .drain()
+            .map(|(_key, val)| val.transaction)
+            .collect();
+        self.txs_by_priority.clear();
+
+        mempool_txs
+    }
+
     /// Remove all published transactions from the UnconfirmedPool and discard all double spend transactions.
-    /// Returns a list of all transactions that were removed the unconfirmed pool as a result of appearing in the block.
     fn discard_double_spends(&mut self, published_block: &Block) {
         let mut removed_tx_keys = Vec::new();
         for (tx_key, ptx) in self.txs_by_signature.iter() {
             for input in ptx.transaction.body.inputs() {
                 if published_block.body.inputs().contains(input) {
                     self.txs_by_priority.remove(&ptx.priority);
+                    debug!(
+                        target: LOG_TARGET,
+                        "Removed double spend tx with key {} from unconfirmed pool",
+                        tx_key.get_signature().to_hex()
+                    );
+                    trace!(target: LOG_TARGET, "{}", &ptx.transaction);
                     removed_tx_keys.push(tx_key.clone());
                 }
             }
         }
 
         for tx_key in &removed_tx_keys {
-            trace!(
-                target: LOG_TARGET,
-                "Removing double spends from unconfirmed pool: {:?}",
-                tx_key
-            );
             self.txs_by_signature.remove(&tx_key);
         }
     }
 
     /// Remove all published transactions from the UnconfirmedPoolStorage and discard double spends
     pub fn remove_published_and_discard_double_spends(&mut self, published_block: &Block) -> Vec<Arc<Transaction>> {
+        trace!(
+            target: LOG_TARGET,
+            "Searching for txns to remove from unconfirmed pool in block {} ({})",
+            published_block.header.height,
+            published_block.header.hash().to_hex(),
+        );
         let mut removed_txs = Vec::new();
         published_block.body.kernels().iter().for_each(|kernel| {
             if let Some(ptx) = self.txs_by_signature.get(&kernel.excess_sig) {
                 self.txs_by_priority.remove(&ptx.priority);
                 if let Some(ptx) = self.txs_by_signature.remove(&kernel.excess_sig) {
+                    debug!(
+                        target: LOG_TARGET,
+                        "Removed tx with key {} from unconfirmed pool",
+                        kernel.excess_sig.get_signature().to_hex()
+                    );
+                    trace!(target: LOG_TARGET, "{}", &ptx.transaction);
                     removed_txs.push(ptx.transaction);
                 }
             }
diff --git a/base_layer/wallet/tests/support/rpc.rs b/base_layer/wallet/tests/support/rpc.rs
index a51529d8ff..52632f9726 100644
--- a/base_layer/wallet/tests/support/rpc.rs
+++ b/base_layer/wallet/tests/support/rpc.rs
@@ -245,15 +245,20 @@ impl BaseNodeWalletRpcMockState {
         timeout: Duration,
     ) -> Result<Vec<Transaction>, String> {
         let now = Instant::now();
+        let mut count = 0usize;
         while now.elapsed() < timeout {
             let mut lock = acquire_lock!(self.submit_transaction_calls);
+            count = (*lock).len();
             if (*lock).len() >= num_calls {
                 return Ok((*lock).drain(..num_calls).collect());
             }
             drop(lock);
             delay_for(Duration::from_millis(100)).await;
         }
-        Err("Did not receive enough calls within the timeout period".to_string())
+        Err(format!(
+            "Did not receive enough calls within the timeout period, received {}, expected {}.",
+            count, num_calls
+        ))
     }
 
     pub async fn wait_pop_fetch_utxos_calls(
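
// --- Editorial sketch (not part of the patch) -------------------------------
// The rpc.rs change above makes the mock's timeout error report how many calls
// actually arrived versus how many were expected. The std-only sketch below
// reproduces that poll-until-enough pattern; `wait_pop_calls` and the
// Mutex-backed call log are hypothetical stand-ins for the test mock.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

fn wait_pop_calls(
    calls: &Arc<Mutex<Vec<String>>>,
    num_calls: usize,
    timeout: Duration,
) -> Result<Vec<String>, String> {
    let start = Instant::now();
    let mut count = 0usize;
    while start.elapsed() < timeout {
        let mut lock = calls.lock().unwrap();
        count = lock.len();
        if lock.len() >= num_calls {
            // Pop exactly the requested number of recorded calls.
            return Ok(lock.drain(..num_calls).collect());
        }
        drop(lock);
        thread::sleep(Duration::from_millis(100));
    }
    // Include observed vs expected counts, as the patched error message does.
    Err(format!(
        "Did not receive enough calls within the timeout period, received {}, expected {}.",
        count, num_calls
    ))
}

fn main() {
    let calls = Arc::new(Mutex::new(vec!["tx1".to_string(), "tx2".to_string()]));
    let popped = wait_pop_calls(&calls, 2, Duration::from_secs(1)).unwrap();
    assert_eq!(popped.len(), 2);
}
// -----------------------------------------------------------------------------
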
diff --git a/integration_tests/features/Mempool.feature b/integration_tests/features/Mempool.feature
index 7e2bb5111e..95eab12fa6 100644
--- a/integration_tests/features/Mempool.feature
+++ b/integration_tests/features/Mempool.feature
@@ -1,7 +1,6 @@
 @mempool
 Feature: Mempool
 
-
   Scenario: Transactions are propagated through a network
     Given I have 10 seed nodes
     And I have a base node SENDER connected to all seed nodes
@@ -74,3 +73,61 @@ Feature: Mempool
     When I mine 1 blocks on SENDER
     Then SENDER has TX1 in NOT_STORED state
     Then SENDER has TX2 in MINED state
+
+  @critical
+  Scenario: Mempool clearing out invalid transactions after a reorg
+    #
+    # Chain 1:
+    #   Collects 7 coinbases into one wallet, send 7 transactions
+    #   Stronger chain
+    #
+    Given I have a seed node SEED_A
+    And I have a base node NODE_A1 connected to seed SEED_A
+    And I have wallet WALLET_A1 connected to seed node SEED_A
+    And I have wallet WALLET_A2 connected to seed node SEED_A
+    And I have mining node MINER_A1 connected to base node SEED_A and wallet WALLET_A1
+    When I wait 5 seconds
+    When mining node MINER_A1 mines 7 blocks with min difficulty 200 and max difficulty 100000
+    Then node SEED_A is at height 7
+    Then node NODE_A1 is at height 7
+    When I mine 3 blocks on SEED_A
+    Then wallet WALLET_A1 detects at least 7 coinbase transactions as Mined_Confirmed
+    Then node SEED_A is at height 10
+    Then node NODE_A1 is at height 10
+    And I multi-send 7 transactions of 1000000 uT from wallet WALLET_A1 to wallet WALLET_A2 at fee 100
+    Then wallet WALLET_A1 detects all transactions are at least Broadcast
+    When I wait 1 seconds
+    #
+    # Chain 2:
+    #   Collects 7 coinbases into one wallet, send 7 transactions
+    #   Weaker chain
+    #
+    And I have a seed node SEED_B
+    And I have a base node NODE_B1 connected to seed SEED_B
+    And I have wallet WALLET_B1 connected to seed node SEED_B
+    And I have wallet WALLET_B2 connected to seed node SEED_B
+    And I have mining node MINER_B1 connected to base node SEED_B and wallet WALLET_B1
+    When I wait 5 seconds
+    When mining node MINER_B1 mines 7 blocks with min difficulty 1 and max difficulty 100
+    Then node SEED_B is at height 7
+    Then node NODE_B1 is at height 7
+    When I mine 5 blocks on SEED_B
+    Then wallet WALLET_B1 detects at least 7 coinbase transactions as Mined_Confirmed
+    Then node SEED_B is at height 12
+    Then node NODE_B1 is at height 12
+    And I multi-send 7 transactions of 1000000 uT from wallet WALLET_B1 to wallet WALLET_B2 at fee 100
+    Then wallet WALLET_B1 detects all transactions are at least Broadcast
+    When I wait 1 seconds
+    #
+    # Connect Chain 1 and 2 in stages
+    #   New node connects to weaker chain, receives all broadcast (not mined) transactions into mempool
+    #   New node connects to stronger chain, then reorgs its complete chain
+    #   New node mines blocks; no invalid inputs from the weaker chain should be used in the block template
+    #
+    And I have a base node NODE_C connected to seed SEED_B
+    Then node NODE_C is at height 12
+    # Wait for the reorg to filter through
+    And I connect node SEED_A to node NODE_C and wait 30 seconds
+    Then all nodes are at height 10
+    When I mine 6 blocks on NODE_C
+    Then all nodes are at height 16
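
// --- Editorial sketch (not part of the patch) -------------------------------
// The scenario above hinges on accumulated difficulty, not height: chain A
// stops at height 10 but was mined at min difficulty 200, while chain B
// reaches height 12 at difficulty 1 to 100, so every node reorgs to the
// shorter, heavier chain. A toy illustration of that selection rule; the
// numbers are made up, only the comparison matters.

#[derive(Debug)]
struct ChainTip {
    height: u64,
    accumulated_difficulty: u128,
}

// The canonical chain is the one with more accumulated work,
// regardless of which tip is higher.
fn stronger<'a>(a: &'a ChainTip, b: &'a ChainTip) -> &'a ChainTip {
    if a.accumulated_difficulty >= b.accumulated_difficulty { a } else { b }
}

fn main() {
    let chain_a = ChainTip { height: 10, accumulated_difficulty: 2_000 };
    let chain_b = ChainTip { height: 12, accumulated_difficulty: 900 };
    assert_eq!(stronger(&chain_a, &chain_b).height, 10);
}
// -----------------------------------------------------------------------------
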
diff --git a/integration_tests/features/WalletMonitoring.feature b/integration_tests/features/WalletMonitoring.feature
index eb5d29e3b3..112a90595e 100644
--- a/integration_tests/features/WalletMonitoring.feature
+++ b/integration_tests/features/WalletMonitoring.feature
@@ -21,6 +21,7 @@ Scenario: Wallets monitoring coinbase after a reorg
     # Use 7 of the 10 coinbase UTXOs in transactions (others require 3 confirmations)
     And I multi-send 7 transactions of 1000000 uT from wallet WALLET_A1 to wallet WALLET_A2 at fee 100
     Then wallet WALLET_A1 detects all transactions are at least Broadcast
+    When I wait 1 seconds
     #
     # Chain 2:
     # Collects 10 coinbases into one wallet, send 7 transactions
@@ -39,11 +40,10 @@ Scenario: Wallets monitoring coinbase after a reorg
     # Use 7 of the 10 coinbase UTXOs in transactions (others require 3 confirmations)
     And I multi-send 7 transactions of 1000000 uT from wallet WALLET_B1 to wallet WALLET_B2 at fee 100
     Then wallet WALLET_B1 detects all transactions are at least Broadcast
+    When I wait 1 seconds
     #
     # Connect Chain 1 and 2
     #
-    # TODO: This wait is needed to stop next base node task from continuing
-    When I wait 1 seconds
     And I have a SHA3 miner NODE_C connected to all seed nodes
     # Wait for the reorg to filter through
     When I wait 30 seconds
diff --git a/integration_tests/features/support/steps.js b/integration_tests/features/support/steps.js
index 34b64306b9..342625e05a 100644
--- a/integration_tests/features/support/steps.js
+++ b/integration_tests/features/support/steps.js
@@ -145,7 +145,7 @@ Given(
   async function (nodeNameA, nodeNameB, waitSeconds) {
     expect(waitSeconds < 1190).to.equal(true);
     console.log(
-      "Connecting",
+      "Connecting (add new peer seed, shut down, then start up)",
       nodeNameA,
       "to",
       nodeNameB,
@@ -158,6 +158,7 @@ Given(
     nodeA.setPeerSeeds([nodeB.peerAddress()]);
     await this.stopNode(nodeNameA);
     await this.startNode(nodeNameA);
+    await sleep(waitSeconds * 1000);
   }
 );
 
@@ -538,7 +539,13 @@ Then(
   async function (name, height) {
     const client = this.getClient(name);
     await waitFor(async () => client.getTipHeight(), height, 115 * 1000);
-    expect(await client.getTipHeight()).to.equal(height);
+    const currentHeight = await client.getTipHeight();
+    console.log(
+      `Node ${name} is at tip: ${currentHeight} (should be`,
+      height,
+      `)`
+    );
+    expect(currentHeight).to.equal(height);
   }
 );