From 46969482bb5756099cd5d8321e7313180bbefb92 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 20 Sep 2022 16:44:10 +0200 Subject: [PATCH 1/6] fix sync handling and increase reorg speed in mempool --- .../chain_metadata_service/service.rs | 2 +- .../comms_interface/inbound_handlers.rs | 2 +- .../states/block_sync.rs | 4 +- base_layer/core/src/mempool/mempool.rs | 11 ++++ .../core/src/mempool/mempool_storage.rs | 59 ++++++++++--------- .../core/src/mempool/reorg_pool/reorg_pool.rs | 11 ++++ .../src/mempool/service/inbound_handlers.rs | 6 +- .../unconfirmed_pool/unconfirmed_pool.rs | 15 ----- 8 files changed, 60 insertions(+), 50 deletions(-) diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index 306332d297..c38cc7f18b 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -136,7 +136,7 @@ impl ChainMetadataService { match event { BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) | BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) | - BlockEvent::BlockSyncComplete(_) => { + BlockEvent::BlockSyncComplete() => { self.update_liveness_chain_metadata().await?; }, _ => {}, diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 1067179be8..6354c2a38e 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -70,7 +70,7 @@ pub enum BlockEvent { AddBlockErrored { block: Arc, }, - BlockSyncComplete(Arc), + BlockSyncComplete(), BlockSyncRewind(Vec>), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index 4d276796ea..498977717d 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -93,8 +93,8 @@ impl BlockSync { }); let local_nci = shared.local_node_interface.clone(); - synchronizer.on_complete(move |block| { - local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block)); + synchronizer.on_complete(move |_| { + local_nci.publish_block_event(BlockEvent::BlockSyncComplete()); }); let timer = Instant::now(); diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index 95495634a4..b79a806d49 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -101,6 +101,17 @@ impl Mempool { .await } + /// In the event of a Rewind for a block sync, all transactions to the orphan pool + pub async fn process_rewind(&self, removed_blocks: Vec>) -> Result<(), MempoolError> { + self.with_write_access(move |storage| storage.process_rewind(&removed_blocks)) + .await + } + + /// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation + pub async fn process_sync(&self) -> Result<(), MempoolError> { + self.with_write_access(move |storage| storage.process_sync()).await + } + /// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool. pub async fn snapshot(&self) -> Result>, MempoolError> { self.with_read_access(|storage| Ok(storage.snapshot())).await diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 31afcd356b..7d242b6891 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -186,45 +186,48 @@ impl MempoolStorage { new_blocks: &[Arc], ) -> Result<(), MempoolError> { debug!(target: LOG_TARGET, "Mempool processing reorg"); - let previous_tip = removed_blocks.last().map(|block| block.header.height); - let new_tip = new_blocks.last().map(|block| block.header.height); // Clear out all transactions from the unconfirmed pool and re-submit them to the unconfirmed mempool for // validation. This is important as invalid transactions that have not been mined yet may remain in the mempool // after a reorg. let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); - self.insert_txs(removed_txs); + // this returns all orphaned transaction, but because we know the new blocks are already added, they will still + // be orphaned, we can drop them here + let _ = self.insert_txs(removed_txs); // Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool let removed_txs = self .reorg_pool .remove_reorged_txs_and_discard_double_spends(removed_blocks, new_blocks); - self.insert_txs(removed_txs); - // Update the Mempool based on the received set of new blocks. - for block in new_blocks { - self.process_published_block(block)?; - } + let _ = self.insert_txs(removed_txs); + Ok(()) + } - if let (Some(previous_tip_height), Some(new_tip_height)) = (previous_tip, new_tip) { - if new_tip_height < previous_tip_height { - debug!( - target: LOG_TARGET, - "Checking for time locked transactions in unconfirmed pool as chain height was reduced from {} to \ - {} during reorg.", - previous_tip_height, - new_tip_height, - ); - self.unconfirmed_pool.remove_timelocked(new_tip_height); - } else { - debug!( - target: LOG_TARGET, - "No need to check for time locked transactions in unconfirmed pool. Previous tip height: {}. New \ - tip height: {}.", - previous_tip_height, - new_tip_height, - ); - } - } + /// In the event of a Rewind for a block sync. Move all transactions to the orphan pool + pub fn process_rewind(&mut self, removed_blocks: &[Arc]) -> Result<(), MempoolError> { + debug!(target: LOG_TARGET, "Mempool processing rewind"); + let current_tip = removed_blocks + .first() + .map(|block| block.header.height) + .unwrap_or_default() + .checked_sub(1) + .unwrap_or_default(); + + // Clear out all transactions from the unconfirmed pool and save them in the reorg pool. We dont reinsert valid + // transactions here as we will need to revalidate them again after the sync was done and the mempool and + // blockchain does not yet know how the blocks will look. + let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); + // lets save to the reorg pool + self.reorg_pool.insert_all(current_tip, removed_txs); + Ok(()) + } + /// After a sync event, we need to try to add in all the transaction form the reorg pool. + pub fn process_sync(&mut self) -> Result<(), MempoolError> { + debug!(target: LOG_TARGET, "Mempool processing sync finished"); + // lets retrieve all the transactions from the reorg pool and try to reinsert them. + let txs = self.reorg_pool.clear_and_retrieve_all(); + // lets add them all back into the mempool + self.insert_txs(txs); Ok(()) } diff --git a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs index 76d9bc6708..49ef52e0b7 100644 --- a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs +++ b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs @@ -130,6 +130,17 @@ impl ReorgPool { } } + /// This will clear the reorg pool and return all transactions contained within + pub fn clear_and_retrieve_all(&mut self) -> Vec> { + let mut result = Vec::new(); + for (_, tx) in self.tx_by_key.drain() { + result.push(tx); + } + self.txs_by_signature.clear(); + self.txs_by_height.clear(); + result + } + pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec>, Vec) { // Hashset used to prevent duplicates let mut found = HashSet::new(); diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index e305107a58..1248776f15 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -175,11 +175,11 @@ impl MempoolInboundHandlers { ValidBlockAdded(_, _) => {}, BlockSyncRewind(removed_blocks) => { self.mempool - .process_reorg(removed_blocks.iter().map(|b| b.to_arc_block()).collect(), vec![]) + .process_rewind(removed_blocks.iter().map(|b| b.to_arc_block()).collect()) .await?; }, - BlockSyncComplete(tip_block) => { - self.mempool.process_published_block(tip_block.to_arc_block()).await?; + BlockSyncComplete() => { + self.mempool.process_sync().await?; }, AddBlockValidationFailed { block: failed_block, diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs index bf59316dc8..bd5600b5d8 100644 --- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs +++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs @@ -475,21 +475,6 @@ impl UnconfirmedPool { Some(prioritized_transaction.transaction) } - /// Remove all unconfirmed transactions that have become time locked. This can happen when the chain height was - /// reduced on some reorgs. - pub fn remove_timelocked(&mut self, tip_height: u64) { - debug!(target: LOG_TARGET, "Removing time-locked inputs from unconfirmed pool"); - let to_remove = self - .tx_by_key - .iter() - .filter(|(_, ptx)| ptx.transaction.min_spendable_height() > tip_height + 1) - .map(|(k, _)| *k) - .collect::>(); - for tx_key in to_remove { - self.remove_transaction(tx_key); - } - } - /// Returns the total number of unconfirmed transactions stored in the UnconfirmedPool. pub fn len(&self) -> usize { self.txs_by_signature.len() From 0ed0a60216c2f87f9c83e31f8d7ed79147499ccf Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 20 Sep 2022 17:47:34 +0200 Subject: [PATCH 2/6] add test --- .../core/src/mempool/reorg_pool/reorg_pool.rs | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs index 49ef52e0b7..3de43d98b7 100644 --- a/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs +++ b/base_layer/core/src/mempool/reorg_pool/reorg_pool.rs @@ -398,6 +398,30 @@ mod test { assert!(reorg_pool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig)); } + #[test] + fn test_remove_all() { + let tx1 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(100), lock: 4000, inputs: 2, outputs: 1).0); + let tx2 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(60), lock: 3000, inputs: 2, outputs: 1).0); + let tx3 = Arc::new(tx!(MicroTari(100_000), fee: MicroTari(20), lock: 2500, inputs: 2, outputs: 1).0); + + let mut reorg_pool = ReorgPool::new(ReorgPoolConfig { expiry_height: 2 }); + reorg_pool.insert(1, tx1.clone()); + reorg_pool.insert(1, tx2.clone()); + reorg_pool.insert(1, tx3.clone()); + + let txs = reorg_pool.clear_and_retrieve_all(); + assert!(!reorg_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); + assert!(!reorg_pool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig)); + assert!(reorg_pool.txs_by_height.is_empty()); + assert!(reorg_pool.tx_by_key.is_empty()); + assert!(reorg_pool.txs_by_signature.is_empty()); + + assert!(txs.contains(&tx1)); + assert!(txs.contains(&tx2)); + assert!(txs.contains(&tx3)); + } + #[test] fn remove_scan_for_and_remove_reorged_txs() { let network = Network::LocalNet; From b9f1f5becbc3b0f78eb3c4a8966c4e57bcd217c7 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 22 Sep 2022 09:18:31 +0200 Subject: [PATCH 3/6] wip --- .../chain_metadata_service/service.rs | 2 +- .../comms_interface/inbound_handlers.rs | 2 +- .../states/block_sync.rs | 4 +-- .../base_node/sync/block_sync/synchronizer.rs | 4 +-- base_layer/core/src/base_node/sync/hooks.rs | 10 ++++--- base_layer/core/src/mempool/mempool.rs | 11 ++----- .../core/src/mempool/mempool_storage.rs | 29 +++++-------------- .../src/mempool/service/inbound_handlers.rs | 11 +++---- 8 files changed, 26 insertions(+), 47 deletions(-) diff --git a/base_layer/core/src/base_node/chain_metadata_service/service.rs b/base_layer/core/src/base_node/chain_metadata_service/service.rs index c38cc7f18b..a4a19c8db2 100644 --- a/base_layer/core/src/base_node/chain_metadata_service/service.rs +++ b/base_layer/core/src/base_node/chain_metadata_service/service.rs @@ -136,7 +136,7 @@ impl ChainMetadataService { match event { BlockEvent::ValidBlockAdded(_, BlockAddResult::Ok(_)) | BlockEvent::ValidBlockAdded(_, BlockAddResult::ChainReorg { .. }) | - BlockEvent::BlockSyncComplete() => { + BlockEvent::BlockSyncComplete(_, _) => { self.update_liveness_chain_metadata().await?; }, _ => {}, diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 6354c2a38e..96d6262f69 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -70,7 +70,7 @@ pub enum BlockEvent { AddBlockErrored { block: Arc, }, - BlockSyncComplete(), + BlockSyncComplete(Arc, u64), BlockSyncRewind(Vec>), } diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index 498977717d..0e3dd9fc09 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -93,8 +93,8 @@ impl BlockSync { }); let local_nci = shared.local_node_interface.clone(); - synchronizer.on_complete(move |_| { - local_nci.publish_block_event(BlockEvent::BlockSyncComplete()); + synchronizer.on_complete(move |block, starting_height| { + local_nci.publish_block_event(BlockEvent::BlockSyncComplete(block, starting_height)); }); let timer = Instant::now(); diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 319fd172b7..917f4cac59 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -92,7 +92,7 @@ impl BlockSynchronizer { } pub fn on_complete(&mut self, hook: H) - where H: Fn(Arc) + Send + Sync + 'static { + where H: Fn(Arc, u64) + Send + Sync + 'static { self.hooks.add_on_complete_hook(hook); } @@ -377,7 +377,7 @@ impl BlockSynchronizer { } if let Some(block) = current_block { - self.hooks.call_on_complete_hooks(block); + self.hooks.call_on_complete_hooks(block, best_height); } debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer); diff --git a/base_layer/core/src/base_node/sync/hooks.rs b/base_layer/core/src/base_node/sync/hooks.rs index 9ddd2cd7ed..6e7f00a1c7 100644 --- a/base_layer/core/src/base_node/sync/hooks.rs +++ b/base_layer/core/src/base_node/sync/hooks.rs @@ -35,7 +35,7 @@ pub(super) struct Hooks { on_progress_header: Vec>, on_progress_block: Vec, u64, &SyncPeer) + Send + Sync>>, on_progress_horizon_sync: Vec>, - on_complete: Vec) + Send + Sync>>, + on_complete: Vec, u64) + Send + Sync>>, on_rewind: Vec>) + Send + Sync>>, } @@ -81,12 +81,14 @@ impl Hooks { } pub fn add_on_complete_hook(&mut self, hook: H) - where H: Fn(Arc) + Send + Sync + 'static { + where H: Fn(Arc, u64) + Send + Sync + 'static { self.on_complete.push(Box::new(hook)); } - pub fn call_on_complete_hooks(&self, final_block: Arc) { - self.on_complete.iter().for_each(|f| (*f)(final_block.clone())); + pub fn call_on_complete_hooks(&self, final_block: Arc, starting_height: u64) { + self.on_complete + .iter() + .for_each(|f| (*f)(final_block.clone(), starting_height)); } pub fn add_on_rewind_hook(&mut self, hook: H) diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index b79a806d49..49a2951544 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -101,15 +101,10 @@ impl Mempool { .await } - /// In the event of a Rewind for a block sync, all transactions to the orphan pool - pub async fn process_rewind(&self, removed_blocks: Vec>) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_rewind(&removed_blocks)) - .await - } - /// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation - pub async fn process_sync(&self) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_sync()).await + pub async fn process_sync(&self, blocks_added: u64) -> Result<(), MempoolError> { + self.with_write_access(move |storage| storage.process_sync(blocks_added)) + .await } /// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool. diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 7d242b6891..a5bb0f6b76 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -202,32 +202,17 @@ impl MempoolStorage { Ok(()) } - /// In the event of a Rewind for a block sync. Move all transactions to the orphan pool - pub fn process_rewind(&mut self, removed_blocks: &[Arc]) -> Result<(), MempoolError> { - debug!(target: LOG_TARGET, "Mempool processing rewind"); - let current_tip = removed_blocks - .first() - .map(|block| block.header.height) - .unwrap_or_default() - .checked_sub(1) - .unwrap_or_default(); - - // Clear out all transactions from the unconfirmed pool and save them in the reorg pool. We dont reinsert valid - // transactions here as we will need to revalidate them again after the sync was done and the mempool and - // blockchain does not yet know how the blocks will look. - let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); - // lets save to the reorg pool - self.reorg_pool.insert_all(current_tip, removed_txs); - Ok(()) - } - /// After a sync event, we need to try to add in all the transaction form the reorg pool. - pub fn process_sync(&mut self) -> Result<(), MempoolError> { + pub fn process_sync(&mut self, blocks_added: u64) -> Result<(), MempoolError> { debug!(target: LOG_TARGET, "Mempool processing sync finished"); - // lets retrieve all the transactions from the reorg pool and try to reinsert them. - let txs = self.reorg_pool.clear_and_retrieve_all(); + // lets remove and revalidate all transactions from the mempool. All we know is that the state has changed, but + // we dont have the data to know what. + let txs = self.unconfirmed_pool.drain_all_mempool_transactions(); // lets add them all back into the mempool self.insert_txs(txs); + //let retrieve all re-org pool transactions as well as make sure they are mined as well + let txs = self.reorg_pool.clear_and_retrieve_all(); + self.insert_txs(txs); Ok(()) } diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index 1248776f15..29a1ced323 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -173,13 +173,10 @@ impl MempoolInboundHandlers { .await?; }, ValidBlockAdded(_, _) => {}, - BlockSyncRewind(removed_blocks) => { - self.mempool - .process_rewind(removed_blocks.iter().map(|b| b.to_arc_block()).collect()) - .await?; - }, - BlockSyncComplete() => { - self.mempool.process_sync().await?; + BlockSyncRewind(_) => {}, + BlockSyncComplete(tip_block, starting_sync_height) => { + let height_diff = tip_block.height() - starting_sync_height; + self.mempool.process_sync(height_diff).await?; }, AddBlockValidationFailed { block: failed_block, From 9a2c0aa0706f12aa1ec4bcc48fab6b464039da5e Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 22 Sep 2022 10:47:01 +0200 Subject: [PATCH 4/6] finish mempool side --- base_layer/core/src/mempool/mempool.rs | 4 ++-- base_layer/core/src/mempool/mempool_storage.rs | 2 +- base_layer/core/src/mempool/service/inbound_handlers.rs | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index 49a2951544..e70cf6030c 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -102,8 +102,8 @@ impl Mempool { } /// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation - pub async fn process_sync(&self, blocks_added: u64) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_sync(blocks_added)) + pub async fn process_sync(&self) -> Result<(), MempoolError> { + self.with_write_access(move |storage| storage.process_sync()) .await } diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index a5bb0f6b76..69c629b50d 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -203,7 +203,7 @@ impl MempoolStorage { } /// After a sync event, we need to try to add in all the transaction form the reorg pool. - pub fn process_sync(&mut self, blocks_added: u64) -> Result<(), MempoolError> { + pub fn process_sync(&mut self) -> Result<(), MempoolError> { debug!(target: LOG_TARGET, "Mempool processing sync finished"); // lets remove and revalidate all transactions from the mempool. All we know is that the state has changed, but // we dont have the data to know what. diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index 29a1ced323..954394e565 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -174,9 +174,8 @@ impl MempoolInboundHandlers { }, ValidBlockAdded(_, _) => {}, BlockSyncRewind(_) => {}, - BlockSyncComplete(tip_block, starting_sync_height) => { - let height_diff = tip_block.height() - starting_sync_height; - self.mempool.process_sync(height_diff).await?; + BlockSyncComplete(_,_) => { + self.mempool.process_sync().await?; }, AddBlockValidationFailed { block: failed_block, From cec6f14d5eafd413d1260e92ff4057e9600a916a Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Thu, 22 Sep 2022 11:23:31 +0200 Subject: [PATCH 5/6] remove flaky tag --- integration_tests/features/Mempool.feature | 1 - 1 file changed, 1 deletion(-) diff --git a/integration_tests/features/Mempool.feature b/integration_tests/features/Mempool.feature index 1048a272ac..cd58eef232 100644 --- a/integration_tests/features/Mempool.feature +++ b/integration_tests/features/Mempool.feature @@ -88,7 +88,6 @@ Feature: Mempool Then SENDER has TX1 in NOT_STORED state Then SENDER has TX2 in MINED state - @flaky Scenario: Mempool clearing out invalid transactions after a reorg Given I have a seed node SEED_A And I have a base node NODE_A connected to seed SEED_A From 29cd327d378179e3a6d58f0d7206d3a44c1dd448 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Fri, 23 Sep 2022 10:05:03 +0200 Subject: [PATCH 6/6] clippy --- base_layer/core/src/mempool/mempool.rs | 3 +-- base_layer/core/src/mempool/mempool_storage.rs | 9 ++++----- base_layer/core/src/mempool/service/inbound_handlers.rs | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index e70cf6030c..898011efbb 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -103,8 +103,7 @@ impl Mempool { /// After a sync event, we can move all orphan transactions to the unconfirmed pool after validation pub async fn process_sync(&self) -> Result<(), MempoolError> { - self.with_write_access(move |storage| storage.process_sync()) - .await + self.with_write_access(move |storage| storage.process_sync()).await } /// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool. diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 69c629b50d..518f241e83 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -191,14 +191,13 @@ impl MempoolStorage { // validation. This is important as invalid transactions that have not been mined yet may remain in the mempool // after a reorg. let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); - // this returns all orphaned transaction, but because we know the new blocks are already added, they will still - // be orphaned, we can drop them here - let _ = self.insert_txs(removed_txs); + // Try to add in all the transactions again. + self.insert_txs(removed_txs); // Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool let removed_txs = self .reorg_pool .remove_reorged_txs_and_discard_double_spends(removed_blocks, new_blocks); - let _ = self.insert_txs(removed_txs); + self.insert_txs(removed_txs); Ok(()) } @@ -210,7 +209,7 @@ impl MempoolStorage { let txs = self.unconfirmed_pool.drain_all_mempool_transactions(); // lets add them all back into the mempool self.insert_txs(txs); - //let retrieve all re-org pool transactions as well as make sure they are mined as well + // let retrieve all re-org pool transactions as well as make sure they are mined as well let txs = self.reorg_pool.clear_and_retrieve_all(); self.insert_txs(txs); Ok(()) diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs index 954394e565..86ffa9d962 100644 --- a/base_layer/core/src/mempool/service/inbound_handlers.rs +++ b/base_layer/core/src/mempool/service/inbound_handlers.rs @@ -174,7 +174,7 @@ impl MempoolInboundHandlers { }, ValidBlockAdded(_, _) => {}, BlockSyncRewind(_) => {}, - BlockSyncComplete(_,_) => { + BlockSyncComplete(_, _) => { self.mempool.process_sync().await?; }, AddBlockValidationFailed {