fix: mempool fetch_highest_priority_txs (#5551)
Description
---
Fix the `fetch_highest_priority_txs` transaction selection logic.
The selection process works as follows.
Assume for simplicity that all transactions have the same weight.
A(20)->B(2) means A depends on B, A has fee 20 and B has fee 2.
Consider A(20)->B(2)->C(14) and an independent D(12):
1) A is considered first (it has the highest individual fee), but B and C must
be piggybacked on it, so the bundle's overall fee_per_byte is
(20+2+14)/3 = 12; we store the bundle temporarily.
2) We look at transaction C with fee per byte 14; nothing is better, so it is
selected.
3) We come back to A's bundle with fee per byte 12, but now that C is already
selected, we recompute its fee per byte to (20+2)/2 = 11 and store the bundle
temporarily again.
4) Next we process transaction D with fee per byte 12; nothing is better, so it
is selected.
5) Finally we select transaction A (together with B), because no better option
remains.
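
For illustration, here is a minimal, self-contained Rust sketch of this selection strategy. It is not the committed code: `Tx`, `branch`, `branch_cost` and `select` are invented for the example, every transaction's full dependency branch is pushed onto the heap up front, weights are assumed positive, and the real implementation additionally guards against duplicate inputs and honours `weight_tx_skip_count`.

```rust
use std::collections::{BinaryHeap, HashMap, HashSet};

// Toy transaction: its own fee and weight, plus the keys of its whole
// unconfirmed dependency branch (itself included).
struct Tx {
    key: usize,
    fee: u64,
    weight: u64,
    branch: Vec<usize>,
}

// Same scaling as the patch: integer fee per byte, scaled by 1000.
fn fee_per_byte(fee: u64, weight: u64) -> u64 {
    (fee as f64 / weight as f64 * 1000.0) as u64
}

// Weight and fee of the branch members that are not yet selected.
fn branch_cost(tx: &Tx, by_key: &HashMap<usize, &Tx>, selected: &HashSet<usize>) -> (u64, u64) {
    tx.branch
        .iter()
        .filter(|k| !selected.contains(*k))
        .map(|k| by_key[k])
        .fold((0, 0), |(w, f), t| (w + t.weight, f + t.fee))
}

fn select(txs: &[Tx], max_weight: u64) -> HashSet<usize> {
    let by_key: HashMap<usize, &Tx> = txs.iter().map(|t| (t.key, t)).collect();
    let mut selected = HashSet::new();

    // Max-heap of (fee_per_byte, key), one entry per candidate bundle.
    let mut heap: BinaryHeap<(u64, usize)> = txs
        .iter()
        .map(|t| {
            let (w, f) = branch_cost(t, &by_key, &selected);
            (fee_per_byte(f, w), t.key)
        })
        .collect();

    let mut curr_weight = 0;
    while let Some((fpb, key)) = heap.pop() {
        if selected.contains(&key) {
            continue; // already piggybacked on an earlier bundle
        }
        let (w, f) = branch_cost(by_key[&key], &by_key, &selected);
        let fresh = fee_per_byte(f, w);
        if fresh < fpb {
            // A dependency was selected since this entry was pushed:
            // re-queue at the recomputed (never higher) fee per byte.
            heap.push((fresh, key));
            continue;
        }
        if curr_weight + w <= max_weight {
            curr_weight += w;
            selected.extend(by_key[&key].branch.iter().copied());
        }
    }
    selected
}
```

On the example above (unit weights and a sufficient weight budget), this selects C and D first and then the A+B bundle, re-queuing A once at its recomputed fee per byte of 11.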

Motivation and Context
---
As it was, it was easy to piggyback a heavy transaction with a low
fee_per_byte on a light transaction with a high fee_per_byte: the bundle was
ranked by the light transaction's own fee per byte, so (for example) a large,
low-fee parent could ride into a block on a tiny, high-fee child that spends
its output.

How Has This Been Tested?
---
There are two tests for zero-conf transactions; one of them covers the piggybacking scenario.

What process can a PR reviewer use to test or verify this change?
---
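A reviewer can run the new mempool tests, e.g. `cargo test test_zero_conf` from `base_layer/core` (standard cargo test-name filtering; the exact invocation is a suggestion, not part of this PR), and check that `test_zero_conf_no_piggyback` fails with the fix reverted.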


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

Cifko authored Jun 30, 2023
1 parent ae4eaad commit f7f749c
Showing 3 changed files with 275 additions and 9 deletions.
@@ -76,6 +76,7 @@ pub struct PrioritizedTransaction {
pub key: usize,
pub transaction: Arc<Transaction>,
pub priority: FeePriority,
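/// The transaction's own total fee divided by its weight, scaled by 1000 to keep integer precision.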
pub fee_per_byte: u64,
pub weight: u64,
pub dependent_output_hashes: Vec<HashOutput>,
}
@@ -95,6 +96,7 @@ impl PrioritizedTransaction {
Ok(Self {
key,
priority: FeePriority::new(&transaction, insert_epoch, weight),
fee_per_byte: ((f64::from(transaction.body.get_total_fee()) / weight as f64) * 1000.0) as u64,
weight,
transaction,
dependent_output_hashes: dependent_outputs.unwrap_or_default(),
178 changes: 171 additions & 7 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
@@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
sync::Arc,
};

@@ -89,6 +89,8 @@ pub struct RetrieveResults {
pub transactions_to_insert: Vec<Arc<Transaction>>,
}

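/// Maps a candidate root transaction to its complete dependency branch:
/// (branch transactions keyed by tx key, total branch weight, total branch fees).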
pub type CompleteTransactionBranch = HashMap<TransactionKey, (HashMap<TransactionKey, Arc<Transaction>>, u64, u64)>;

impl UnconfirmedPool {
/// Create a new UnconfirmedPool with the specified configuration
pub fn new(config: UnconfirmedPoolConfig) -> Self {
@@ -172,40 +174,94 @@ impl UnconfirmedPool {
}

/// Returns a set of the highest-priority unconfirmed transactions that can be included in a block
#[allow(clippy::too_many_lines)]
pub fn fetch_highest_priority_txs(&mut self, total_weight: u64) -> Result<RetrieveResults, UnconfirmedPoolError> {
// The selection process works as follows:
// Assume for simplicity that all transactions have the same weight. A(20)->B(2) means A depends on B, A has
// fee 20 and B has fee 2. Consider A(20)->B(2)->C(14) and an independent D(12).
// 1) A is considered first, but B and C must be piggybacked on it, so the bundle's overall fee_per_byte is
//    12; we store the bundle temporarily.
// 2) We look at transaction C with fee per byte 14; nothing is better, so it is selected.
// 3) We come back to A's bundle with fee per byte 12, but now that C is already selected we recompute its
//    fee per byte to 11 and store the bundle temporarily again.
// 4) Next we process transaction D with fee per byte 12; nothing is better, so it is selected.
// 5) Finally we select transaction A (together with B), because no better option remains.
//
// Note: if we store some TX_a whose bundle depends on a set of TXs including TX_b, and TX_b is selected
// (which triggers a fee-per-byte recompute of TX_a) before we process TX_a again, then TX_a's fee_per_byte
// can only stay the same or decrease, never increase. Proof by contradiction: suppose TX_b is selected
// before TX_a is processed and fee_per_byte(TX_a+dependents) > fee_per_byte(TX_a+dependents-TX_b). That
// would imply fee_per_byte(TX_b) < fee_per_byte(TX_a+dependents), but then we would not have processed
// TX_b before TX_a.

let mut selected_txs = HashMap::new();
let mut curr_weight = 0;
let mut curr_skip_count = 0;
let mut transactions_to_remove_and_recheck = Vec::new();
let mut potential_transactions_to_remove_and_recheck = Vec::new();
let mut unique_ids = HashSet::new();
let mut complete_transaction_branch = CompleteTransactionBranch::new();
let mut potentional_to_add = BinaryHeap::<(u64, TransactionKey)>::new();
// For each transaction we store the transactions that depend on it, so that when we select it we can mark
// all of them for recomputation.
let mut depended_on: HashMap<TransactionKey, Vec<&TransactionKey>> = HashMap::new();
let mut recompute = HashSet::new();
for (_, tx_key) in self.tx_by_priority.iter().rev() {
if selected_txs.contains_key(tx_key) {
continue;
}

let prioritized_transaction = self
.tx_by_key
.get(tx_key)
.ok_or(UnconfirmedPoolError::StorageOutofSync)?;

self.check_the_potential_txs(
total_weight,
&mut selected_txs,
&mut curr_weight,
&mut curr_skip_count,
&mut complete_transaction_branch,
&mut potentional_to_add,
&mut depended_on,
&mut recompute,
prioritized_transaction.fee_per_byte,
);
if curr_skip_count >= self.config.weight_tx_skip_count {
break;
}
let mut total_transaction_weight = 0;
let mut total_transaction_fees = 0;
let mut candidate_transactions_to_select = HashMap::new();
let mut potential_transactions_to_remove_and_recheck = Vec::new();
self.get_all_dependent_transactions(
prioritized_transaction,
&mut candidate_transactions_to_select,
&mut potential_transactions_to_remove_and_recheck,
&selected_txs,
&mut total_transaction_weight,
&mut total_transaction_fees,
&mut unique_ids,
)?;
let total_weight_after_candidates = curr_weight + total_transaction_weight;
if total_weight_after_candidates <= total_weight && potential_transactions_to_remove_and_recheck.is_empty()
{
if !UnconfirmedPool::find_duplicate_input(&selected_txs, &candidate_transactions_to_select) {
curr_weight += total_transaction_weight;
selected_txs.extend(candidate_transactions_to_select);
for dependend_on_tx_key in candidate_transactions_to_select.keys() {
if dependend_on_tx_key != tx_key {
// A transaction does not depend on itself.
depended_on
.entry(*dependend_on_tx_key)
.and_modify(|v| v.push(tx_key))
.or_insert_with(|| vec![tx_key]);
}
}
let fee_per_byte = (total_transaction_fees as f64 / total_transaction_weight as f64 * 1000.0) as u64;
complete_transaction_branch.insert(
*tx_key,
(
candidate_transactions_to_select.clone(),
total_transaction_weight,
total_transaction_fees,
),
);
potentional_to_add.push((fee_per_byte, *tx_key));
} else {
transactions_to_remove_and_recheck.append(&mut potential_transactions_to_remove_and_recheck);
// Check whether some of the next few txs with slightly lower priority will fit in the remaining space.
@@ -215,6 +271,19 @@
}
}
}
if curr_skip_count < self.config.weight_tx_skip_count {
self.check_the_potential_txs(
total_weight,
&mut selected_txs,
&mut curr_weight,
&mut curr_skip_count,
&mut complete_transaction_branch,
&mut potentional_to_add,
&mut depended_on,
&mut recompute,
0,
);
}
if !transactions_to_remove_and_recheck.is_empty() {
// we need to remove all transactions that need to be rechecked.
debug!(
@@ -237,6 +306,98 @@ impl UnconfirmedPool {
Ok(results)
}

fn check_the_potential_txs<'a>(
&self,
total_weight: u64,
selected_txs: &mut HashMap<TransactionKey, Arc<Transaction>>,
curr_weight: &mut u64,
curr_skip_count: &mut usize,
complete_transaction_branch: &mut CompleteTransactionBranch,
potentional_to_add: &mut BinaryHeap<(u64, TransactionKey)>,
depended_on: &mut HashMap<TransactionKey, Vec<&'a TransactionKey>>,
recompute: &mut HashSet<&'a TransactionKey>,
fee_per_byte_threshold: u64,
) {
while match potentional_to_add.peek() {
Some((fee_per_byte, _)) => *fee_per_byte >= fee_per_byte_threshold,
None => false,
} {
// The stored bundles at the top of the heap have a fee per byte at least equal to that of the tx
// currently being considered, so they can be added now.
let (_fee_per_byte, tx_key) = potentional_to_add.pop().unwrap(); // Safe, we already checked we have some.
if selected_txs.contains_key(&tx_key) {
continue;
}
// Before we do anything with the top transaction we need to know whether it needs to be recomputed.
if recompute.contains(&tx_key) {
recompute.remove(&tx_key);
// So we recompute its fee per byte from the updated weight and fees.
let (_, total_transaction_weight, total_transaction_fees) =
complete_transaction_branch.get(&tx_key).unwrap();
let fee_per_byte = (*total_transaction_fees as f64 / *total_transaction_weight as f64 * 1000.0) as u64;
potentional_to_add.push((fee_per_byte, tx_key));
continue;
}
let (candidate_transactions_to_select, total_transaction_weight, _total_transaction_fees) =
complete_transaction_branch.remove(&tx_key).unwrap();

let total_weight_after_candidates = *curr_weight + total_transaction_weight;
if total_weight_after_candidates <= total_weight {
if !UnconfirmedPool::find_duplicate_input(selected_txs, &candidate_transactions_to_select) {
*curr_weight += total_transaction_weight;
// We selected the transaction, so mark its dependents for recomputation.
for tx_key in candidate_transactions_to_select.keys() {
self.remove_transaction_from_the_dependants(
*tx_key,
complete_transaction_branch,
depended_on,
recompute,
);
}
selected_txs.extend(candidate_transactions_to_select);
}
} else {
*curr_skip_count += 1;
if *curr_skip_count >= self.config.weight_tx_skip_count {
break;
}
}
// Some cleanup of what we don't need anymore
complete_transaction_branch.remove(&tx_key);
depended_on.remove(&tx_key);
}
}

pub fn remove_transaction_from_the_dependants<'a>(
&self,
tx_key: TransactionKey,
complete_transaction_branch: &mut CompleteTransactionBranch,
depended_on: &mut HashMap<TransactionKey, Vec<&'a TransactionKey>>,
recompute: &mut HashSet<&'a TransactionKey>,
) {
if let Some(txs) = depended_on.remove(&tx_key) {
let prioritized_transaction = self
.tx_by_key
.get(&tx_key)
.ok_or(UnconfirmedPoolError::StorageOutofSync)
.unwrap();
for tx in txs {
if let Some((
update_candidate_transactions_to_select,
update_total_transaction_weight,
update_total_transaction_fees,
)) = complete_transaction_branch.get_mut(tx)
{
update_candidate_transactions_to_select.remove(&tx_key);
*update_total_transaction_weight -= prioritized_transaction.weight;
*update_total_transaction_fees -= prioritized_transaction.transaction.body.get_total_fee().0;
// We mark it for recomputation. We don't have to update the heap, because the value will never be
// better than it was (see the note at the top of fetch_highest_priority_txs).
recompute.insert(tx);
}
}
}
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
@@ -269,6 +430,7 @@ impl UnconfirmedPool {
transactions_to_recheck: &mut Vec<(TransactionKey, Arc<Transaction>)>,
selected_txs: &HashMap<TransactionKey, Arc<Transaction>>,
total_weight: &mut u64,
total_fees: &mut u64,
_unique_ids: &mut HashSet<[u8; 32]>,
) -> Result<(), UnconfirmedPoolError> {
for dependent_output in &transaction.dependent_output_hashes {
@@ -282,6 +444,7 @@
transactions_to_recheck,
selected_txs,
total_weight,
total_fees,
_unique_ids,
)?;

@@ -304,6 +467,7 @@
.insert(transaction.key, transaction.transaction.clone())
.is_none()
{
*total_fees += transaction.transaction.body.get_total_fee().0;
*total_weight += transaction.weight;
}

104 changes: 102 additions & 2 deletions base_layer/core/tests/tests/mempool.rs
@@ -455,6 +455,106 @@ async fn test_retrieve() {
assert!(retrieved_txs.contains(&tx2[1]));
}

#[tokio::test]
#[allow(clippy::identity_op)]
async fn test_zero_conf_no_piggyback() {
// This is the scenario described in the fetch_highest_priority_txs function.
let network = Network::LocalNet;
let (mut store, mut blocks, mut outputs, consensus_manager, key_manager) = create_new_blockchain(network).await;
let mempool_validator = TransactionChainLinkedValidator::new(store.clone(), consensus_manager.clone());
let mempool = Mempool::new(
MempoolConfig::default(),
consensus_manager.clone(),
Box::new(mempool_validator),
);
let txs = vec![txn_schema!(
from: vec![outputs[0][0].clone()],
to: vec![21 * T, 11 * T, 11 * T, 16 * T]
)];
// "Mine" Block 1
generate_new_block(
&mut store,
&mut blocks,
&mut outputs,
txs,
&consensus_manager,
&key_manager,
)
.await
.unwrap();
mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();

let (tx_d, _tx_d_out) = spend_utxos(
txn_schema!(
from: vec![outputs[1][1].clone()],
to: vec![5 * T, 5 * T],
fee: 12*uT,
lock: 0,
features: OutputFeatures::default()
),
&key_manager,
)
.await;
assert_eq!(
mempool.insert(Arc::new(tx_d.clone())).await.unwrap(),
TxStorageResponse::UnconfirmedPool
);
let (tx_c, tx_c_out) = spend_utxos(
txn_schema!(
from: vec![outputs[1][0].clone()],
to: vec![15 * T, 5 * T],
fee: 14*uT,
lock: 0,
features: OutputFeatures::default()
),
&key_manager,
)
.await;
assert_eq!(
mempool.insert(Arc::new(tx_c.clone())).await.unwrap(),
TxStorageResponse::UnconfirmedPool
);

let (tx_b, tx_b_out) = spend_utxos(
txn_schema!(
from: vec![tx_c_out[0].clone()],
to: vec![7 * T, 4 * T],
fee: 2*uT, lock: 0,
features: OutputFeatures::default()
),
&key_manager,
)
.await;
assert_eq!(
mempool.insert(Arc::new(tx_b.clone())).await.unwrap(),
TxStorageResponse::UnconfirmedPool
);
let (tx_a, _tx_a_out) = spend_utxos(
txn_schema!(
from: vec![tx_b_out[1].clone()],
to: vec![2 * T, 1 * T],
fee: 20*uT,
lock: 0,
features: OutputFeatures::default()
),
&key_manager,
)
.await;

assert_eq!(
mempool.insert(Arc::new(tx_a.clone())).await.unwrap(),
TxStorageResponse::UnconfirmedPool
);

let weight = mempool.stats().await.unwrap().unconfirmed_weight - 1;
let retrieved_txs = mempool.retrieve(weight).await.unwrap();
assert_eq!(retrieved_txs.len(), 3);
assert!(retrieved_txs.contains(&Arc::new(tx_d)));
assert!(retrieved_txs.contains(&Arc::new(tx_c)));
assert!(retrieved_txs.contains(&Arc::new(tx_b)));
assert!(!retrieved_txs.contains(&Arc::new(tx_a)));
}

#[tokio::test]
#[allow(clippy::identity_op)]
#[allow(clippy::too_many_lines)]
@@ -834,8 +934,8 @@ async fn test_zero_conf() {
assert!(retrieved_txs.contains(&Arc::new(tx22)));
assert!(retrieved_txs.contains(&Arc::new(tx23)));
assert!(retrieved_txs.contains(&Arc::new(tx24)));
assert!(!retrieved_txs.contains(&Arc::new(tx31))); // Missing
assert!(retrieved_txs.contains(&Arc::new(tx32)));
assert!(retrieved_txs.contains(&Arc::new(tx31)));
assert!(!retrieved_txs.contains(&Arc::new(tx32))); // Missing
assert!(retrieved_txs.contains(&Arc::new(tx33)));
assert!(retrieved_txs.contains(&Arc::new(tx34)));
}
