Skip to content

Commit

Permalink
[Consensus Observer] Skip expired batches from verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 28, 2024
1 parent ee34c5e commit 57257a9
Showing 1 changed file with 168 additions and 23 deletions.
191 changes: 168 additions & 23 deletions consensus/src/consensus_observer/network/observer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ impl BlockPayload {

/// Verifies the block payload digests and returns an error if the data is invalid
pub fn verify_payload_digests(&self) -> Result<(), Error> {
// Get the transactions, payload proofs and inline batches
// Get the block info, transactions, payload proofs and inline batches
let block_info = self.block.clone();
let transactions = self.transaction_payload.transactions();
let payload_proofs = self.transaction_payload.payload_proofs();
let inline_batches = self.transaction_payload.inline_batches();
Expand All @@ -676,7 +677,7 @@ impl BlockPayload {
// Verify the payload proof digests using the transactions
let mut transactions_iter = transactions.iter();
for proof_of_store in &payload_proofs {
reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info()).map_err(
reconstruct_and_verify_batch(&block_info, &mut transactions_iter, proof_of_store.info(), true).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify payload proof digests! Num transactions: {:?}, \
Expand All @@ -689,7 +690,7 @@ impl BlockPayload {

// Verify the inline batch digests using the transactions
for batch_info in inline_batches.into_iter() {
reconstruct_and_verify_batch(&mut transactions_iter, batch_info).map_err(
reconstruct_and_verify_batch(&block_info, &mut transactions_iter, batch_info, false).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify inline batch digests! Num transactions: {:?}, \
Expand Down Expand Up @@ -738,12 +739,22 @@ impl BlockPayload {
}
}

/// Reconstructs and verifies the batch using the
/// given transactions and the expected batch info.
/// Reconstructs and verifies the batch using the given transactions
/// and the expected batch info. If `skip_expired_batches` is true
/// then verification will be skipped if the batch is expired.
fn reconstruct_and_verify_batch(
block_info: &BlockInfo,
transactions_iter: &mut Iter<SignedTransaction>,
expected_batch_info: &BatchInfo,
skip_expired_batches: bool,
) -> Result<(), Error> {
// If the batch is expired we should skip verification (as the
// transactions for the expired batch won't be sent in the payload).
// Note: this should only be required for QS batches (not inline batches).
if skip_expired_batches && block_info.timestamp_usecs() > expected_batch_info.expiration() {
return Ok(());
}

// Gather the transactions for the batch
let mut batch_transactions = vec![];
for i in 0..expected_batch_info.num_txns() {
Expand Down Expand Up @@ -797,7 +808,7 @@ mod test {
validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
PeerId,
};
use claims::assert_matches;
use claims::{assert_matches, assert_ok};
use move_core_types::account_address::AccountAddress;

#[test]
Expand Down Expand Up @@ -1138,17 +1149,18 @@ mod test {
let num_batches = num_signed_transactions - 1;
let mut proofs = vec![];
for _ in 0..num_batches {
let batch_info = create_batch_info_with_digest(HashValue::random(), 1);
let batch_info = create_batch_info_with_digest(HashValue::random(), 1, 1000);
let proof = ProofOfStore::new(batch_info, AggregateSignature::empty());
proofs.push(proof);
}

// Create a single inline batch with a random digest
let inline_batch = create_batch_info_with_digest(HashValue::random(), 1);
let inline_batch = create_batch_info_with_digest(HashValue::random(), 1, 1000);
let inline_batches = vec![inline_batch];

// Create a block payload (with the transactions, proofs and inline batches)
let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches);
let block_payload =
create_block_payload(None, &signed_transactions, &proofs, &inline_batches);

// Verify the block payload digests and ensure it fails (the batch digests don't match)
let error = block_payload.verify_payload_digests().unwrap_err();
Expand All @@ -1158,13 +1170,14 @@ mod test {
let mut proofs = vec![];
for transaction in &signed_transactions[0..num_batches] {
let batch_payload = BatchPayload::new(PeerId::ZERO, vec![transaction.clone()]);
let batch_info = create_batch_info_with_digest(batch_payload.hash(), 1);
let batch_info = create_batch_info_with_digest(batch_payload.hash(), 1, 1000);
let proof = ProofOfStore::new(batch_info, AggregateSignature::empty());
proofs.push(proof);
}

// Create a block payload (with the transactions, correct proofs and inline batches)
let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches);
let block_payload =
create_block_payload(None, &signed_transactions, &proofs, &inline_batches);

// Verify the block payload digests and ensure it fails (the inline batch digests don't match)
let error = block_payload.verify_payload_digests().unwrap_err();
Expand All @@ -1175,18 +1188,20 @@ mod test {
.last()
.unwrap()
.clone()]);
let inline_batch_info = create_batch_info_with_digest(inline_batch_payload.hash(), 1);
let inline_batch_info = create_batch_info_with_digest(inline_batch_payload.hash(), 1, 1000);
let inline_batches = vec![inline_batch_info];

// Create a block payload (with the transactions, correct proofs and correct inline batches)
let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches);
let block_payload =
create_block_payload(None, &signed_transactions, &proofs, &inline_batches);

// Verify the block payload digests and ensure it passes
block_payload.verify_payload_digests().unwrap();

// Create a block payload (with too many transactions)
signed_transactions.append(&mut create_signed_transactions(1));
let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches);
let block_payload =
create_block_payload(None, &signed_transactions, &proofs, &inline_batches);

// Verify the block payload digests and ensure it fails (there are too many transactions)
let error = block_payload.verify_payload_digests().unwrap_err();
Expand All @@ -1196,13 +1211,101 @@ mod test {
for _ in 0..3 {
signed_transactions.pop();
}
let block_payload = create_block_payload(&signed_transactions, &proofs, &inline_batches);
let block_payload =
create_block_payload(None, &signed_transactions, &proofs, &inline_batches);

// Verify the block payload digests and ensure it fails (there are too few transactions)
let error = block_payload.verify_payload_digests().unwrap_err();
assert_matches!(error, Error::InvalidMessageError(_));
}

#[test]
fn test_verify_payload_digests_expired() {
    // Construct a block info pinned to a fixed timestamp
    let block_timestamp = 1000;
    let block_info = BlockInfo::new(
        0,
        0,
        HashValue::random(),
        HashValue::random(),
        0,
        block_timestamp,
        None,
    );

    // Generate QS transactions and batch proofs (where some batches are expired)
    let signed_transactions = create_signed_transactions(100);
    let (proofs, non_expired_transactions) =
        create_mixed_expiration_proofs(block_timestamp, &signed_transactions);

    // A payload carrying only the non-expired transactions (but all proofs,
    // and no inline batches) should verify, since expired QS batches are skipped.
    let block_payload = create_block_payload(
        Some(block_info.clone()),
        &non_expired_transactions,
        &proofs,
        &[],
    );
    assert_ok!(block_payload.verify_payload_digests());

    // Generate inline transactions and batches (where some batches are expired)
    let inline_transactions = create_signed_transactions(25);
    let (inline_proofs, non_expired_inline_transactions) =
        create_mixed_expiration_proofs(block_timestamp, &inline_transactions);
    let inline_batches: Vec<_> = inline_proofs
        .iter()
        .map(|proof| proof.info().clone())
        .collect();

    // A payload missing the transactions of expired inline batches must fail:
    // inline batches are verified regardless of expiration.
    let block_payload = create_block_payload(
        Some(block_info.clone()),
        &non_expired_inline_transactions,
        &[],
        &inline_batches,
    );
    let error = block_payload.verify_payload_digests().unwrap_err();
    assert_matches!(error, Error::InvalidMessageError(_));

    // Supplying every inline transaction (no proofs, all inline batches) should verify
    let block_payload = create_block_payload(
        Some(block_info.clone()),
        &inline_transactions,
        &[],
        &inline_batches,
    );
    assert_ok!(block_payload.verify_payload_digests());

    // Combine the non-expired QS transactions with every inline transaction
    let all_transactions: Vec<_> = non_expired_transactions
        .iter()
        .chain(inline_transactions.iter())
        .cloned()
        .collect();

    // A payload with all transactions, all proofs and all inline batches should verify
    let block_payload = create_block_payload(
        Some(block_info),
        &all_transactions,
        &proofs,
        &inline_batches,
    );
    assert_ok!(block_payload.verify_payload_digests());
}

#[test]
fn test_verify_payload_signatures() {
// Create multiple batch info proofs (with empty signatures)
Expand Down Expand Up @@ -1253,16 +1356,20 @@ mod test {

/// Creates and returns a new batch info with random data
fn create_batch_info() -> BatchInfo {
create_batch_info_with_digest(HashValue::random(), 0)
create_batch_info_with_digest(HashValue::random(), 0, 0)
}

/// Creates and returns a new batch info with the specified digest
fn create_batch_info_with_digest(digest: HashValue, num_transactions: u64) -> BatchInfo {
/// Creates and returns a new batch info with the specified digest and properties
fn create_batch_info_with_digest(
digest: HashValue,
num_transactions: u64,
batch_expiration: u64,
) -> BatchInfo {
BatchInfo::new(
PeerId::ZERO,
BatchId::new(0),
10,
1,
batch_expiration,
digest,
num_transactions,
1,
Expand All @@ -1277,6 +1384,7 @@ mod test {

/// Creates and returns a hybrid quorum store payload using the given data
fn create_block_payload(
block_info: Option<BlockInfo>,
signed_transactions: &[SignedTransaction],
proofs: &[ProofOfStore],
inline_batches: &[BatchInfo],
Expand All @@ -1289,11 +1397,11 @@ mod test {
inline_batches.to_vec(),
);

// Determine the block info to use
let block_info = block_info.unwrap_or_else(|| create_block_info(0, HashValue::random()));

// Create the block payload
BlockPayload::new(
create_block_info(0, HashValue::random()),
transaction_payload,
)
BlockPayload::new(block_info, transaction_payload)
}

/// Creates and returns a new ledger info with an empty signature set
Expand All @@ -1304,6 +1412,43 @@ mod test {
)
}

/// Creates and returns batch proofs for the given transactions (one
/// single-transaction batch per transaction), expiring every other batch
/// relative to the given block timestamp. Also returns the transactions
/// that belong to the non-expired batches.
fn create_mixed_expiration_proofs(
    block_timestamp: u64,
    signed_transactions: &[SignedTransaction],
) -> (Vec<ProofOfStore>, Vec<SignedTransaction>) {
    let mut batch_proofs = Vec::new();
    let mut live_transactions = Vec::new();

    for (index, transaction) in signed_transactions.iter().enumerate() {
        // Every other (odd-indexed) batch is expired
        let is_batch_expired = index % 2 != 0;

        // Expired batches end just before the block timestamp; live ones just after
        let batch_expiration = if is_batch_expired {
            block_timestamp - 1
        } else {
            block_timestamp + 1
        };

        // Build and record the proof for this single-transaction batch
        let batch_payload = BatchPayload::new(PeerId::ZERO, vec![transaction.clone()]);
        let batch_info =
            create_batch_info_with_digest(batch_payload.hash(), 1, batch_expiration);
        batch_proofs.push(ProofOfStore::new(batch_info, AggregateSignature::empty()));

        // Track the transactions of non-expired batches
        if !is_batch_expired {
            live_transactions.push(transaction.clone());
        }
    }

    (batch_proofs, live_transactions)
}

/// Creates and returns a new pipelined block with the given block info
fn create_pipelined_block(block_info: BlockInfo) -> Arc<PipelinedBlock> {
let block_data = BlockData::new_for_testing(
Expand Down

0 comments on commit 57257a9

Please sign in to comment.