Skip to content

Commit

Permalink
[Consensus Observer] Support non-QS and add metrics for block process…
Browse files Browse the repository at this point in the history
…ing.
  • Loading branch information
JoshLind committed Jul 14, 2024
1 parent befdd3d commit 1e26882
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 39 deletions.
47 changes: 44 additions & 3 deletions consensus/src/consensus_observer/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ use aptos_metrics_core::{
use once_cell::sync::Lazy;

// Useful metric labels
pub const BLOCK_PAYLOAD_LABEL: &str = "block_payload";
pub const COMMIT_DECISION_LABEL: &str = "commit_decision";
pub const CREATED_SUBSCRIPTION_LABEL: &str = "created_subscription";
pub const MISSING_BLOCKS_LABEL: &str = "missing_blocks";
pub const ORDERED_MESSAGE_LABEL: &str = "ordered_message";
pub const PENDING_ORDERED_BLOCKS_LABEL: &str = "pending_ordered_blocks";
pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads";

/// Counter for tracking created subscriptions for the consensus observer
pub static OBSERVER_CREATED_SUBSCRIPTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
Expand All @@ -21,7 +27,7 @@ pub static OBSERVER_CREATED_SUBSCRIPTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counter for tracking the number of active subscriptions for the consensus observer
/// Gauge for tracking the number of active subscriptions for the consensus observer
pub static OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_observer_num_active_subscriptions",
Expand All @@ -31,6 +37,26 @@ pub static OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge for tracking the number of processed blocks by the consensus observer
pub static OBSERVER_NUM_PROCESSED_BLOCKS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_observer_num_processed_blocks",
"Gauge for tracking the number of processed blocks by the consensus observer",
&["processed_type"]
)
.unwrap()
});

/// Gauge for tracking the processed block rounds by the consensus observer
pub static OBSERVER_PROCESSED_BLOCK_ROUNDS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_observer_processed_block_rounds",
"Gauge for tracking the processed block rounds by the consensus observer",
&["processed_type"]
)
.unwrap()
});

/// Counter for tracking successful RPC responses received by the consensus observer
pub static OBSERVER_RECEIVED_MESSAGE_RESPONSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand All @@ -51,6 +77,16 @@ pub static OBSERVER_RECEIVED_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge for tracking the rounds of received messages by the consensus observer
pub static OBSERVER_RECEIVED_MESSAGE_ROUNDS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_observer_received_message_rounds",
"Gauge for tracking the rounds of received messages by the consensus observer",
&["message_type"]
)
.unwrap()
});

/// Counter for tracking RPC request latencies sent by the consensus observer
pub static OBSERVER_REQUEST_LATENCIES: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
Expand Down Expand Up @@ -101,7 +137,7 @@ pub static PENDING_CONSENSUS_OBSERVER_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy
.unwrap()
});

/// Counter for tracking the number of active subscribers for the consensus publisher
/// Gauge for tracking the number of active subscribers for the consensus publisher
pub static PUBLISHER_NUM_ACTIVE_SUBSCRIBERS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_publisher_num_active_subscribers",
Expand Down Expand Up @@ -166,7 +202,12 @@ pub fn observe_value_with_label(
.observe(value)
}

/// Sets the gauge with the specific label and value
/// Sets the gauge with the specific network ID and value
pub fn set_gauge(counter: &Lazy<IntGaugeVec>, network_id: &NetworkId, value: i64) {
counter.with_label_values(&[network_id.as_str()]).set(value);
}

/// Sets the gauge with the specific label and value
pub fn set_gauge_with_label(counter: &Lazy<IntGaugeVec>, label: &str, value: u64) {
counter.with_label_values(&[label]).set(value as i64);
}
25 changes: 25 additions & 0 deletions consensus/src/consensus_observer/missing_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::consensus_observer::{
logging::{LogEntry, LogSchema},
metrics,
network_message::OrderedBlock,
payload_store::BlockPayloadStore,
};
Expand Down Expand Up @@ -133,6 +134,30 @@ impl MissingBlockStore {
// Return the ready block (if one exists)
ready_block
}

