Skip to content

Commit

Permalink
commit message #2:
Browse files Browse the repository at this point in the history
Fix ChainStorageError after a reorg with new block

- Fixed a ChainStorageError that occurs due to invalid transactions remaining in
the mempool when the 1st block is requested for mining via GRPC directly after
a chain reorg.
- Added a failing cucumber test for the above condition.
- Minor cucumber improvements.
  • Loading branch information
hansieodendaal committed May 25, 2021
1 parent 83eca21 commit 0b8058e
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 39 deletions.
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(fetch_kernels_by_mmr_position(start: u64, end: u64) -> Vec<TransactionKernel>, "fetch_kernels_by_mmr_position");

//---------------------------------- MMR --------------------------------------------//
make_async_fn!(prepare_block_merkle_roots(template: NewBlockTemplate) -> Block, "create_block");
make_async_fn!(prepare_block_merkle_roots(template: NewBlockTemplate) -> Block, "prepare_block_merkle_roots");

make_async_fn!(fetch_mmr_size(tree: MmrTree) -> u64, "fetch_mmr_node_count");
make_async_fn!(fetch_mmr_size(tree: MmrTree) -> u64, "fetch_mmr_size");

make_async_fn!(rewind_to_height(height: u64) -> Vec<Arc<ChainBlock>>, "rewind_to_height");

Expand Down
25 changes: 13 additions & 12 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use log::*;
use std::sync::Arc;
use tari_crypto::tari_utilities::{hex::Hex, Hashable};

pub const LOG_TARGET: &str = "c::mp::mempool";
pub const LOG_TARGET: &str = "c::mp::mempool_storage";

