Skip to content

Commit

Permalink
[Consensus Observer] Simplify payload manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 18, 2024
1 parent b4480ee commit c0d820f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 146 deletions.
126 changes: 30 additions & 96 deletions consensus/src/consensus_observer/payload_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::consensus_observer::{
logging::{LogEntry, LogSchema},
metrics,
network_message::{BlockPayload, BlockTransactionPayload},
network_message::BlockPayload,
};
use aptos_config::config::ConsensusObserverConfig;
use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock};
Expand All @@ -15,16 +15,11 @@ use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use tokio::sync::oneshot;

/// The status of the block payload
pub enum BlockPayloadStatus {
AvailableAndVerified(BlockPayload),
AvailableAndUnverified(
BlockPayload,
Option<oneshot::Sender<BlockTransactionPayload>>,
),
Requested(oneshot::Sender<BlockTransactionPayload>),
AvailableAndUnverified(BlockPayload),
}

/// A simple struct to store the block payloads of ordered and committed blocks
Expand Down Expand Up @@ -86,49 +81,18 @@ impl BlockPayloadStore {
return; // Drop the block if we've exceeded the maximum
}

// Get the current block payload status (if one exists)
// Create the new payload status
let epoch_and_round = (block_payload.block.epoch(), block_payload.block.round());
let previous_payload_status = self.block_payloads.lock().remove_entry(&epoch_and_round);

// Insert the new block payload status entry
match previous_payload_status {
Some((_, BlockPayloadStatus::Requested(payload_sender)))
| Some((_, BlockPayloadStatus::AvailableAndUnverified(_, Some(payload_sender)))) => {
// The payload was requested. Send it to the listener if it has already verified.
if verified_payload_signatures {
if let Err(error) =
payload_sender.send(block_payload.transaction_payload.clone())
{
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to send block payload to listener! Error: {:?}",
error
))
);
}
self.block_payloads.lock().insert(
epoch_and_round,
BlockPayloadStatus::AvailableAndVerified(block_payload),
);
} else {
// Otherwise, save the payload and listener for verification later
self.block_payloads.lock().insert(
epoch_and_round,
BlockPayloadStatus::AvailableAndUnverified(
block_payload,
Some(payload_sender),
),
);
}
},
_ => {
// Insert the new payload status
self.block_payloads.lock().insert(
epoch_and_round,
create_available_payload_status(block_payload, verified_payload_signatures),
);
},
}
let payload_status = if verified_payload_signatures {
BlockPayloadStatus::AvailableAndVerified(block_payload)
} else {
BlockPayloadStatus::AvailableAndUnverified(block_payload)
};

// Insert the new payload status
self.block_payloads
.lock()
.insert(epoch_and_round, payload_status);
}

