Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync handling and increase reorg speed in mempool #4706

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl ChainMetadataService {
match event {
BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) |
BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) |
BlockEvent::BlockSyncComplete(_) => {
BlockEvent::BlockSyncComplete(_, _) => {
self.update_liveness_chain_metadata().await?;
},
_ => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub enum BlockEvent {
AddBlockErrored {
block: Arc<Block>,
},
BlockSyncComplete(Arc<ChainBlock>),
BlockSyncComplete(Arc<ChainBlock>, u64),
BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl BlockSync {
});

let local_nci = shared.local_node_interface.clone();
synchronizer.on_complete(move |block| {
local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block));
synchronizer.on_complete(move |block, starting_height| {
local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block, starting_height));
});

let timer = Instant::now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
}

pub fn on_complete<H>(&mut self, hook: H)
where H: Fn(Arc<ChainBlock>) + Send + Sync + 'static {
where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
self.hooks.add_on_complete_hook(hook);
}

Expand Down Expand Up @@ -377,7 +377,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
}

if let Some(block) = current_block {
self.hooks.call_on_complete_hooks(block);
self.hooks.call_on_complete_hooks(block, best_height);
}

debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer);
Expand Down
10 changes: 6 additions & 4 deletions base_layer/core/src/base_node/sync/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(super) struct Hooks {
on_progress_header: Vec<Box<dyn Fn(u64, u64, &SyncPeer) + Send + Sync>>,
on_progress_block: Vec<Box<dyn Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync>>,
on_progress_horizon_sync: Vec<Box<dyn Fn(HorizonSyncInfo) + Send + Sync>>,
on_complete: Vec<Box<dyn Fn(Arc<ChainBlock>) + Send + Sync>>,
on_complete: Vec<Box<dyn Fn(Arc<ChainBlock>, u64) + Send + Sync>>,
on_rewind: Vec<Box<dyn Fn(Vec<Arc<ChainBlock>>) + Send + Sync>>,
}

Expand Down Expand Up @@ -81,12 +81,14 @@ impl Hooks {
}

pub fn add_on_complete_hook<H>(&mut self, hook: H)
where H: Fn(Arc<ChainBlock>) + Send + Sync + 'static {
where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
self.on_complete.push(Box::new(hook));
}

pub fn call_on_complete_hooks(&self, final_block: Arc<ChainBlock>) {
self.on_complete.iter().for_each(|f| (*f)(final_block.clone()));
pub fn call_on_complete_hooks(&self, final_block: Arc<ChainBlock>, starting_height: u64) {
self.on_complete
.iter()
.for_each(|f| (*f)(final_block.clone(), starting_height));
}

pub fn add_on_rewind_hook<H>(&mut self, hook: H)
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ impl Mempool {
.await
}

/// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation
pub async fn process_sync(&self) -> Result<(), MempoolError> {
self.with_write_access(move |storage| storage.process_sync())
.await
}

/// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool.
pub async fn snapshot(&self) -> Result<Vec<Arc<Transaction>>, MempoolError> {
self.with_read_access(|storage| Ok(storage.snapshot())).await
Expand Down
46 changes: 17 additions & 29 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,33 @@ impl MempoolStorage {
new_blocks: &[Arc<Block>],
) -> Result<(), MempoolError> {
debug!(target: LOG_TARGET, "Mempool processing reorg");
let previous_tip = removed_blocks.last().map(|block| block.header.height);
let new_tip = new_blocks.last().map(|block| block.header.height);

// Clear out all transactions from the unconfirmed pool and re-submit them to the unconfirmed mempool for
// validation. This is important as invalid transactions that have not been mined yet may remain in the mempool
// after a reorg.
let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions();
self.insert_txs(removed_txs);
// this returns all orphaned transaction, but because we know the new blocks are already added, they will still
// be orphaned, we can drop them here
let _ = self.insert_txs(removed_txs);
// Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool
let removed_txs = self
.reorg_pool
.remove_reorged_txs_and_discard_double_spends(removed_blocks, new_blocks);
self.insert_txs(removed_txs);
// Update the Mempool based on the received set of new blocks.
for block in new_blocks {
self.process_published_block(block)?;
}

if let (Some(previous_tip_height), Some(new_tip_height)) = (previous_tip, new_tip) {
if new_tip_height < previous_tip_height {
debug!(
target: LOG_TARGET,
"Checking for time locked transactions in unconfirmed pool as chain height was reduced from {} to \
{} during reorg.",
previous_tip_height,
new_tip_height,
);
self.unconfirmed_pool.remove_timelocked(new_tip_height);
} else {
debug!(
target: LOG_TARGET,
"No need to check for time locked transactions in unconfirmed pool. Previous tip height: {}. New \
tip height: {}.",
previous_tip_height,
new_tip_height,
);
}
}
let _ = self.insert_txs(removed_txs);
Ok(())
}

/// After a sync event, we need to try to add in all the transaction form the reorg pool.
pub fn process_sync(&mut self) -> Result<(), MempoolError> {
SWvheerden marked this conversation as resolved.
Show resolved Hide resolved
debug!(target: LOG_TARGET, "Mempool processing sync finished");
// lets remove and revalidate all transactions from the mempool. All we know is that the state has changed, but
// we dont have the data to know what.
let txs = self.unconfirmed_pool.drain_all_mempool_transactions();
// lets add them all back into the mempool
self.insert_txs(txs);
//let retrieve all re-org pool transactions as well as make sure they are mined as well
let txs = self.reorg_pool.clear_and_retrieve_all();
self.insert_txs(txs);
Ok(())
}

Expand Down
35 changes: 35 additions & 0 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ impl ReorgPool {
}
}

/// This will clear the reorg pool and return all transactions contained within
pub fn clear_and_retrieve_all(&mut self) -> Vec<Arc<Transaction>> {
let mut result = Vec::new();
for (_, tx) in self.tx_by_key.drain() {
result.push(tx);
}
self.txs_by_signature.clear();
self.txs_by_height.clear();
result
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
Expand Down Expand Up @@ -387,6 +398,30 @@ mod test {
assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig));
}

