From 57257a92b5b48461d6b1b016217dc3e299b70463 Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Fri, 27 Sep 2024 08:51:42 -0400 Subject: [PATCH] [Consensus Observer] Skip expired batches from verification. --- .../network/observer_message.rs | 191 +++++++++++++++--- 1 file changed, 168 insertions(+), 23 deletions(-) diff --git a/consensus/src/consensus_observer/network/observer_message.rs b/consensus/src/consensus_observer/network/observer_message.rs index 8b673f6335f56..1905d162b0354 100644 --- a/consensus/src/consensus_observer/network/observer_message.rs +++ b/consensus/src/consensus_observer/network/observer_message.rs @@ -663,7 +663,8 @@ impl BlockPayload { /// Verifies the block payload digests and returns an error if the data is invalid pub fn verify_payload_digests(&self) -> Result<(), Error> { - // Get the transactions, payload proofs and inline batches + // Get the block info, transactions, payload proofs and inline batches + let block_info = self.block.clone(); let transactions = self.transaction_payload.transactions(); let payload_proofs = self.transaction_payload.payload_proofs(); let inline_batches = self.transaction_payload.inline_batches(); @@ -676,7 +677,7 @@ impl BlockPayload { // Verify the payload proof digests using the transactions let mut transactions_iter = transactions.iter(); for proof_of_store in &payload_proofs { - reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info()).map_err( + reconstruct_and_verify_batch(&block_info, &mut transactions_iter, proof_of_store.info(), true).map_err( |error| { Error::InvalidMessageError(format!( "Failed to verify payload proof digests! Num transactions: {:?}, \ @@ -689,7 +690,7 @@ impl BlockPayload { // Verify the inline batch digests using the transactions for batch_info in inline_batches.into_iter() { - reconstruct_and_verify_batch(&mut transactions_iter, batch_info).map_err( + reconstruct_and_verify_batch(&block_info, &mut transactions_iter, batch_info, false).map_err( |error| { Error::InvalidMessageError(format!( "Failed to verify inline batch digests! Num transactions: {:?}, \ @@ -738,12 +739,22 @@ impl BlockPayload { } } -/// Reconstructs and verifies the batch using the -/// given transactions and the expected batch info. +/// Reconstructs and verifies the batch using the given transactions +/// and the expected batch info. If `skip_expired_batches` is true +/// then verification will be skipped if the batch is expired. fn reconstruct_and_verify_batch( + block_info: &BlockInfo, transactions_iter: &mut Iter, expected_batch_info: &BatchInfo, + skip_expired_batches: bool, ) -> Result<(), Error> { + // If the batch is expired we should skip verification (as the + // transactions for the expired batch won't be sent in the payload). + // Note: this should only be required for QS batches (not inline batches). + if skip_expired_batches && block_info.timestamp_usecs() > expected_batch_info.expiration() { + return Ok(()); + } + // Gather the transactions for the batch let mut batch_transactions = vec![]; for i in 0..expected_batch_info.num_txns() { @@ -797,7 +808,7 @@ mod test { validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier}, PeerId, }; - use claims::assert_matches; + use claims::{assert_matches, assert_ok}; use move_core_types::account_address::AccountAddress; #[test] @@ -1138,17 +1149,18 @@ mod test { let num_batches = num_signed_transactions - 1; let mut proofs = vec![]; for _ in 0..num_batches { - let batch_info = create_batch_info_with_digest(HashValue::random(), 1); + let batch_info = create_batch_info_with_digest(HashValue::random(), 1, 1000); let proof = ProofOfStore::new(batch_info, AggregateSignature::empty()); proofs.push(proof); } // Create a single inline batch with a random digest - let inline_batch = create_batch_info_with_digest(HashValue::random(), 1); + let inline_batch = create_batch_info_with_digest(HashValue::random(), 1, 1000); let inline_batches = vec![inline_batch]; // Create a block payload (with the transactions, proofs and inline batches) - let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches); + let block_payload = + create_block_payload(None, &signed_transactions, &proofs, &inline_batches); // Verify the block payload digests and ensure it fails (the batch digests don't match) let error = block_payload.verify_payload_digests().unwrap_err(); @@ -1158,13 +1170,14 @@ mod test { let mut proofs = vec![]; for transaction in &signed_transactions[0..num_batches] { let batch_payload = BatchPayload::new(PeerId::ZERO, vec![transaction.clone()]); - let batch_info = create_batch_info_with_digest(batch_payload.hash(), 1); + let batch_info = create_batch_info_with_digest(batch_payload.hash(), 1, 1000); let proof = ProofOfStore::new(batch_info, AggregateSignature::empty()); proofs.push(proof); } // Create a block payload (with the transactions, correct proofs and inline batches) - let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches); + let block_payload = + create_block_payload(None, &signed_transactions, &proofs, &inline_batches); // Verify the block payload digests and ensure it fails (the inline batch digests don't match) let error = block_payload.verify_payload_digests().unwrap_err(); @@ -1175,18 +1188,20 @@ mod test { .last() .unwrap() .clone()]); - let inline_batch_info = create_batch_info_with_digest(inline_batch_payload.hash(), 1); + let inline_batch_info = create_batch_info_with_digest(inline_batch_payload.hash(), 1, 1000); let inline_batches = vec![inline_batch_info]; // Create a block payload (with the transactions, correct proofs and correct inline batches) - let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches); + let block_payload = + create_block_payload(None, &signed_transactions, &proofs, &inline_batches); // Verify the block payload digests and ensure it passes block_payload.verify_payload_digests().unwrap(); // Create a block payload (with too many transactions) signed_transactions.append(&mut create_signed_transactions(1)); - let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches); + let block_payload = + create_block_payload(None, &signed_transactions, &proofs, &inline_batches); // Verify the block payload digests and ensure it fails (there are too many transactions) let error = block_payload.verify_payload_digests().unwrap_err(); @@ -1196,13 +1211,101 @@ mod test { for _ in 0..3 { signed_transactions.pop(); } - let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches); + let block_payload = + create_block_payload(None, &signed_transactions, &proofs, &inline_batches); // Verify the block payload digests and ensure it fails (there are too few transactions) let error = block_payload.verify_payload_digests().unwrap_err(); assert_matches!(error, Error::InvalidMessageError(_)); } + #[test] + fn test_verify_payload_digests_expired() { + // Create a new block info with the specified timestamp + let block_timestamp = 1000; + let block_info = BlockInfo::new( + 0, + 0, + HashValue::random(), + HashValue::random(), + 0, + block_timestamp, + None, + ); + + // Create multiple signed transactions + let num_signed_transactions = 100; + let signed_transactions = create_signed_transactions(num_signed_transactions); + + // Create multiple batch proofs (where some batches are expired) + let (proofs, non_expired_transactions) = + create_mixed_expiration_proofs(block_timestamp, &signed_transactions); + + // Create a block payload (with non-expired transactions, all proofs and no inline batches) + let block_payload = create_block_payload( + Some(block_info.clone()), + &non_expired_transactions, + &proofs, + &[], + ); + + // Verify the block payload digests and ensure it passes + assert_ok!(block_payload.verify_payload_digests()); + + // Create multiple inline transactions + let num_inline_transactions = 25; + let inline_transactions = create_signed_transactions(num_inline_transactions); + + // Create multiple inline batches (where some batches are expired) + let (inline_batches, non_expired_inline_transactions) = + create_mixed_expiration_proofs(block_timestamp, &inline_transactions); + + // Create a block payload (with all non-expired inline transactions, no proofs and inline batches) + let inline_batches: Vec<_> = inline_batches + .iter() + .map(|proof| proof.info().clone()) + .collect(); + let block_payload = create_block_payload( + Some(block_info.clone()), + &non_expired_inline_transactions, + &[], + &inline_batches, + ); + + // Verify the block payload digests and ensure it fails (expired inline batches are still checked) + let error = block_payload.verify_payload_digests().unwrap_err(); + assert_matches!(error, Error::InvalidMessageError(_)); + + // Create a block payload (with all inline transactions, no proofs and inline batches) + let block_payload = create_block_payload( + Some(block_info.clone()), + &inline_transactions, + &[], + &inline_batches, + ); + + // Verify the block payload digests and ensure it now passes + assert_ok!(block_payload.verify_payload_digests()); + + // Gather all transactions (from both QS and inline batches) + let all_transactions: Vec<_> = non_expired_transactions + .iter() + .chain(inline_transactions.iter()) + .cloned() + .collect(); + + // Create a block payload (with all transactions, all proofs and inline batches) + let block_payload = create_block_payload( + Some(block_info), + &all_transactions, + &proofs, + &inline_batches, + ); + + // Verify the block payload digests and ensure it passes + assert_ok!(block_payload.verify_payload_digests()); + } + #[test] fn test_verify_payload_signatures() { // Create multiple batch info proofs (with empty signatures) @@ -1253,16 +1356,20 @@ mod test { /// Creates and returns a new batch info with random data fn create_batch_info() -> BatchInfo { - create_batch_info_with_digest(HashValue::random(), 0) + create_batch_info_with_digest(HashValue::random(), 0, 0) } - /// Creates and returns a new batch info with the specified digest - fn create_batch_info_with_digest(digest: HashValue, num_transactions: u64) -> BatchInfo { + /// Creates and returns a new batch info with the specified digest and properties + fn create_batch_info_with_digest( + digest: HashValue, + num_transactions: u64, + batch_expiration: u64, + ) -> BatchInfo { BatchInfo::new( PeerId::ZERO, BatchId::new(0), 10, - 1, + batch_expiration, digest, num_transactions, 1, @@ -1277,6 +1384,7 @@ mod test { /// Creates and returns a hybrid quorum store payload using the given data fn create_block_payload( + block_info: Option, signed_transactions: &[SignedTransaction], proofs: &[ProofOfStore], inline_batches: &[BatchInfo], @@ -1289,11 +1397,11 @@ mod test { inline_batches.to_vec(), ); + // Determine the block info to use + let block_info = block_info.unwrap_or_else(|| create_block_info(0, HashValue::random())); + // Create the block payload - BlockPayload::new( - create_block_info(0, HashValue::random()), - transaction_payload, - ) + BlockPayload::new(block_info, transaction_payload) } /// Creates and returns a new ledger info with an empty signature set @@ -1304,6 +1412,43 @@ mod test { ) } + /// Creates and returns a set of batch proofs using the given block + /// timestamp and transactions. Note: some batches will be expired. + fn create_mixed_expiration_proofs( + block_timestamp: u64, + signed_transactions: &[SignedTransaction], + ) -> (Vec, Vec) { + let mut proofs = vec![]; + let mut non_expired_transactions = vec![]; + + // Create multiple batch proofs (each batch has 1 transaction, and some batches are expired) + for (i, transaction) in signed_transactions.iter().enumerate() { + // Expire every other (odd) batch and transaction + let is_batch_expired = i % 2 != 0; + + // Determine the expiration time for the batch + let batch_expiration = if is_batch_expired { + block_timestamp - 1 // Older than the block timestamp + } else { + block_timestamp + 1 // Newer than the block timestamp + }; + + // Create and store the batch proof + let batch_payload = BatchPayload::new(PeerId::ZERO, vec![transaction.clone()]); + let batch_info = + create_batch_info_with_digest(batch_payload.hash(), 1, batch_expiration); + let proof = ProofOfStore::new(batch_info, AggregateSignature::empty()); + proofs.push(proof); + + // Save the non-expired transactions + if !is_batch_expired { + non_expired_transactions.push(transaction.clone()); + } + } + + (proofs, non_expired_transactions) + } + /// Creates and returns a new pipelined block with the given block info fn create_pipelined_block(block_info: BlockInfo) -> Arc { let block_data = BlockData::new_for_testing(