Skip to content

Commit

Permalink
Merge pull request #3089 from AleoHQ/limit_deployments_2
Browse files Browse the repository at this point in the history
Limit number of deployments in mempool.
  • Loading branch information
howardwu authored Feb 14, 2024
2 parents 94f88e2 + a5d3349 commit 264b2d4
Showing 1 changed file with 52 additions and 13 deletions.
65 changes: 52 additions & 13 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,32 @@ use tokio::{
task::JoinHandle,
};

/// Percentage of mempool transactions capacity reserved for deployments.
const CAPACITY_FOR_DEPLOYMENTS: usize = 20;
/// Percentage of mempool transactions capacity reserved for executions.
const CAPACITY_FOR_EXECUTIONS: usize = 80;

/// Helper struct to track incoming transactions.
struct TransactionsQueue<N: Network> {
pub deployments: LruCache<N::TransactionID, Transaction<N>>,
pub executions: LruCache<N::TransactionID, Transaction<N>>,
}

impl<N: Network> Default for TransactionsQueue<N> {
fn default() -> Self {
Self {
deployments: LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_DEPLOYMENTS / 100)
.unwrap(),
),
executions: LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_EXECUTIONS / 100)
.unwrap(),
),
}
}
}

#[derive(Clone)]
pub struct Consensus<N: Network> {
/// The ledger.
Expand All @@ -64,7 +90,7 @@ pub struct Consensus<N: Network> {
/// The unconfirmed solutions queue.
solutions_queue: Arc<Mutex<LruCache<PuzzleCommitment<N>, ProverSolution<N>>>>,
/// The unconfirmed transactions queue.
transactions_queue: Arc<Mutex<LruCache<N::TransactionID, Transaction<N>>>>,
transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
/// The recently-seen unconfirmed solutions.
seen_solutions: Arc<Mutex<LruCache<PuzzleCommitment<N>, ()>>>,
/// The recently-seen unconfirmed transactions.
Expand Down Expand Up @@ -101,9 +127,7 @@ impl<N: Network> Consensus<N> {
solutions_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH).unwrap(),
))),
transactions_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH).unwrap(),
))),
transactions_queue: Default::default(),
seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
handles: Default::default(),
Expand Down Expand Up @@ -260,7 +284,11 @@ impl<N: Network> Consensus<N> {
}
// Add the transaction to the memory pool.
trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
if self.transactions_queue.lock().put(transaction_id, transaction).is_some() {
if transaction.is_deploy() {
if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
}
} else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
}
}
Expand All @@ -274,14 +302,25 @@ impl<N: Network> Consensus<N> {
let transactions = {
// Determine the available capacity.
let capacity = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
// Acquire the lock on the queue.
let mut queue = self.transactions_queue.lock();
// Determine the number of transactions to send.
let num_transactions = queue.len().min(capacity);
// Drain the solutions from the queue.
(0..num_transactions)
.filter_map(|_| queue.pop_lru().map(|(_, transaction)| transaction))
.collect::<Vec<_>>()
// Acquire the lock on the transactions queue.
let mut tx_queue = self.transactions_queue.lock();
// Determine the number of deployments to send.
let num_deployments = tx_queue.deployments.len().min(capacity * CAPACITY_FOR_DEPLOYMENTS / 100);
// Determine the number of executions to send.
let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
// Create an iterator which will select interleaved deployments and executions within the capacity.
// Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
// Drain the transactions from the queue, interleaving deployments and executions.
selector_iter
.filter_map(|select_deployment| {
if select_deployment {
tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
} else {
tx_queue.executions.pop_lru().map(|(_, tx)| tx)
}
})
.collect_vec()
};
// Iterate over the transactions.
for transaction in transactions.into_iter() {
Expand Down

0 comments on commit 264b2d4

Please sign in to comment.