/// Removes all blocks up to the specified epoch and round (inclusive)
Expand Down Expand Up @@ -201,7 +165,7 @@ impl BlockPayloadStore {
if epoch == current_epoch {
if let Entry::Occupied(mut entry) = self.block_payloads.lock().entry((epoch, round))
{
if let BlockPayloadStatus::AvailableAndUnverified(block_payload, _) =
if let BlockPayloadStatus::AvailableAndUnverified(block_payload) =
entry.get_mut()
{
if let Err(error) = block_payload.verify_payload_signatures(epoch_state) {
Expand Down Expand Up @@ -241,22 +205,10 @@ impl BlockPayloadStore {
}
}

/// Creates and returns a new block payload status based on the
/// given payload and whether the signatures have been verified.
fn create_available_payload_status(
block_payload: BlockPayload,
verified_payload_signatures: bool,
) -> BlockPayloadStatus {
if verified_payload_signatures {
BlockPayloadStatus::AvailableAndVerified(block_payload)
} else {
BlockPayloadStatus::AvailableAndUnverified(block_payload, None)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::consensus_observer::network_message::BlockTransactionPayload;
use aptos_consensus_types::{
block::Block,
block_data::{BlockData, BlockType},
Expand Down Expand Up @@ -338,7 +290,7 @@ mod test {
}

#[test]
fn test_all_payloads_exist_requested() {
fn test_all_payloads_exist_unverified() {
// Create a new block payload store
let consensus_observer_config = ConsensusObserverConfig::default();
let block_payload_store = BlockPayloadStore::new(consensus_observer_config);
Expand All @@ -355,8 +307,8 @@ mod test {
// Check that the payloads exists in the block payload store
assert!(block_payload_store.all_payloads_exist(&verified_blocks));

// Mark the payload of the first block as requested
mark_payload_as_requested(block_payload_store.clone(), &verified_blocks[0]);
// Mark the payload of the first block as unverified
mark_payload_as_unverified(block_payload_store.clone(), &verified_blocks[0]);

// Check that the payload no longer exists in the block payload store
assert!(!block_payload_store.all_payloads_exist(&verified_blocks));
Expand Down Expand Up @@ -423,29 +375,23 @@ mod test {
check_num_unverified_payloads(&block_payload_store, 0);
check_num_verified_payloads(&block_payload_store, num_blocks_in_store);

// Mark the payload of the first block as requested
let payload_receiver =
mark_payload_as_requested(block_payload_store.clone(), &verified_blocks[0]);
// Mark the payload of the first block as unverified
mark_payload_as_unverified(block_payload_store.clone(), &verified_blocks[0]);

// Check that the payload no longer exists in the block payload store
assert!(!block_payload_store.all_payloads_exist(&verified_blocks));

// Verify the number of verified blocks in the block payload store
check_num_verified_payloads(&block_payload_store, num_blocks_in_store - 1);

// Insert the same block payload into the block payload store
// Insert the same block payload into the block payload store (as verified)
let transaction_payload =
BlockTransactionPayload::new(vec![], Some(0), ProofWithData::empty(), vec![]);
let block_payload = BlockPayload::new(verified_blocks[0].block_info(), transaction_payload);
block_payload_store.insert_block_payload(block_payload, true);

// Check that the block payload store now contains the requested block payload
assert!(block_payload_store.all_payloads_exist(&verified_blocks));

// Check that the payload receiver receives the requested block payload message
let block_payload = payload_receiver.blocking_recv().unwrap();
assert!(block_payload.transactions.is_empty());
assert_eq!(block_payload.limit, Some(0));
}

#[test]
Expand Down Expand Up @@ -1008,7 +954,7 @@ mod test {
fn get_num_unverified_payloads(block_payload_store: &BlockPayloadStore) -> usize {
let mut num_unverified_payloads = 0;
for (_, block_payload_status) in block_payload_store.block_payloads.lock().iter() {
if let BlockPayloadStatus::AvailableAndUnverified(_, _) = block_payload_status {
if let BlockPayloadStatus::AvailableAndUnverified(_) = block_payload_status {
num_unverified_payloads += 1;
}
}
Expand All @@ -1026,34 +972,22 @@ mod test {
num_verified_payloads
}

/// Marks the payload of the given block as requested and returns the receiver
fn mark_payload_as_requested(
/// Marks the payload of the given block as unverified
fn mark_payload_as_unverified(
block_payload_store: BlockPayloadStore,
block: &Arc<PipelinedBlock>,
) -> oneshot::Receiver<BlockTransactionPayload> {
) {
// Get the payload entry for the given block
let block_payloads = block_payload_store.get_block_payloads();
let mut block_payloads = block_payloads.lock();
let block_payload = block_payloads
.get_mut(&(block.epoch(), block.round()))
.unwrap();

// Create a new payload sender and receiver
let (payload_sender, payload_receiver) = oneshot::channel();

// Mark the block payload as requested (either requested or unverified)
if get_random_u64() % 2 == 0 {
// Mark the payload as requested
*block_payload = BlockPayloadStatus::Requested(payload_sender);
} else {
// Mark the payload as unverified and requested
*block_payload = BlockPayloadStatus::AvailableAndUnverified(
BlockPayload::new(block.block_info(), BlockTransactionPayload::empty()),
Some(payload_sender),
);
};

// Return the payload receiver
payload_receiver
// Mark the block payload as unverified
*block_payload = BlockPayloadStatus::AvailableAndUnverified(BlockPayload::new(
block.block_info(),
BlockTransactionPayload::empty(),
));
}
}
96 changes: 46 additions & 50 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ use aptos_consensus_types::{
proof_of_store::{BatchInfo, ProofOfStore},
};
use aptos_crypto::HashValue;
use aptos_executor_types::{ExecutorError::DataNotFound, *};
use aptos_executor_types::{
ExecutorError::{DataNotFound, InternalError},
*,
};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::transaction::SignedTransaction;
use futures::channel::mpsc::Sender;
use itertools::Either;
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
time::Duration,
};
use tokio::{sync::oneshot, time::timeout};
use tokio::sync::oneshot;

pub trait TPayloadManager: Send + Sync {
fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64);
Expand Down Expand Up @@ -185,9 +186,14 @@ impl PayloadManager {
None => return Ok((Vec::new(), None)),
};

if let PayloadManager::ConsensusObserver(txns_pool, consensus_publisher) = self {
return get_transactions_for_observer(block, payload, txns_pool, consensus_publisher)
.await;
if let PayloadManager::ConsensusObserver(block_payloads, consensus_publisher) = self {
return get_transactions_for_observer(
block,
payload,
block_payloads,
consensus_publisher,
)
.await;
}

async fn process_payload(
Expand Down Expand Up @@ -348,92 +354,82 @@ impl PayloadManager {
async fn get_transactions_for_observer(
block: &Block,
payload: &Payload,
txns_pool: &Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
block_payloads: &Arc<Mutex<BTreeMap<(u64, Round), BlockPayloadStatus>>>,
consensus_publisher: &Option<Arc<ConsensusPublisher>>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
// If the data is already available, return it, otherwise wait for it.
// It's important to make sure this doesn't race with the payload insertion part.
let result = match txns_pool.lock().entry((block.epoch(), block.round())) {
// The data should already be available (as consensus observer will only ever
// forward a block to the executor once the data has been received and verified).
let block_payload = match block_payloads.lock().entry((block.epoch(), block.round())) {
Entry::Occupied(mut value) => match value.get_mut() {
BlockPayloadStatus::AvailableAndVerified(data) => Either::Left(data.clone()),
BlockPayloadStatus::AvailableAndUnverified(_, payload_sender) => {
let (new_payload_sender, new_payload_receiver) = oneshot::channel();
*payload_sender = Some(new_payload_sender);
Either::Right(new_payload_receiver)
},
BlockPayloadStatus::Requested(payload_sender) => {
let (new_payload_sender, new_payload_receiver) = oneshot::channel();
*payload_sender = new_payload_sender;
Either::Right(new_payload_receiver)
BlockPayloadStatus::AvailableAndVerified(block_payload) => block_payload.clone(),
BlockPayloadStatus::AvailableAndUnverified(_) => {
// This shouldn't happen (the payload should already be verified)
let error = format!(
"Payload data for block epoch {}, round {} is unverified!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
},
Entry::Vacant(entry) => {
let (payload_sender, payload_receiver) = oneshot::channel();
entry.insert(BlockPayloadStatus::Requested(payload_sender));
Either::Right(payload_receiver)
Entry::Vacant(_) => {
// This shouldn't happen (the payload should already be present)
let error = format!(
"Missing payload data for block epoch {}, round {}!",
block.epoch(),
block.round()
);
return Err(InternalError { error });
},
};
let block_transaction_payload = match result {
Either::Left(data) => data.transaction_payload,
Either::Right(rx) => timeout(Duration::from_millis(300), rx)
.await
.map_err(|_| ExecutorError::CouldNotGetData)?
.map_err(|_| ExecutorError::CouldNotGetData)?,
};

// Verify the payload and inline batches before returning the data. The
// batch digests and transactions will have already been verified by the
// consensus observer on message receipt.
let transaction_payload = block_payload.transaction_payload;
match payload {
Payload::DirectMempool(_) => {
return Err(ExecutorError::InternalError {
error: "DirectMempool payloads should not be sent to the consensus observer!"
.to_string(),
});
let error =
"DirectMempool payloads should not be sent to the consensus observer!".into();
return Err(InternalError { error });
},
Payload::InQuorumStore(proof_with_data) => {
// Verify the batches in the requested block
verify_batches_in_block(&proof_with_data.proofs, &block_transaction_payload)?;
verify_batches_in_block(&proof_with_data.proofs, &transaction_payload)?;
},
Payload::InQuorumStoreWithLimit(proof_with_data) => {
// Verify the batches in the requested block
verify_batches_in_block(
&proof_with_data.proof_with_data.proofs,
&block_transaction_payload,
&transaction_payload,
)?;

// Verify the transaction limit
verify_transaction_limit(
proof_with_data.max_txns_to_execute,
&block_transaction_payload,
)?;
verify_transaction_limit(proof_with_data.max_txns_to_execute, &transaction_payload)?;
},
Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, max_txns_to_execute) => {
// Verify the batches in the requested block
verify_batches_in_block(&proof_with_data.proofs, &block_transaction_payload)?;
verify_batches_in_block(&proof_with_data.proofs, &transaction_payload)?;

// Verify the inline batches
verify_inline_batches_in_block(inline_batches, &block_transaction_payload)?;
verify_inline_batches_in_block(inline_batches, &transaction_payload)?;

// Verify the transaction limit
verify_transaction_limit(*max_txns_to_execute, &block_transaction_payload)?;
verify_transaction_limit(*max_txns_to_execute, &transaction_payload)?;
},
}

// If the payload is valid, publish it to any downstream observers
if let Some(consensus_publisher) = consensus_publisher {
let message = ConsensusObserverMessage::new_block_payload_message(
block.gen_block_info(HashValue::zero(), 0, None),
block_transaction_payload.clone(),
transaction_payload.clone(),
);
consensus_publisher.publish_message(message).await;
}

// Return the transactions and the transaction limit
Ok((
block_transaction_payload.transactions,
block_transaction_payload.limit,
))
Ok((transaction_payload.transactions, transaction_payload.limit))
}

/// Verifies that the batches in the block transaction payload
Expand Down

0 comments on commit c0d820f

Please sign in to comment.