/// Updates the metrics for the missing blocks
pub fn update_missing_blocks_metrics(&self) {
// Update the number of missing blocks
let blocks_missing_payloads = self.blocks_missing_payloads.lock();
let num_missing_blocks = blocks_missing_payloads.len() as u64;
metrics::set_gauge_with_label(
&metrics::OBSERVER_NUM_PROCESSED_BLOCKS,
metrics::MISSING_BLOCKS_LABEL,
num_missing_blocks,
);

// Update the highest round for the missing blocks
let highest_missing_round = blocks_missing_payloads
.last_key_value()
.map(|(_, missing_block)| missing_block.last_block())
.and_then(|last_block| last_block.map(|block| block.round()))
.unwrap_or(0);
metrics::set_gauge_with_label(
&metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS,
metrics::MISSING_BLOCKS_LABEL,
highest_missing_round,
);
}
}

#[cfg(test)]
Expand Down
128 changes: 93 additions & 35 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId};
use aptos_consensus_types::pipeline;
use aptos_consensus_types::{pipeline, pipelined_block::PipelinedBlock};
use aptos_crypto::{bls12381, Genesis};
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -60,6 +60,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::mpsc::UnboundedSender, time::interval};
use tokio_stream::wrappers::IntervalStream;

// Whether to log messages at the info level (useful for debugging)
const LOG_MESSAGES_AT_INFO_LEVEL: bool = false;

