Skip to content

Commit

Permalink
[Consensus Observer] Add block payload verification.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 16, 2024
1 parent 8f07165 commit 56ccd17
Show file tree
Hide file tree
Showing 8 changed files with 1,282 additions and 227 deletions.
4 changes: 4 additions & 0 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl ProofWithData {
}
}

pub fn empty() -> Self {
Self::new(vec![])
}

pub fn extend(&mut self, other: ProofWithData) {
let other_data_status = other.status.lock().as_mut().unwrap().take();
self.proofs.extend(other.proofs);
Expand Down
21 changes: 14 additions & 7 deletions consensus/src/consensus_observer/missing_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl MissingBlockStore {
#[cfg(test)]
mod test {
use super::*;
use crate::consensus_observer::network_message::{BlockPayload, BlockTransactionPayload};
use aptos_consensus_types::{
block::Block,
block_data::{BlockData, BlockType},
Expand Down Expand Up @@ -378,7 +379,7 @@ mod test {
);

// Create a new block payload store and insert payloads for the second block
let mut block_payload_store = BlockPayloadStore::new();
let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config);
let second_block = missing_blocks[1].clone();
insert_payloads_for_ordered_block(&mut block_payload_store, &second_block);

Expand Down Expand Up @@ -439,13 +440,15 @@ mod test {
);

// Create an empty block payload store
let mut block_payload_store = BlockPayloadStore::new();
let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config);

// Incrementally insert and process each payload for the first block
let first_block = missing_blocks.first().unwrap().clone();
for block in first_block.blocks().clone() {
// Insert the block
block_payload_store.insert_block_payload(block.block_info(), vec![], None);
let block_payload =
BlockPayload::new(block.block_info(), BlockTransactionPayload::empty());
block_payload_store.insert_block_payload(block_payload, true);

// Attempt to remove the block (which might not be ready)
let payload_round = block.round();
Expand Down Expand Up @@ -486,7 +489,9 @@ mod test {
// Insert the block only if this is not the first block
let payload_round = block.round();
if payload_round != last_block.first_block().round() {
block_payload_store.insert_block_payload(block.block_info(), vec![], None);
let block_payload =
BlockPayload::new(block.block_info(), BlockTransactionPayload::empty());
block_payload_store.insert_block_payload(block_payload, true);
}

// Attempt to remove the block (which might not be ready)
Expand Down Expand Up @@ -533,7 +538,7 @@ mod test {
);

// Create a new block payload store and insert payloads for the first block
let mut block_payload_store = BlockPayloadStore::new();
let mut block_payload_store = BlockPayloadStore::new(consensus_observer_config);
let first_block = missing_blocks.first().unwrap().clone();
insert_payloads_for_ordered_block(&mut block_payload_store, &first_block);

Expand Down Expand Up @@ -614,7 +619,7 @@ mod test {
);

// Create an empty block payload store
let block_payload_store = BlockPayloadStore::new();
let block_payload_store = BlockPayloadStore::new(consensus_observer_config);

// Remove the third block (which is not ready)
let third_block = missing_blocks[2].clone();
Expand Down Expand Up @@ -716,7 +721,9 @@ mod test {
ordered_block: &OrderedBlock,
) {
for block in ordered_block.blocks() {
block_payload_store.insert_block_payload(block.block_info(), vec![], None);
let block_payload =
BlockPayload::new(block.block_info(), BlockTransactionPayload::empty());
block_payload_store.insert_block_payload(block_payload, true);
}
}

Expand Down
161 changes: 149 additions & 12 deletions consensus/src/consensus_observer/network_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_observer::error::Error;
use aptos_consensus_types::pipelined_block::PipelinedBlock;
use aptos_consensus_types::{
common::{BatchPayload, ProofWithData},
pipelined_block::PipelinedBlock,
proof_of_store::{BatchInfo, ProofCache},
};
use aptos_crypto::hash::CryptoHash;
use aptos_types::{
block_info::{BlockInfo, Round},
epoch_change::Verifier,
Expand All @@ -12,6 +17,7 @@ use aptos_types::{
};
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
fmt::{Display, Formatter},
sync::Arc,
};
Expand Down Expand Up @@ -46,13 +52,11 @@ impl ConsensusObserverMessage {
/// Creates and returns a new block payload message using the given block, transactions and limit
pub fn new_block_payload_message(
block: BlockInfo,
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
transaction_payload: BlockTransactionPayload,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
transactions,
limit,
transaction_payload,
})
}
}
Expand Down Expand Up @@ -150,10 +154,11 @@ impl ConsensusObserverDirectSend {
},
ConsensusObserverDirectSend::BlockPayload(block_payload) => {
format!(
"BlockPayload: {} {} {:?}",
block_payload.block.id(),
block_payload.transactions.len(),
block_payload.limit
"BlockPayload: {}. Number of transactions: {}, limit: {:?}, proofs: {:?}",
block_payload.block,
block_payload.transaction_payload.transactions.len(),
block_payload.transaction_payload.limit,
block_payload.transaction_payload.proof_with_data.proofs,
)
},
}
Expand Down Expand Up @@ -304,10 +309,142 @@ impl CommitDecision {
}
}