#[test]
fn test_remove_all() {
let tx1 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(100), lock: 4000, inputs: 2, outputs: 1).0);
let tx2 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(60), lock: 3000, inputs: 2, outputs: 1).0);
let tx3 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(20), lock: 2500, inputs: 2, outputs: 1).0);

let mut reorg_pool = ReorgPool::new(ReorgPoolConfig { expiry_height: 2 });
reorg_pool.insert(1, tx1.clone());
reorg_pool.insert(1, tx2.clone());
reorg_pool.insert(1, tx3.clone());

let txs = reorg_pool.clear_and_retrieve_all();
assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig));
assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig));
assert!(reorg_pool.txs_by_height.is_empty());
assert!(reorg_pool.tx_by_key.is_empty());
assert!(reorg_pool.txs_by_signature.is_empty());

assert!(txs.contains(&tx1));
assert!(txs.contains(&tx2));
assert!(txs.contains(&tx3));
}

#[test]
fn remove_scan_for_and_remove_reorged_txs() {
let network = Network::LocalNet;
Expand Down
10 changes: 3 additions & 7 deletions base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,9 @@ impl MempoolInboundHandlers {
.await?;
},
ValidBlockAdded(_, _) => {},
BlockSyncRewind(removed_blocks) => {
self.mempool
.process_reorg(removed_blocks.iter().map(|b| b.to_arc_block()).collect(), vec![])
.await?;
},
BlockSyncComplete(tip_block) => {
self.mempool.process_published_block(tip_block.to_arc_block()).await?;
BlockSyncRewind(_) => {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

BlockSyncComplete(_,_) => {
self.mempool.process_sync().await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

},
AddBlockValidationFailed {
block: failed_block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,21 +475,6 @@ impl UnconfirmedPool {
Some(prioritized_transaction.transaction)
}

/// Remove all unconfirmed transactions that have become time locked. This can happen when the chain height was
/// reduced on some reorgs.
pub fn remove_timelocked(&mut self, tip_height: u64) {
debug!(target: LOG_TARGET, "Removing time-locked inputs from unconfirmed pool");
let to_remove = self
.tx_by_key
.iter()
.filter(|(_, ptx)| ptx.transaction.min_spendable_height() > tip_height + 1)
.map(|(k, _)| *k)
.collect::<Vec<_>>();
for tx_key in to_remove {
self.remove_transaction(tx_key);
}
}

/// Returns the total number of unconfirmed transactions stored in the UnconfirmedPool.
pub fn len(&self) -> usize {
self.txs_by_signature.len()
Expand Down
1 change: 0 additions & 1 deletion integration_tests/features/Mempool.feature
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ Feature: Mempool
Then SENDER has TX1 in NOT_STORED state
Then SENDER has TX2 in MINED state

@flaky
Scenario: Mempool clearing out invalid transactions after a reorg
Given I have a seed node SEED_A
And I have a base node NODE_A connected to seed SEED_A
Expand Down