Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Limit number of deployments in mempool. #3089

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ use tokio::{
task::JoinHandle,
};

/// Percentage of mempool transactions capacity reserved for deployments.
const CAPACITY_FOR_DEPLOYMENTS: usize = 20;
/// Percentage of mempool transactions capacity reserved for executions.
const CAPACITY_FOR_EXECUTIONS: usize = 80;

#[derive(Clone)]
pub struct Consensus<N: Network> {
/// The ledger.
Expand All @@ -63,8 +68,10 @@ pub struct Consensus<N: Network> {
primary_sender: Arc<OnceCell<PrimarySender<N>>>,
/// The unconfirmed solutions queue.
solutions_queue: Arc<Mutex<LruCache<PuzzleCommitment<N>, ProverSolution<N>>>>,
/// The unconfirmed transactions queue.
transactions_queue: Arc<Mutex<LruCache<N::TransactionID, Transaction<N>>>>,
/// The unconfirmed deployment transactions queue.
deployments_queue: Arc<Mutex<LruCache<N::TransactionID, Transaction<N>>>>,
/// The unconfirmed execution transactions queue.
executions_queue: Arc<Mutex<LruCache<N::TransactionID, Transaction<N>>>>,
/// The recently-seen unconfirmed solutions.
seen_solutions: Arc<Mutex<LruCache<PuzzleCommitment<N>, ()>>>,
/// The recently-seen unconfirmed transactions.
Expand Down Expand Up @@ -101,8 +108,13 @@ impl<N: Network> Consensus<N> {
solutions_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH).unwrap(),
))),
transactions_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH).unwrap(),
deployments_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_DEPLOYMENTS / 100)
.unwrap(),
))),
executions_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * CAPACITY_FOR_EXECUTIONS / 100)
.unwrap(),
))),
seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
Expand Down Expand Up @@ -260,7 +272,11 @@ impl<N: Network> Consensus<N> {
}
// Add the transaction to the memory pool.
trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
if self.transactions_queue.lock().put(transaction_id, transaction).is_some() {
if transaction.is_deploy() {
if self.deployments_queue.lock().put(transaction_id, transaction).is_some() {
bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
}
} else if self.executions_queue.lock().put(transaction_id, transaction).is_some() {
bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
}
}
Expand All @@ -274,14 +290,20 @@ impl<N: Network> Consensus<N> {
let transactions = {
// Determine the available capacity.
let capacity = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH.saturating_sub(num_unconfirmed);
// Acquire the lock on the queue.
let mut queue = self.transactions_queue.lock();
// Determine the number of transactions to send.
let num_transactions = queue.len().min(capacity);
// Drain the solutions from the queue.
(0..num_transactions)
.filter_map(|_| queue.pop_lru().map(|(_, transaction)| transaction))
.collect::<Vec<_>>()
// Acquire the lock on the deployments queue.
let mut deployments_queue = self.deployments_queue.lock();
// Acquire the lock on the executions queue.
let mut executions_queue = self.executions_queue.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future consideration: ideally, collections that are always accessed together could be under a single lock (for added performance), though that may require a little bit more type boilerplate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it anyway so Howard can decide: f93060b
Did not implement any further methods for TransactionsQueue to avoid having to add input checking for those methods and to keep it simple.

// Determine the number of deployments to send.
let num_deployments = deployments_queue.len().min(capacity * CAPACITY_FOR_DEPLOYMENTS / 100);
// Determine the number of executions to send.
let num_executions = executions_queue.len().min(capacity.saturating_sub(num_deployments));
// Drain the deployments from the queue.
let deployments = (0..num_deployments).filter_map(|_| deployments_queue.pop_lru().map(|(_, tx)| tx));
// Drain the executions from the queue.
let executions = (0..num_executions).filter_map(|_| executions_queue.pop_lru().map(|(_, tx)| tx));
// Interleave the transactions to prevent having too many consecutive deployments.
executions.interleave(deployments).collect_vec()
};
// Iterate over the transactions.
for transaction in transactions.into_iter() {
Expand Down