Skip to content

Commit

Permalink
Merge pull request #2387 from AleoHQ/bounded-parallel-verification
Browse files Browse the repository at this point in the history
Bound parallel verification of transactions
  • Loading branch information
howardwu authored Mar 27, 2024
2 parents fef4d68 + 0f47547 commit 2cbf34a
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions ledger/src/check_transaction_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,13 @@ impl<N: Network, C: ConsensusStorage<N>> Ledger<N, C> {
) -> Result<()> {
self.vm().check_transaction(transaction, rejected_id, rng)
}

/// Checks that the given list of transactions are well-formed and unique.
pub fn check_transactions_basic<R: CryptoRng + Rng>(
&self,
transactions: &[(&Transaction<N>, Option<Field<N>>)],
rng: &mut R,
) -> Result<()> {
self.vm().check_transactions(transactions, rng)
}
}
5 changes: 5 additions & 0 deletions synthesizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ path = "./snark"
version = "=0.16.19"
optional = true

[dependencies.utilities]
package = "snarkvm-utilities"
path = "../utilities"
version = "=0.16.19"

[dependencies.aleo-std]
version = "0.1.24"
default-features = false
Expand Down
32 changes: 13 additions & 19 deletions synthesizer/process/src/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use super::*;
use console::program::{FinalizeType, Future, Register};
use synthesizer_program::{Await, FinalizeRegistersState, Operand};
use utilities::handle_halting;
use utilities::try_vm_runtime;

use std::collections::HashSet;