/// The consensus observer receives consensus updates and propagates them to the execution pipeline
pub struct ConsensusObserver {
// The configuration of the consensus observer
Expand All @@ -70,6 +73,8 @@ pub struct ConsensusObserver {

// The current epoch state
epoch_state: Option<Arc<EpochState>>,
// Whether quorum store is enabled (updated on epoch changes)
quorum_store_enabled: bool,
// The latest ledger info (updated via a callback)
root: Arc<Mutex<LedgerInfoWithSignatures>>,

Expand Down Expand Up @@ -121,6 +126,7 @@ impl ConsensusObserver {
consensus_observer_config,
consensus_observer_client,
epoch_state: None,
quorum_store_enabled: false, // Updated on epoch changes
root: Arc::new(Mutex::new(root)),
block_payload_store: BlockPayloadStore::new(),
missing_block_store: MissingBlockStore::new(consensus_observer_config),
Expand All @@ -136,6 +142,17 @@ impl ConsensusObserver {
}
}

/// Returns true iff all payloads exist for the given blocks
fn all_payloads_exist(&self, blocks: &[Arc<PipelinedBlock>]) -> bool {
// If quorum store is disabled, all payloads exist (they're already in the blocks)
if !self.quorum_store_enabled {
return true;
}

// Otherwise, check if all the payloads exist in the payload store
self.block_payload_store.all_payloads_exist(blocks)
}

/// Checks the progress of the consensus observer
async fn check_progress(&mut self) {
debug!(LogSchema::new(LogEntry::ConsensusObserver)
Expand Down Expand Up @@ -442,15 +459,22 @@ impl ConsensusObserver {

/// Processes the block payload message
async fn process_block_payload_message(&mut self, block_payload: BlockPayload) {
// Unpack the block round and epoch
// Get the block round and epoch
let block = block_payload.block;
let block_round = block.round();
let block_epoch = block.epoch();

// Unpack the block payload
// Get the block transactions and limit
let transactions = block_payload.transactions;
let limit = block_payload.limit;

// Update the metrics for the received block payload
metrics::set_gauge_with_label(
&metrics::OBSERVER_RECEIVED_MESSAGE_ROUNDS,
metrics::BLOCK_PAYLOAD_LABEL,
block_round,
);

// TODO: verify the block payload!

// Update the payload store with the payload
Expand All @@ -471,6 +495,13 @@ impl ConsensusObserver {

/// Processes the commit decision message
fn process_commit_decision_message(&mut self, commit_decision: CommitDecision) {
// Update the metrics for the received commit decision
metrics::set_gauge_with_label(
&metrics::OBSERVER_RECEIVED_MESSAGE_ROUNDS,
metrics::COMMIT_DECISION_LABEL,
commit_decision.round(),
);

// If the commit decision is for the current epoch, verify it
let epoch_state = self.get_epoch_state();
let commit_decision_epoch = commit_decision.epoch();
Expand Down Expand Up @@ -537,11 +568,8 @@ impl ConsensusObserver {

// Process the pending block
if let Some(pending_block) = pending_block {
// If the payload exists, add the commit decision to the pending blocks
if self
.block_payload_store
.all_payloads_exist(pending_block.blocks())
{
// If all payloads exist, add the commit decision to the pending blocks
if self.all_payloads_exist(pending_block.blocks()) {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Adding decision to pending block: {}",
Expand Down Expand Up @@ -612,35 +640,44 @@ impl ConsensusObserver {
// Process the message based on the type
match message {
ConsensusObserverDirectSend::OrderedBlock(ordered_block) => {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Received ordered block: {}, from peer: {}!",
ordered_block.proof_block_info(),
peer_network_id
))
// Log the received ordered block message
let log_message = format!(
"Received ordered block: {}, from peer: {}!",
ordered_block.proof_block_info(),
peer_network_id
);
log_received_message(log_message);

// Process the ordered block message
self.process_ordered_block_message(ordered_block).await;
},
ConsensusObserverDirectSend::CommitDecision(commit_decision) => {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Received commit decision: {}, from peer: {}!",
commit_decision.proof_block_info(),
peer_network_id
))
// Log the received commit decision message
let log_message = format!(
"Received commit decision: {}, from peer: {}!",
commit_decision.proof_block_info(),
peer_network_id
);
log_received_message(log_message);

// Process the commit decision message
self.process_commit_decision_message(commit_decision);
},
ConsensusObserverDirectSend::BlockPayload(block_payload) => {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Received block payload: {}, from peer: {}!",
block_payload.block, peer_network_id
))
// Log the received block payload message
let log_message = format!(
"Received block payload: {}, from peer: {}!",
block_payload.block, peer_network_id
);
log_received_message(log_message);

// Process the block payload message
self.process_block_payload_message(block_payload).await;
},
}

// Update the metrics for the processed blocks
self.update_processed_blocks_metrics();
}

/// Processes the ordered block
Expand All @@ -657,12 +694,9 @@ impl ConsensusObserver {
return;
};

// If all the payloads exist, process the ordered block.
// Otherwise, store the block in the missing block store.
if self
.block_payload_store
.all_payloads_exist(ordered_block.blocks())
{
// If all payloads exist, process the block. Otherwise, store it
// in the missing block store and wait for the payloads to arrive.
if self.all_payloads_exist(ordered_block.blocks()) {
self.process_ordered_block(ordered_block).await;
} else {
self.missing_block_store.insert_missing_block(ordered_block);
Expand Down Expand Up @@ -884,6 +918,18 @@ impl ConsensusObserver {
});
}

/// Updates the metrics for the processed blocks
fn update_processed_blocks_metrics(&self) {
// Update the missing block metrics
self.missing_block_store.update_missing_blocks_metrics();

// Update the payload store metrics
self.block_payload_store.update_payload_store_metrics();

// Update the pending block metrics
self.pending_ordered_blocks.update_pending_blocks_metrics();
}

/// Updates the subscription creation metrics for the given peer
fn update_subscription_creation_metrics(&self, peer_network_id: PeerNetworkId) {
// Set the number of active subscriptions
Expand Down Expand Up @@ -935,17 +981,18 @@ impl ConsensusObserver {
panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!")
};

// Update the local epoch state
// Update the local epoch state and quorum store config
self.epoch_state = Some(epoch_state.clone());
self.quorum_store_enabled = consensus_config.quorum_store_enabled();
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"New epoch started: {}. Updated the epoch state!",
epoch_state.epoch
"New epoch started: {:?}. Updated the epoch state! Quorum store enabled: {:?}",
epoch_state.epoch, self.quorum_store_enabled,
))
);

// Create the payload manager
let payload_manager = if consensus_config.quorum_store_enabled() {
let payload_manager = if self.quorum_store_enabled {
PayloadManager::ConsensusObserver(
self.block_payload_store.get_block_payloads(),
self.consensus_publisher.clone(),
Expand Down Expand Up @@ -1175,6 +1222,17 @@ async fn extract_on_chain_configs(
)
}

/// Logs the received message using an appropriate log level
fn log_received_message(message: String) {
// Log the message at the appropriate level
let log_schema = LogSchema::new(LogEntry::ConsensusObserver).message(&message);
if LOG_MESSAGES_AT_INFO_LEVEL {
info!(log_schema);
} else {
debug!(log_schema);
}
}

/// Spawns a task to sync to the given commit decision and notifies
/// the consensus observer. Also, returns an abort handle to cancel the task.
fn sync_to_commit_decision(
Expand Down
Loading

0 comments on commit 1e26882

Please sign in to comment.