/// Payload message contains the block, transactions and the limit of the block
/// The transaction payload of each block
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub struct BlockTransactionPayload {
pub transactions: Vec<SignedTransaction>,
pub limit: Option<u64>,
pub proof_with_data: ProofWithData,
pub inline_batches: Vec<BatchInfo>,
}

impl BlockTransactionPayload {
pub fn new(
transactions: Vec<SignedTransaction>,
limit: Option<u64>,
proof_with_data: ProofWithData,
inline_batches: Vec<BatchInfo>,
) -> Self {
Self {
transactions,
limit,
proof_with_data,
inline_batches,
}
}

#[cfg(test)]
/// Returns an empty transaction payload (for testing)
pub fn empty() -> Self {
Self {
transactions: vec![],
limit: None,
proof_with_data: ProofWithData::empty(),
inline_batches: vec![],
}
}
}

/// Payload message contains the block and transaction payload
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BlockPayload {
pub block: BlockInfo,
pub transaction_payload: BlockTransactionPayload,
}

impl BlockPayload {
pub fn new(block: BlockInfo, transaction_payload: BlockTransactionPayload) -> Self {
Self {
block,
transaction_payload,
}
}

/// Verifies the block payload digests and returns an error if the data is invalid
pub fn verify_payload_digests(&self) -> Result<(), Error> {
// Verify the proof of store digests against the transactions
let mut transactions = self
.transaction_payload
.transactions
.iter()
.cloned()
.collect::<VecDeque<_>>();
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
reconstruct_and_verify_batch(&mut transactions, proof_of_store.info())?;
}

// Verify the inline batch digests against the inline batches
for batch_info in &self.transaction_payload.inline_batches {
reconstruct_and_verify_batch(&mut transactions, batch_info)?;
}

// Verify that there are no transactions remaining
if !transactions.is_empty() {
return Err(Error::InvalidMessageError(format!(
"Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0",
transactions.len()
)));
}

Ok(()) // All digests match
}

/// Verifies that the block payload proofs are correctly signed according
/// to the current epoch state. Returns an error if the data is invalid.
pub fn verify_payload_signatures(&self, epoch_state: &EpochState) -> Result<(), Error> {
// Create a dummy proof cache to verify the proofs
let proof_cache = ProofCache::new(1);

// Verify each of the proof signatures
let validator_verifier = &epoch_state.verifier;
for proof_of_store in &self.transaction_payload.proof_with_data.proofs {
if let Err(error) = proof_of_store.verify(validator_verifier, &proof_cache) {
return Err(Error::InvalidMessageError(format!(
"Failed to verify the proof of store for batch: {:?}, Error: {:?}",
proof_of_store.info(),
error
)));
}
}

Ok(()) // All proofs are correctly signed
}
}

/// Reconstructs and verifies the batch using the
/// given transactions and the expected batch info.
fn reconstruct_and_verify_batch(
transactions: &mut VecDeque<SignedTransaction>,
expected_batch_info: &BatchInfo,
) -> Result<(), Error> {
// Gather the transactions for the batch
let mut batch_transactions = vec![];
for i in 0..expected_batch_info.num_txns() {
let batch_transaction = match transactions.pop_front() {
Some(transaction) => transaction,
None => {
return Err(Error::InvalidMessageError(format!(
"Failed to extract transaction during batch reconstruction! Batch: {:?}, transaction index: {:?}",
expected_batch_info, i
)));
},
};
batch_transactions.push(batch_transaction);
}

// Calculate the batch digest
let batch_payload = BatchPayload::new(expected_batch_info.author(), batch_transactions);
let batch_digest = batch_payload.hash();

// Verify the reconstructed digest against the expected digest
let expected_digest = expected_batch_info.digest();
if batch_digest != *expected_digest {
return Err(Error::InvalidMessageError(format!(
"The reconstructed inline batch digest does not match the expected digest!\
Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}",
expected_batch_info, expected_digest, batch_digest
)));
}

Ok(())
}
Loading

0 comments on commit 56ccd17

Please sign in to comment.