Expand Down Expand Up @@ -227,9 +227,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
// Finalize the command.
match &command {
Command::BranchEq(branch_eq) => {
let result = handle_halting!(panic::AssertUnwindSafe(|| {
branch_to(counter, branch_eq, finalize, stack, &registers)
}));
let result = try_vm_runtime!(|| branch_to(counter, branch_eq, finalize, stack, &registers));
match result {
Ok(Ok(new_counter)) => {
counter = new_counter;
Expand All @@ -241,9 +239,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
}
}
Command::BranchNeq(branch_neq) => {
let result = handle_halting!(panic::AssertUnwindSafe(|| {
branch_to(counter, branch_neq, finalize, stack, &registers)
}));
let result = try_vm_runtime!(|| branch_to(counter, branch_neq, finalize, stack, &registers));
match result {
Ok(Ok(new_counter)) => {
counter = new_counter;
Expand Down Expand Up @@ -277,16 +273,15 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
None => bail!("Transition ID '{transition_id}' not found in call graph"),
};

let callee_state = match handle_halting!(panic::AssertUnwindSafe(|| {
// Set up the finalize state for the await.
setup_await(state, await_, stack, &registers, child_transition_id)
})) {
Ok(Ok(callee_state)) => callee_state,
// If the evaluation fails, bail and return the error.
Ok(Err(error)) => bail!("'finalize' failed to evaluate command ({command}): {error}"),
// If the evaluation fails, bail and return the error.
Err(_) => bail!("'finalize' failed to evaluate command ({command})"),
};
// Set up the finalize state for the await.
let callee_state =
match try_vm_runtime!(|| setup_await(state, await_, stack, &registers, child_transition_id)) {
Ok(Ok(callee_state)) => callee_state,
// If the evaluation fails, bail and return the error.
Ok(Err(error)) => bail!("'finalize' failed to evaluate command ({command}): {error}"),
// If the evaluation fails, bail and return the error.
Err(_) => bail!("'finalize' failed to evaluate command ({command})"),
};

// Increment the call counter.
call_counter += 1;
Expand All @@ -306,8 +301,7 @@ fn finalize_transition<N: Network, P: FinalizeStorage<N>>(
continue 'outer;
}
_ => {
let result =
handle_halting!(panic::AssertUnwindSafe(|| { command.finalize(stack, store, &mut registers) }));
let result = try_vm_runtime!(|| command.finalize(stack, store, &mut registers));
match result {
// If the evaluation succeeds with an operation, add it to the list.
Ok(Ok(Some(finalize_operation))) => finalize_operations.push(finalize_operation),
Expand Down
96 changes: 64 additions & 32 deletions synthesizer/src/vm/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use super::*;

use ledger_committee::{MAX_DELEGATORS, MIN_DELEGATOR_STAKE, MIN_VALIDATOR_STAKE};

use rand::{rngs::StdRng, SeedableRng};

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Speculates on the given list of transactions in the VM.
/// This function aborts all transactions that are not are well-formed or unique.
Expand Down Expand Up @@ -47,32 +45,16 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
let candidate_transactions: Vec<_> = candidate_transactions.collect::<Vec<_>>();
let candidate_transaction_ids: Vec<_> = candidate_transactions.iter().map(|tx| tx.id()).collect();

// Determine if the vm is currently processing the genesis block.
let is_genesis =
self.block_store().find_block_height_from_state_root(self.block_store().current_state_root())?.is_none();
// If the transactions are not part of the genesis block, ensure each transaction is well-formed and unique. Abort any transactions that are not.
let (verified_transactions, verification_aborted_transactions) =
match self.block_store().find_block_height_from_state_root(self.block_store().current_state_root())? {
// If the current state root does not exist in the block store, then the genesis block has not been introduced yet.
None => (candidate_transactions, vec![]),
// Verify transactions for all non-genesis cases.
_ => {
let rngs =
(0..candidate_transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
// Verify the transactions and collect the error message if there is one.
cfg_into_iter!(candidate_transactions).zip(rngs).partition_map(|(transaction, mut rng)| {
// Abort the transaction if it is a fee transaction.
if transaction.is_fee() {
return Either::Right((
transaction,
"Fee transactions are not allowed in speculate".to_string(),
));
}
// Verify the transaction.
match self.check_transaction(transaction, None, &mut rng) {
Ok(_) => Either::Left(transaction),
Err(e) => Either::Right((transaction, e.to_string())),
}
})
}
};
let (verified_transactions, verification_aborted_transactions) = match is_genesis {
// If the current state root does not exist in the block store, then the genesis block has not been introduced yet.
true => (candidate_transactions, vec![]),
// Verify transactions for all non-genesis cases.
false => self.prepare_for_speculate(&candidate_transactions, rng)?,
};

// Performs a **dry-run** over the list of ratifications, solutions, and transactions.
let (ratifications, confirmed_transactions, speculation_aborted_transactions, ratified_finalize_operations) =
Expand Down Expand Up @@ -127,15 +109,15 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
) -> Result<Vec<FinalizeOperation<N>>> {
let timer = timer!("VM::check_speculate");

// Retrieve the transactions and their rejected IDs.
let transactions_and_rejected_ids = cfg_iter!(transactions)
.map(|transaction| transaction.to_rejected_id().map(|rejected_id| (transaction.deref(), rejected_id)))
.collect::<Result<Vec<_>>>()?;
// Ensure each transaction is well-formed and unique.
// NOTE: We perform the transaction checks here prior to `atomic_speculate` because we must
// ensure that the `Fee` transactions are valid. We can't unify the transaction checks in `atomic_speculate`
// because we run speculation on the unconfirmed variant of the transactions.
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
cfg_iter!(transactions).zip(rngs).try_for_each(|(transaction, mut rng)| {
self.check_transaction(transaction, transaction.to_rejected_id()?, &mut rng)
.map_err(|e| anyhow!("Invalid transaction found in the transactions list: {e}"))
})?;
self.check_transactions(&transactions_and_rejected_ids, rng)?;

// Reconstruct the candidate ratifications to verify the speculation.
let candidate_ratifications = ratifications.iter().cloned().collect::<Vec<_>>();
Expand Down Expand Up @@ -806,6 +788,56 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
})
}

/// Performs precondition checks on the transactions prior to speculation.
///
/// This method is used to check the following conditions:
/// - If a transaction is a fee transaction or if it is invalid,
/// then the transaction will be aborted.
pub(crate) fn prepare_for_speculate<'a, R: CryptoRng + Rng>(
&self,
transactions: &[&'a Transaction<N>],
rng: &mut R,
) -> Result<(Vec<&'a Transaction<N>>, Vec<(&'a Transaction<N>, String)>)> {
// Construct the list of valid and invalid transactions.
let mut valid_transactions = Vec::with_capacity(transactions.len());
let mut aborted_transactions = Vec::with_capacity(transactions.len());

// Separate the transactions into deploys and executions.
let (deployments, executions): (Vec<&Transaction<N>>, Vec<&Transaction<N>>) =
transactions.iter().partition(|tx| tx.is_deploy());
// Chunk the deploys and executions into groups for parallel verification.
let deployments_for_verification = deployments.chunks(Self::MAX_PARALLEL_DEPLOY_VERIFICATIONS);
let executions_for_verification = executions.chunks(Self::MAX_PARALLEL_EXECUTE_VERIFICATIONS);

// Verify the transactions in batches and separate the valid and invalid transactions.
for transactions in deployments_for_verification.chain(executions_for_verification) {
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
// Verify the transactions and collect the error message if there is one.
let (valid, invalid): (Vec<_>, Vec<_>) =
cfg_into_iter!(transactions).zip(rngs).partition_map(|(transaction, mut rng)| {
// Abort the transaction if it is a fee transaction.
if transaction.is_fee() {
return Either::Right((
*transaction,
"Fee transactions are not allowed in speculate".to_string(),
));
}
// Verify the transaction.
match self.check_transaction(transaction, None, &mut rng) {
Ok(_) => Either::Left(*transaction),
Err(e) => Either::Right((*transaction, e.to_string())),
}
});

// Collect the valid and aborted transactions.
valid_transactions.extend(valid);
aborted_transactions.extend(invalid);
}

// Return the valid and invalid transactions.
Ok((valid_transactions, aborted_transactions))
}

/// Performs precondition checks on the transaction prior to execution.
///
/// This method is used to check the following conditions:
Expand Down
31 changes: 27 additions & 4 deletions synthesizer/src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ use ledger_store::{
};
use synthesizer_process::{deployment_cost, execution_cost, Authorization, Process, Trace};
use synthesizer_program::{FinalizeGlobalState, FinalizeOperation, FinalizeStoreTrait, Program};
use utilities::try_vm_runtime;

use aleo_std::prelude::{finish, lap, timer};
use indexmap::{IndexMap, IndexSet};
use itertools::Either;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use rand::{rngs::StdRng, SeedableRng};
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};

#[cfg(not(feature = "serial"))]
Expand Down Expand Up @@ -733,7 +735,6 @@ function compute:
// Construct the new block header.
let (ratifications, transactions, aborted_transaction_ids, ratified_finalize_operations) =
vm.speculate(sample_finalize_state(1), None, vec![], &None.into(), transactions.iter(), rng)?;
assert!(aborted_transaction_ids.is_empty());

// Construct the metadata associated with the block.
let metadata = Metadata::new(
Expand Down Expand Up @@ -1383,12 +1384,12 @@ function do:
}

#[test]
#[should_panic]
fn test_deployment_synthesis_underreport() {
let rng = &mut TestRng::default();

// Initialize a private key.
let private_key = sample_genesis_private_key(rng);
let address = Address::try_from(&private_key).unwrap();

// Initialize the genesis block.
let genesis = sample_genesis_block(rng);
Expand Down Expand Up @@ -1432,8 +1433,30 @@ function do:
Deployment::new(deployment.edition(), deployment.program().clone(), vks_with_underreport).unwrap();
let adjusted_transaction = Transaction::Deploy(txid, program_owner, Box::new(adjusted_deployment), fee);

// Verify the deployment transaction. It should panic when enforcing the first constraint over the vk limit.
let _ = vm.check_transaction(&adjusted_transaction, None, rng);
// Verify the deployment transaction. It should error when enforcing the first constraint over the vk limit.
let result = vm.check_transaction(&adjusted_transaction, None, rng);
assert!(result.is_err());

// Create a standard transaction
// Prepare the inputs.
let inputs = [
Value::<CurrentNetwork>::from_str(&address.to_string()).unwrap(),
Value::<CurrentNetwork>::from_str("1u64").unwrap(),
]
.into_iter();

// Execute.
let transaction =
vm.execute(&private_key, ("credits.aleo", "transfer_public"), inputs, None, 0, None, rng).unwrap();

// Check that the deployment transaction will be aborted if injected into a block.
let block = sample_next_block(&vm, &private_key, &[transaction, adjusted_transaction.clone()], rng).unwrap();

// Check that the block aborts the deployment transaction.
assert_eq!(block.aborted_transaction_ids(), &vec![adjusted_transaction.id()]);

// Update the VM.
vm.add_next_block(&block).unwrap();
}

#[test]
Expand Down
43 changes: 41 additions & 2 deletions synthesizer/src/vm/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@ macro_rules! ensure_is_unique {
};
}

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// The maximum number of deployments to verify in parallel.
pub(crate) const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
pub(crate) const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;

/// Verifies the list of transactions in the VM. On failure, returns an error.
pub fn check_transactions<R: CryptoRng + Rng>(
&self,
transactions: &[(&Transaction<N>, Option<Field<N>>)],
rng: &mut R,
) -> Result<()> {
// Separate the transactions into deploys and executions.
let (deployments, executions): (Vec<_>, Vec<_>) = transactions.iter().partition(|(tx, _)| tx.is_deploy());
// Chunk the deploys and executions into groups for parallel verification.
let deployments_for_verification = deployments.chunks(Self::MAX_PARALLEL_DEPLOY_VERIFICATIONS);
let executions_for_verification = executions.chunks(Self::MAX_PARALLEL_EXECUTE_VERIFICATIONS);

// Verify the transactions in batches.
for transactions in deployments_for_verification.chain(executions_for_verification) {
// Ensure each transaction is well-formed and unique.
let rngs = (0..transactions.len()).map(|_| StdRng::from_seed(rng.gen())).collect::<Vec<_>>();
cfg_iter!(transactions).zip(rngs).try_for_each(|((transaction, rejected_id), mut rng)| {
self.check_transaction(transaction, *rejected_id, &mut rng)
.map_err(|e| anyhow!("Invalid transaction found in the transactions list: {e}"))
})?;
}

Ok(())
}
}

impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
/// Verifies the transaction in the VM. On failure, returns an error.
#[inline]
Expand Down Expand Up @@ -121,7 +153,11 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
}
// Verify the deployment if it has not been verified before.
if !is_partially_verified {
self.check_deployment_internal(deployment, rng)?;
// Verify the deployment.
match try_vm_runtime!(|| self.check_deployment_internal(deployment, rng)) {
Ok(result) => result?,
Err(_) => bail!("VM safely halted transaction '{id}' during verification"),
}
}
}
Transaction::Execute(id, execution, _) => {
Expand All @@ -134,7 +170,10 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
bail!("Transaction '{id}' contains a previously rejected execution")
}
// Verify the execution.
self.check_execution_internal(execution, is_partially_verified)?;
match try_vm_runtime!(|| self.check_execution_internal(execution, is_partially_verified)) {
Ok(result) => result?,
Err(_) => bail!("VM safely halted transaction '{id}' during verification"),
}
}
Transaction::Fee(..) => { /* no-op */ }
}
Expand Down
12 changes: 6 additions & 6 deletions utilities/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ impl Error for crate::String {}
#[cfg(not(feature = "std"))]
impl Error for crate::io::Error {}

/// This purpose of this macro is to catch the instances of halting
/// without producing logs looking like unexpected panics. It prints
/// to stderr using the format: "Halted at <location>: <halt message>".
/// This macro provides a VM runtime environment which will safely halt
/// without producing logs that look like unexpected behavior.
/// It prints to stderr using the format: "VM safely halted at <location>: <halt message>".
#[macro_export]
macro_rules! handle_halting {
macro_rules! try_vm_runtime {
($e:expr) => {{
use std::panic;

Expand All @@ -52,12 +52,12 @@ macro_rules! handle_halting {
let msg = e.to_string();
let msg = msg.split_ascii_whitespace().skip_while(|&word| word != "panicked").collect::<Vec<&str>>();
let mut msg = msg.join(" ");
msg = msg.replacen("panicked", "Halted", 1);
msg = msg.replacen("panicked", "VM safely halted", 1);
eprintln!("{msg}");
}));

// Perform the operation that may panic.
let result = panic::catch_unwind($e);
let result = panic::catch_unwind(panic::AssertUnwindSafe($e));

// Restore the standard panic hook.
let _ = panic::take_hook();
Expand Down

0 comments on commit 2cbf34a

Please sign in to comment.