diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 52aa75a626..30bc32c7e6 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -53,6 +53,32 @@ use tokio::{ task::JoinHandle, }; +/// Percentage of mempool transactions capacity reserved for deployments. +const CAPACITY_FOR_DEPLOYMENTS: usize = 20; +/// Percentage of mempool transactions capacity reserved for executions. +const CAPACITY_FOR_EXECUTIONS: usize = 80; + +/// Helper struct to track incoming transactions. +struct TransactionsQueue { + pub deployments: LruCache>, + pub executions: LruCache>, +} + +impl Default for TransactionsQueue { + fn default() -> Self { + Self { + deployments: LruCache::new( + NonZeroUsize::new(BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_DEPLOYMENTS / 100) + .unwrap(), + ), + executions: LruCache::new( + NonZeroUsize::new(BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_EXECUTIONS / 100) + .unwrap(), + ), + } + } +} + #[derive(Clone)] pub struct Consensus { /// The ledger. @@ -64,7 +90,7 @@ pub struct Consensus { /// The unconfirmed solutions queue. solutions_queue: Arc, ProverSolution>>>, /// The unconfirmed transactions queue. - transactions_queue: Arc>>>, + transactions_queue: Arc>>, /// The recently-seen unconfirmed solutions. seen_solutions: Arc, ()>>>, /// The recently-seen unconfirmed transactions. @@ -101,9 +127,7 @@ impl Consensus { solutions_queue: Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH).unwrap(), ))), - transactions_queue: Arc::new(Mutex::new(LruCache::new( - NonZeroUsize::new(BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH).unwrap(), - ))), + transactions_queue: Default::default(), seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))), seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))), handles: Default::default(), @@ -260,7 +284,11 @@ impl Consensus { } // Add the transaction to the memory pool. trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id)); - if self.transactions_queue.lock().put(transaction_id, transaction).is_some() { + if transaction.is_deploy() { + if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() { + bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id)); + } + } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() { bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id)); } } @@ -274,14 +302,25 @@ impl Consensus { let transactions = { // Determine the available capacity. let capacity = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed); - // Acquire the lock on the queue. - let mut queue = self.transactions_queue.lock(); - // Determine the number of transactions to send. - let num_transactions = queue.len().min(capacity); - // Drain the solutions from the queue. - (0..num_transactions) - .filter_map(|_| queue.pop_lru().map(|(_, transaction)| transaction)) - .collect::>() + // Acquire the lock on the transactions queue. + let mut tx_queue = self.transactions_queue.lock(); + // Determine the number of deployments to send. + let num_deployments = tx_queue.deployments.len().min(capacity * CAPACITY_FOR_DEPLOYMENTS / 100); + // Determine the number of executions to send. + let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments)); + // Create an iterator which will select interleaved deployments and executions within the capacity. + // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue. + let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false)); + // Drain the transactions from the queue, interleaving deployments and executions. + selector_iter + .filter_map(|select_deployment| { + if select_deployment { + tx_queue.deployments.pop_lru().map(|(_, tx)| tx) + } else { + tx_queue.executions.pop_lru().map(|(_, tx)| tx) + } + }) + .collect_vec() }; // Iterate over the transactions. for transaction in transactions.into_iter() {