fix: mempool fetch_highest_priority_txs
Cifko committed Jun 29, 2023
1 parent 6c3fa50 commit 43a7d3a
Showing 2 changed files with 152 additions and 7 deletions.
@@ -76,6 +76,7 @@ pub struct PrioritizedTransaction {
pub key: usize,
pub transaction: Arc<Transaction>,
pub priority: FeePriority,
pub fee_per_byte: u64,
pub weight: u64,
pub dependent_output_hashes: Vec<HashOutput>,
}
@@ -95,6 +96,7 @@ impl PrioritizedTransaction {
Self {
key,
priority: FeePriority::new(&transaction, insert_epoch, weight),
fee_per_byte: ((f64::from(transaction.body.get_total_fee()) / weight as f64) * 1000.0) as u64,
weight,
transaction,
dependent_output_hashes: dependent_outputs.unwrap_or_default(),
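
The new fee_per_byte field caches each transaction's own fee density, scaled by 1000 so fractional values survive the cast to u64. A minimal sketch of the arithmetic, using hypothetical numbers:

fn main() {
    // Hypothetical values: a total fee of 250 over a weight of 19 grams.
    let (total_fee, weight) = (250u64, 19u64);
    // 250 / 19 = 13.157..., so scaling by 1000 before truncating keeps three
    // decimal places: the stored value is 13_157.
    let fee_per_byte = ((total_fee as f64 / weight as f64) * 1000.0) as u64;
    assert_eq!(fee_per_byte, 13_157);
}
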
157 changes: 150 additions & 7 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
@@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
sync::Arc,
};

@@ -170,39 +170,93 @@ impl UnconfirmedPool {

/// Returns a set of the highest priority unconfirmed transactions, that can be included in a block
pub fn fetch_highest_priority_txs(&mut self, total_weight: u64) -> Result<RetrieveResults, UnconfirmedPoolError> {
// The process of selection is as follows:
// Assume for simplicity that all transactions have the same weight. A(20)->B(2) means A depends on B, A has
// fee 20 and B has fee 2. Consider the pool A(20)->B(2)->C(14), D(12):
// 1) A is considered first, but B and C are piggybacked onto it, so the branch's overall fee_per_byte is 12;
// we store it temporarily.
// 2) We look at transaction C with fee per byte 14; it's good, nothing is better.
// 3) We come back to transaction A with fee per byte 12, but now that C is already in, we recompute its fee
// per byte as 11 and store it temporarily again.
// 4) Next we process transaction D; it's good, nothing is better.
// 5) Finally we proceed to transaction A, because no better option remains.
// (A standalone trace of this pop/recompute cycle follows this function.)
//
// Note: if we store some TX_a whose branch depends on several TXs including TX_b, and TX_b is removed
// (triggering a TX_a fee-per-byte recompute) before TX_a is processed again, then TX_a's fee_per_byte can
// only stay the same or drop, never rise. Proof by contradiction: suppose TX_b is removed before TX_a is
// processed and fee_per_byte(TX_a+dependents-TX_b) > fee_per_byte(TX_a+dependents). That would mean
// fee_per_byte(TX_b) < fee_per_byte(TX_a+dependents), but if that were the case we would not have processed
// TX_b before TX_a.

let mut selected_txs = HashMap::new();
let mut curr_weight = 0;
let mut curr_skip_count = 0;
let mut transactions_to_remove_and_recheck = Vec::new();
let mut potential_transactions_to_remove_and_recheck = Vec::new();
let mut unique_ids = HashSet::new();
let mut complete_transaction_branch =
HashMap::<TransactionKey, (HashMap<TransactionKey, Arc<Transaction>>, u64, u64)>::new();
let mut potentional_to_add = BinaryHeap::<(u64, TransactionKey)>::new();
// For each transaction we store the transactions that depend on it, so that when we process it we can mark
// all of them for recomputing.
let mut depended_on: HashMap<TransactionKey, Vec<&TransactionKey>> = HashMap::new();
let mut recompute = HashSet::new();
for (_, tx_key) in self.tx_by_priority.iter().rev() {
if selected_txs.contains_key(tx_key) {
continue;
}

let prioritized_transaction = self
.tx_by_key
.get(tx_key)
.ok_or(UnconfirmedPoolError::StorageOutofSync)?;

self.check_the_potential_txs(
total_weight,
&mut selected_txs,
&mut curr_weight,
&mut curr_skip_count,
&mut complete_transaction_branch,
&mut potentional_to_add,
&mut depended_on,
&mut recompute,
prioritized_transaction.fee_per_byte,
);
if curr_skip_count >= self.config.weight_tx_skip_count {
break;
}
let mut total_transaction_weight = 0;
let mut total_transaction_fees = 0;
let mut candidate_transactions_to_select = HashMap::new();
let mut potential_transactions_to_remove_and_recheck = Vec::new();
self.get_all_dependent_transactions(
prioritized_transaction,
&mut candidate_transactions_to_select,
&mut potential_transactions_to_remove_and_recheck,
&selected_txs,
&mut total_transaction_weight,
&mut total_transaction_fees,
&mut unique_ids,
)?;
let total_weight_after_candidates = curr_weight + total_transaction_weight;
if total_weight_after_candidates <= total_weight && potential_transactions_to_remove_and_recheck.is_empty()
{
if !UnconfirmedPool::find_duplicate_input(&selected_txs, &candidate_transactions_to_select) {
curr_weight += total_transaction_weight;
selected_txs.extend(candidate_transactions_to_select);
for (dependend_on_tx_key, _transaction) in &candidate_transactions_to_select {
if dependend_on_tx_key != tx_key {
// A transaction does not depend on itself.
depended_on
.entry(dependend_on_tx_key.clone())
.and_modify(|v| v.push(tx_key))
.or_insert(vec![tx_key]);
}
}
let fee_per_byte = (total_transaction_fees as f64 / total_transaction_weight as f64 * 1000.0) as u64;
complete_transaction_branch.insert(
*tx_key,
(
candidate_transactions_to_select.clone(),
total_transaction_weight,
total_transaction_fees,
),
);
potentional_to_add.push((fee_per_byte, *tx_key));
} else {
transactions_to_remove_and_recheck.append(&mut potential_transactions_to_remove_and_recheck);
// Check whether some of the next few txs with slightly lower priority might still fit in the remaining space.
@@ -212,6 +266,19 @@ impl UnconfirmedPool {
}
}
}
if curr_skip_count < self.config.weight_tx_skip_count {
self.check_the_potential_txs(
total_weight,
&mut selected_txs,
&mut curr_weight,
&mut curr_skip_count,
&mut complete_transaction_branch,
&mut potentional_to_add,
&mut depended_on,
&mut recompute,
0,
);
}
if !transactions_to_remove_and_recheck.is_empty() {
// we need to remove all transactions that need to be rechecked.
debug!(
@@ -234,6 +301,79 @@ impl UnconfirmedPool {
Ok(results)
}
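
// A minimal standalone trace of the pop/recompute cycle described at the top of
// fetch_highest_priority_txs, using hypothetical string keys in place of the
// pool's TransactionKeys and unit weights throughout.
fn sketch_of_heap_recompute_cycle() {
    use std::collections::{BinaryHeap, HashSet};

    // Branches for A(20)->B(2)->C(14): {A, B, C} prices at (20 + 2 + 14) / 3 = 12,
    // {C} alone at 14.
    let mut heap = BinaryHeap::new();
    let mut recompute = HashSet::new();
    heap.push((12, "A"));
    heap.push((14, "C"));
    // While considering D (threshold 12), C outranks everything and is selected,
    // which marks A's branch for re-pricing.
    assert_eq!(heap.pop(), Some((14, "C")));
    recompute.insert("A");
    // The stale (12, "A") entry still beats the threshold, but the recompute flag
    // re-keys it instead of selecting it: {A, B} prices at (20 + 2) / 2 = 11.
    let (_, key) = heap.pop().unwrap();
    assert!(recompute.remove(key));
    heap.push((11, key));
    // 11 is below the threshold, so D's branch is pushed and the main loop ends.
    heap.push((12, "D"));
    // The final drain (threshold 0) selects D, then A, matching steps 4 and 5.
    assert_eq!(heap.pop(), Some((12, "D")));
    assert_eq!(heap.pop(), Some((11, "A")));
}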

fn check_the_potential_txs<'a>(
&self,
total_weight: u64,
selected_txs: &mut HashMap<TransactionKey, Arc<Transaction>>,
curr_weight: &mut u64,
curr_skip_count: &mut usize,
complete_transaction_branch: &mut HashMap<
TransactionKey,
(HashMap<TransactionKey, Arc<Transaction>>, u64, u64),
>,
potentional_to_add: &mut BinaryHeap<(u64, TransactionKey)>,
depended_on: &mut HashMap<TransactionKey, Vec<&'a TransactionKey>>,
recompute: &mut HashSet<&'a TransactionKey>,
fee_per_byte_threshold: u64,
) {
while match potentional_to_add.peek() {
Some((fee_per_byte, _)) => *fee_per_byte >= fee_per_byte_threshold,
None => false,
} {
// The heap's best branch prices at least as high as the current candidate, so we can add it now.
let (_fee_per_byte, tx_key) = potentional_to_add.pop().unwrap(); // Safe, we already checked we have some.

// Before we do anything with the top transaction, we need to know whether it needs to be recomputed.
if recompute.contains(&tx_key) {
recompute.remove(&tx_key);
// Recompute its fee per byte from the updated totals and re-queue it.
let (_, total_transaction_weight, total_transaction_fees) =
complete_transaction_branch.get(&tx_key).unwrap();
let fee_per_byte = (*total_transaction_fees as f64 / *total_transaction_weight as f64 * 1000.0) as u64;
potentional_to_add.push((fee_per_byte, tx_key));
continue;
}
let (candidate_transactions_to_select, total_transaction_weight, total_transaction_fees) =
complete_transaction_branch.remove(&tx_key).unwrap();

let total_weight_after_candidates = *curr_weight + total_transaction_weight;
if total_weight_after_candidates <= total_weight {
if !UnconfirmedPool::find_duplicate_input(&selected_txs, &candidate_transactions_to_select) {
*curr_weight += total_transaction_weight;
// So we processed the transaction, let's mark the dependents to be recomputed.
if let Some(txs) = depended_on.remove(&tx_key) {
for tx in txs {
let (
update_candidate_transactions_to_select,
update_total_transaction_weight,
update_total_transaction_fees,
) = complete_transaction_branch.get_mut(&tx).unwrap();
// We remove all of the added ones.
for (selected_tx_key, _) in &candidate_transactions_to_select {
update_candidate_transactions_to_select.remove(selected_tx_key);
}
// We don't need the fees/weights from all the selected ones, we have the totals.
*update_total_transaction_fees -= total_transaction_fees;
*update_total_transaction_weight -= total_transaction_weight;
// We mark it for recompute; we don't have to update the heap entry now, because the
// new value will never be better than it was (see the note at the top of
// fetch_highest_priority_txs).
recompute.insert(tx);
}
}
selected_txs.extend(candidate_transactions_to_select);
}
} else {
*curr_skip_count += 1;
if *curr_skip_count >= self.config.weight_tx_skip_count {
break;
}
}
// Some cleanup of what we don't need anymore
complete_transaction_branch.remove(&tx_key);
depended_on.remove(&tx_key);
}
}
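
// A minimal sketch, with hypothetical totals, of the invariant argued in the note
// at the top of fetch_highest_priority_txs: a dependent branch is only ever
// re-priced downwards. Branch {X, B, C} weighs 30 grams with 360 in fees (scaled
// fee per byte 12_000); once the higher-priced sub-branch {B, C} (weight 20,
// fees 300, scaled 15_000) is selected, the remainder re-prices to 6_000.
fn sketch_of_dependent_branch_reprice() {
    let (mut weight, mut fees) = (30u64, 360u64);
    fees -= 300; // subtract the selected sub-branch's totals ...
    weight -= 20; // ... instead of walking its transactions again.
    let fee_per_byte = (fees as f64 / weight as f64 * 1000.0) as u64;
    assert_eq!(fee_per_byte, 6_000);
}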

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
@@ -266,6 +406,7 @@ impl UnconfirmedPool {
transactions_to_recheck: &mut Vec<(TransactionKey, Arc<Transaction>)>,
selected_txs: &HashMap<TransactionKey, Arc<Transaction>>,
total_weight: &mut u64,
total_fees: &mut u64,
_unique_ids: &mut HashSet<[u8; 32]>,
) -> Result<(), UnconfirmedPoolError> {
for dependent_output in &transaction.dependent_output_hashes {
@@ -279,6 +420,7 @@
transactions_to_recheck,
selected_txs,
total_weight,
total_fees,
_unique_ids,
)?;

@@ -301,6 +443,7 @@
.insert(transaction.key, transaction.transaction.clone())
.is_none()
{
*total_fees += transaction.transaction.body.get_total_fee().0;
*total_weight += transaction.weight;
}

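The final hunks thread a new total_fees accumulator through get_all_dependent_transactions alongside total_weight, so a branch's totals come out of a single recursive walk. A minimal sketch of that pattern, using a hypothetical Node type (the real code also dedups transactions before accumulating):

struct Node {
    fee: u64,
    weight: u64,
    deps: Vec<Node>,
}

// Post-order walk that adds each node's fee and weight exactly once.
fn accumulate(node: &Node, total_weight: &mut u64, total_fees: &mut u64) {
    for dep in &node.deps {
        accumulate(dep, total_weight, total_fees);
    }
    *total_fees += node.fee;
    *total_weight += node.weight;
}

fn main() {
    // A(20) -> B(2) -> C(14), all with unit weight, as in the selection example.
    let c = Node { fee: 14, weight: 1, deps: vec![] };
    let b = Node { fee: 2, weight: 1, deps: vec![c] };
    let a = Node { fee: 20, weight: 1, deps: vec![b] };
    let (mut weight, mut fees) = (0u64, 0u64);
    accumulate(&a, &mut weight, &mut fees);
    assert_eq!((weight, fees), (3, 36));
}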