/// The Mempool consists of an Unconfirmed Transaction Pool, Pending Pool, Orphan Pool and Reorg Pool and is responsible
/// The Mempool consists of an Unconfirmed Transaction Pool and Reorg Pool and is responsible
/// for managing and maintaining all unconfirmed transactions have not yet been included in a block, and transactions
/// that have recently been included in a block.
pub struct MempoolStorage {
Expand All @@ -50,7 +50,7 @@ pub struct MempoolStorage {
}

impl MempoolStorage {
/// Create a new Mempool with an UnconfirmedPool, OrphanPool, PendingPool and ReOrgPool.
/// Create a new Mempool with an UnconfirmedPool and ReOrgPool.
pub fn new(config: MempoolConfig, validators: Arc<dyn MempoolTransactionValidation>) -> Self {
Self {
unconfirmed_pool: UnconfirmedPool::new(config.unconfirmed_pool),
Expand Down Expand Up @@ -115,14 +115,6 @@ impl MempoolStorage {
Ok(())
}

// Update the Mempool based on the received set of published blocks.
fn process_published_blocks(&mut self, published_blocks: Vec<Arc<Block>>) -> Result<(), MempoolError> {
for published_block in published_blocks {
self.process_published_block(published_block)?;
}
Ok(())
}

/// In the event of a ReOrg, resubmit all ReOrged transactions into the Mempool and process each newly introduced
/// block from the latest longest chain.
pub fn process_reorg(
Expand Down Expand Up @@ -151,11 +143,20 @@ impl MempoolStorage {
let previous_tip = removed_blocks.last().map(|block| block.header.height);
let new_tip = new_blocks.last().map(|block| block.header.height);

// Clear out all transactions from the unconfirmed pool and re-submit them to the unconfirmed mempool for
// validation. This is important as invalid transactions that have not been mined yet may remain in the mempool
// after a reorg.
let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions();
self.insert_txs(removed_txs)?;
// Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool
self.insert_txs(
self.reorg_pool
.remove_reorged_txs_and_discard_double_spends(removed_blocks, &new_blocks)?,
)?;
self.process_published_blocks(new_blocks)?;
// Update the Mempool based on the received set of new blocks.
for block in new_blocks {
self.process_published_block(block)?;
}

if let (Some(previous_tip_height), Some(new_tip_height)) = (previous_tip, new_tip) {
if new_tip_height < previous_tip_height {
Expand Down
18 changes: 12 additions & 6 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ impl ReorgPoolStorage {
/// the ReorgPoolStorage and will be discarded once the Time-to-live threshold has been reached.
pub fn insert(&mut self, tx: Arc<Transaction>) {
let tx_key = tx.body.kernels()[0].excess_sig.clone();
trace!(
let _ = self
.txs_by_signature
.insert(tx_key.clone(), tx.clone(), self.config.tx_ttl);
debug!(
target: LOG_TARGET,
"Inserting tx into reorg pool: {}",
"Inserted transaction with signature {} into reorg pool:",
tx_key.get_signature().to_hex()
);
trace!(target: LOG_TARGET, "Transaction inserted: {}", tx);
let _ = self.txs_by_signature.insert(tx_key, tx, self.config.tx_ttl);
trace!(target: LOG_TARGET, "{}", tx);
}

/// Insert a set of new transactions into the ReorgPoolStorage
Expand Down Expand Up @@ -91,8 +93,12 @@ impl ReorgPoolStorage {
}

for tx_key in &removed_tx_keys {
trace!(target: LOG_TARGET, "Removed double spends: {:?}", tx_key);
self.txs_by_signature.remove(&tx_key);
trace!(
target: LOG_TARGET,
"Removed double spend tx from reorg pool: {}",
tx_key.get_signature().to_hex()
);
}
}

Expand All @@ -111,7 +117,7 @@ impl ReorgPoolStorage {
for block in &removed_blocks {
for kernel in block.body.kernels() {
if let Some(removed_tx) = self.txs_by_signature.remove(&kernel.excess_sig) {
trace!(target: LOG_TARGET, "Removing tx from reorg pool: {:?}", removed_tx);
trace!(target: LOG_TARGET, "Removed tx from reorg pool: {:?}", removed_tx);
removed_txs.push(removed_tx);
}
}
Expand Down
50 changes: 37 additions & 13 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
convert::TryFrom,
sync::Arc,
};
use tari_crypto::tari_utilities::hex::Hex;
use tari_crypto::tari_utilities::{hex::Hex, Hashable};

pub const LOG_TARGET: &str = "c::mp::unconfirmed_pool::unconfirmed_pool_storage";

Expand Down Expand Up @@ -103,12 +103,6 @@ impl UnconfirmedPool {
.first_kernel_excess_sig()
.ok_or(UnconfirmedPoolError::TransactionNoKernels)?;
if !self.txs_by_signature.contains_key(tx_key) {
debug!(
target: LOG_TARGET,
"Inserting tx into unconfirmed pool: {}",
tx_key.get_signature().to_hex()
);
trace!(target: LOG_TARGET, "Transaction inserted: {}", tx);
let prioritized_tx = PrioritizedTransaction::try_from((*tx).clone())?;
if self.txs_by_signature.len() >= self.config.storage_capacity {
if prioritized_tx.priority < *self.lowest_priority() {
Expand All @@ -119,6 +113,12 @@ impl UnconfirmedPool {
self.txs_by_priority
.insert(prioritized_tx.priority.clone(), tx_key.clone());
self.txs_by_signature.insert(tx_key.clone(), prioritized_tx);
debug!(
target: LOG_TARGET,
"Inserted transaction with signature {} into unconfirmed pool:",
tx_key.get_signature().to_hex()
);
trace!(target: LOG_TARGET, "{}", tx);
}
Ok(())
}
Expand Down Expand Up @@ -176,36 +176,60 @@ impl UnconfirmedPool {
false
}

/// Remove all current mempool transactions from the UnconfirmedPoolStorage, returning that which have been removed
pub fn drain_all_mempool_transactions(&mut self) -> Vec<Arc<Transaction>> {
let mempool_txs: Vec<Arc<Transaction>> = self
.txs_by_signature
.drain()
.map(|(_key, val)| val.transaction)
.collect();
self.txs_by_priority.clear();

mempool_txs
}

/// Remove all published transactions from the UnconfirmedPool and discard all double spend transactions.
/// Returns a list of all transactions that were removed the unconfirmed pool as a result of appearing in the block.
fn discard_double_spends(&mut self, published_block: &Block) {
let mut removed_tx_keys = Vec::new();
for (tx_key, ptx) in self.txs_by_signature.iter() {
for input in ptx.transaction.body.inputs() {
if published_block.body.inputs().contains(input) {
self.txs_by_priority.remove(&ptx.priority);
debug!(
target: LOG_TARGET,
"Removed double spend tx with key {} from unconfirmed pool",
tx_key.get_signature().to_hex()
);
trace!(target: LOG_TARGET, "{}", &ptx.transaction);
removed_tx_keys.push(tx_key.clone());
}
}
}

for tx_key in &removed_tx_keys {
trace!(
target: LOG_TARGET,
"Removing double spends from unconfirmed pool: {:?}",
tx_key
);
self.txs_by_signature.remove(&tx_key);
}
}

/// Remove all published transactions from the UnconfirmedPoolStorage and discard double spends
pub fn remove_published_and_discard_double_spends(&mut self, published_block: &Block) -> Vec<Arc<Transaction>> {
trace!(
target: LOG_TARGET,
"Searching for txns to remove from unconfirmed pool in block {} ({})",
published_block.header.height,
published_block.header.hash().to_hex(),
);
let mut removed_txs = Vec::new();
published_block.body.kernels().iter().for_each(|kernel| {
if let Some(ptx) = self.txs_by_signature.get(&kernel.excess_sig) {
self.txs_by_priority.remove(&ptx.priority);
if let Some(ptx) = self.txs_by_signature.remove(&kernel.excess_sig) {
debug!(
target: LOG_TARGET,
"Removed tx with key {} from unconfirmed pool",
kernel.excess_sig.get_signature().to_hex()
);
trace!(target: LOG_TARGET, "{}", &ptx.transaction);
removed_txs.push(ptx.transaction);
}
}
Expand Down
7 changes: 6 additions & 1 deletion base_layer/wallet/tests/support/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,20 @@ impl BaseNodeWalletRpcMockState {
timeout: Duration,
) -> Result<Vec<Transaction>, String> {
let now = Instant::now();
let mut count = 0usize;
while now.elapsed() < timeout {
let mut lock = acquire_lock!(self.submit_transaction_calls);
count = (*lock).len();
if (*lock).len() >= num_calls {
return Ok((*lock).drain(..num_calls).collect());
}
drop(lock);
delay_for(Duration::from_millis(100)).await;
}
Err("Did not receive enough calls within the timeout period".to_string())
Err(format!(
"Did not receive enough calls within the timeout period, received {}, expected {}.",
count, num_calls
))
}

pub async fn wait_pop_fetch_utxos_calls(
Expand Down
59 changes: 58 additions & 1 deletion integration_tests/features/Mempool.feature
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
@mempool
Feature: Mempool


Scenario: Transactions are propagated through a network
Given I have 10 seed nodes
And I have a base node SENDER connected to all seed nodes
Expand Down Expand Up @@ -74,3 +73,61 @@ Feature: Mempool
When I mine 1 blocks on SENDER
Then SENDER has TX1 in NOT_STORED state
Then SENDER has TX2 in MINED state

@critical
Scenario: Mempool clearing out invalid transactions after a reorg
#
# Chain 1:
# Collects 7 coinbases into one wallet, send 7 transactions
# Stronger chain
#
Given I have a seed node SEED_A
And I have a base node NODE_A1 connected to seed SEED_A
And I have wallet WALLET_A1 connected to seed node SEED_A
And I have wallet WALLET_A2 connected to seed node SEED_A
And I have mining node MINER_A1 connected to base node SEED_A and wallet WALLET_A1
When I wait 5 seconds
When mining node MINER_A1 mines 7 blocks with min difficulty 200 and max difficulty 100000
Then node SEED_A is at height 7
Then node NODE_A1 is at height 7
When I mine 3 blocks on SEED_A
Then wallet WALLET_A1 detects at least 7 coinbase transactions as Mined_Confirmed
Then node SEED_A is at height 10
Then node NODE_A1 is at height 10
And I multi-send 7 transactions of 1000000 uT from wallet WALLET_A1 to wallet WALLET_A2 at fee 100
Then wallet WALLET_A1 detects all transactions are at least Broadcast
When I wait 1 seconds
#
# Chain 2:
# Collects 7 coinbases into one wallet, send 7 transactions
# Weaker chain
#
And I have a seed node SEED_B
And I have a base node NODE_B1 connected to seed SEED_B
And I have wallet WALLET_B1 connected to seed node SEED_B
And I have wallet WALLET_B2 connected to seed node SEED_B
And I have mining node MINER_B1 connected to base node SEED_B and wallet WALLET_B1
When I wait 5 seconds
When mining node MINER_B1 mines 7 blocks with min difficulty 1 and max difficulty 100
Then node SEED_B is at height 7
Then node NODE_B1 is at height 7
When I mine 5 blocks on SEED_B
Then wallet WALLET_B1 detects at least 7 coinbase transactions as Mined_Confirmed
Then node SEED_B is at height 12
Then node NODE_B1 is at height 12
And I multi-send 7 transactions of 1000000 uT from wallet WALLET_B1 to wallet WALLET_B2 at fee 100
Then wallet WALLET_B1 detects all transactions are at least Broadcast
When I wait 1 seconds
#
# Connect Chain 1 and 2 in stages
# New node connects to weaker chain, receives all broadcast (not mined) transactions into mempool
# New node connects to stronger chain, then reorgs its complete chain
# New node mines blocks; no invalid inputs from the weaker chain should be used in the block template
#
And I have a base node NODE_C connected to seed SEED_B
Then node NODE_C is at height 12
# Wait for the reorg to filter through
And I connect node SEED_A to node NODE_C and wait 30 seconds
Then all nodes are at height 10
When I mine 6 blocks on NODE_C
Then all nodes are at height 16
4 changes: 2 additions & 2 deletions integration_tests/features/WalletMonitoring.feature
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Scenario: Wallets monitoring coinbase after a reorg
# Use 7 of the 10 coinbase UTXOs in transactions (others require 3 confirmations)
And I multi-send 7 transactions of 1000000 uT from wallet WALLET_A1 to wallet WALLET_A2 at fee 100
Then wallet WALLET_A1 detects all transactions are at least Broadcast
When I wait 1 seconds
#
# Chain 2:
# Collects 10 coinbases into one wallet, send 7 transactions
Expand All @@ -39,11 +40,10 @@ Scenario: Wallets monitoring coinbase after a reorg
# Use 7 of the 10 coinbase UTXOs in transactions (others require 3 confirmations)
And I multi-send 7 transactions of 1000000 uT from wallet WALLET_B1 to wallet WALLET_B2 at fee 100
Then wallet WALLET_B1 detects all transactions are at least Broadcast
When I wait 1 seconds
#
# Connect Chain 1 and 2
#
# TODO: This wait is needed to stop next base node task from continuing
When I wait 1 seconds
And I have a SHA3 miner NODE_C connected to all seed nodes
# Wait for the reorg to filter through
When I wait 30 seconds
Expand Down
11 changes: 9 additions & 2 deletions integration_tests/features/support/steps.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Given(
async function (nodeNameA, nodeNameB, waitSeconds) {
expect(waitSeconds < 1190).to.equal(true);
console.log(
"Connecting",
"Connecting (add new peer seed, shut down, then start up)",
nodeNameA,
"to",
nodeNameB,
Expand All @@ -158,6 +158,7 @@ Given(
nodeA.setPeerSeeds([nodeB.peerAddress()]);
await this.stopNode(nodeNameA);
await this.startNode(nodeNameA);
await sleep(waitSeconds * 1000);
}
);

Expand Down Expand Up @@ -538,7 +539,13 @@ Then(
async function (name, height) {
const client = this.getClient(name);
await waitFor(async () => client.getTipHeight(), height, 115 * 1000);
expect(await client.getTipHeight()).to.equal(height);
const currentHeight = await client.getTipHeight();
console.log(
`Node ${name} is at tip: ${currentHeight} (should be`,
height,
`)`
);
expect(currentHeight).to.equal(height);
}
);

Expand Down

0 comments on commit 0b8058e

Please sign in to comment.