From 1e26882ab95c340ffd486e6591f93680efddf1ae Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Fri, 5 Jul 2024 14:44:27 -0400 Subject: [PATCH] [Consensus Observer] Support non-QS and add metrics for block processing. --- consensus/src/consensus_observer/metrics.rs | 47 ++++++- .../src/consensus_observer/missing_blocks.rs | 25 ++++ consensus/src/consensus_observer/observer.rs | 128 +++++++++++++----- .../src/consensus_observer/payload_store.rs | 17 ++- .../src/consensus_observer/pending_blocks.rs | 26 ++++ 5 files changed, 204 insertions(+), 39 deletions(-) diff --git a/consensus/src/consensus_observer/metrics.rs b/consensus/src/consensus_observer/metrics.rs index 0425a1e799d488..a410a333013a8f 100644 --- a/consensus/src/consensus_observer/metrics.rs +++ b/consensus/src/consensus_observer/metrics.rs @@ -9,7 +9,13 @@ use aptos_metrics_core::{ use once_cell::sync::Lazy; // Useful metric labels +pub const BLOCK_PAYLOAD_LABEL: &str = "block_payload"; +pub const COMMIT_DECISION_LABEL: &str = "commit_decision"; pub const CREATED_SUBSCRIPTION_LABEL: &str = "created_subscription"; +pub const MISSING_BLOCKS_LABEL: &str = "missing_blocks"; +pub const ORDERED_MESSAGE_LABEL: &str = "ordered_message"; +pub const PENDING_ORDERED_BLOCKS_LABEL: &str = "pending_ordered_blocks"; +pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads"; /// Counter for tracking created subscriptions for the consensus observer pub static OBSERVER_CREATED_SUBSCRIPTIONS: Lazy = Lazy::new(|| { @@ -21,7 +27,7 @@ pub static OBSERVER_CREATED_SUBSCRIPTIONS: Lazy = Lazy::new(|| { .unwrap() }); -/// Counter for tracking the number of active subscriptions for the consensus observer +/// Gauge for tracking the number of active subscriptions for the consensus observer pub static OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS: Lazy = Lazy::new(|| { register_int_gauge_vec!( "consensus_observer_num_active_subscriptions", @@ -31,6 +37,26 @@ pub static OBSERVER_NUM_ACTIVE_SUBSCRIPTIONS: Lazy = Lazy::new(|| { .unwrap() }); +/// Gauge for tracking the number of processed blocks by the consensus observer +pub static OBSERVER_NUM_PROCESSED_BLOCKS: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "consensus_observer_num_processed_blocks", + "Gauge for tracking the number of processed blocks by the consensus observer", + &["processed_type"] + ) + .unwrap() +}); + +/// Gauge for tracking the processed block rounds by the consensus observer +pub static OBSERVER_PROCESSED_BLOCK_ROUNDS: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "consensus_observer_processed_block_rounds", + "Gauge for tracking the processed block rounds by the consensus observer", + &["processed_type"] + ) + .unwrap() +}); + /// Counter for tracking successful RPC responses received by the consensus observer pub static OBSERVER_RECEIVED_MESSAGE_RESPONSES: Lazy = Lazy::new(|| { register_int_counter_vec!( @@ -51,6 +77,16 @@ pub static OBSERVER_RECEIVED_MESSAGES: Lazy = Lazy::new(|| { .unwrap() }); +/// Gauge for tracking the rounds of received messages by the consensus observer +pub static OBSERVER_RECEIVED_MESSAGE_ROUNDS: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "consensus_observer_received_message_rounds", + "Gauge for tracking the rounds of received messages by the consensus observer", + &["message_type"] + ) + .unwrap() +}); + /// Counter for tracking RPC request latencies sent by the consensus observer pub static OBSERVER_REQUEST_LATENCIES: Lazy = Lazy::new(|| { register_histogram_vec!( @@ -101,7 +137,7 @@ pub static PENDING_CONSENSUS_OBSERVER_NETWORK_EVENTS: Lazy = Lazy .unwrap() }); -/// Counter for tracking the number of active subscribers for the consensus publisher +/// Gauge for tracking the number of active subscribers for the consensus publisher pub static PUBLISHER_NUM_ACTIVE_SUBSCRIBERS: Lazy = Lazy::new(|| { register_int_gauge_vec!( "consensus_publisher_num_active_subscribers", @@ -166,7 +202,12 @@ pub fn observe_value_with_label( .observe(value) } -/// Sets the gauge with the specific label and value +/// Sets the gauge with the specific network ID and value pub fn set_gauge(counter: &Lazy, network_id: &NetworkId, value: i64) { counter.with_label_values(&[network_id.as_str()]).set(value); } + +/// Sets the gauge with the specific label and value +pub fn set_gauge_with_label(counter: &Lazy, label: &str, value: u64) { + counter.with_label_values(&[label]).set(value as i64); +} diff --git a/consensus/src/consensus_observer/missing_blocks.rs b/consensus/src/consensus_observer/missing_blocks.rs index ec56604a3a96f2..8bf8dd189ba316 100644 --- a/consensus/src/consensus_observer/missing_blocks.rs +++ b/consensus/src/consensus_observer/missing_blocks.rs @@ -3,6 +3,7 @@ use crate::consensus_observer::{ logging::{LogEntry, LogSchema}, + metrics, network_message::OrderedBlock, payload_store::BlockPayloadStore, }; @@ -133,6 +134,30 @@ impl MissingBlockStore { // Return the ready block (if one exists) ready_block } + + /// Updates the metrics for the missing blocks + pub fn update_missing_blocks_metrics(&self) { + // Update the number of missing blocks + let blocks_missing_payloads = self.blocks_missing_payloads.lock(); + let num_missing_blocks = blocks_missing_payloads.len() as u64; + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::MISSING_BLOCKS_LABEL, + num_missing_blocks, + ); + + // Update the highest round for the missing blocks + let highest_missing_round = blocks_missing_payloads + .last_key_value() + .map(|(_, missing_block)| missing_block.last_block()) + .and_then(|last_block| last_block.map(|block| block.round())) + .unwrap_or(0); + metrics::set_gauge_with_label( + &metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS, + metrics::MISSING_BLOCKS_LABEL, + highest_missing_round, + ); + } } #[cfg(test)] diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs index 548929f2945cce..153d09d49e4500 100644 --- a/consensus/src/consensus_observer/observer.rs +++ b/consensus/src/consensus_observer/observer.rs @@ -28,7 +28,7 @@ use crate::{ }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId}; -use aptos_consensus_types::pipeline; +use aptos_consensus_types::{pipeline, pipelined_block::PipelinedBlock}; use aptos_crypto::{bls12381, Genesis}; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_infallible::Mutex; @@ -60,6 +60,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::{sync::mpsc::UnboundedSender, time::interval}; use tokio_stream::wrappers::IntervalStream; +// Whether to log messages at the info level (useful for debugging) +const LOG_MESSAGES_AT_INFO_LEVEL: bool = false; + /// The consensus observer receives consensus updates and propagates them to the execution pipeline pub struct ConsensusObserver { // The configuration of the consensus observer @@ -70,6 +73,8 @@ pub struct ConsensusObserver { // The current epoch state epoch_state: Option>, + // Whether quorum store is enabled (updated on epoch changes) + quorum_store_enabled: bool, // The latest ledger info (updated via a callback) root: Arc>, @@ -121,6 +126,7 @@ impl ConsensusObserver { consensus_observer_config, consensus_observer_client, epoch_state: None, + quorum_store_enabled: false, // Updated on epoch changes root: Arc::new(Mutex::new(root)), block_payload_store: BlockPayloadStore::new(), missing_block_store: MissingBlockStore::new(consensus_observer_config), @@ -136,6 +142,17 @@ impl ConsensusObserver { } } + /// Returns true iff all payloads exist for the given blocks + fn all_payloads_exist(&self, blocks: &[Arc]) -> bool { + // If quorum store is disabled, all payloads exist (they're already in the blocks) + if !self.quorum_store_enabled { + return true; + } + + // Otherwise, check if all the payloads exist in the payload store + self.block_payload_store.all_payloads_exist(blocks) + } + /// Checks the progress of the consensus observer async fn check_progress(&mut self) { debug!(LogSchema::new(LogEntry::ConsensusObserver) @@ -442,15 +459,22 @@ impl ConsensusObserver { /// Processes the block payload message async fn process_block_payload_message(&mut self, block_payload: BlockPayload) { - // Unpack the block round and epoch + // Get the block round and epoch let block = block_payload.block; let block_round = block.round(); let block_epoch = block.epoch(); - // Unpack the block payload + // Get the block transactions and limit let transactions = block_payload.transactions; let limit = block_payload.limit; + // Update the metrics for the received block payload + metrics::set_gauge_with_label( + &metrics::OBSERVER_RECEIVED_MESSAGE_ROUNDS, + metrics::BLOCK_PAYLOAD_LABEL, + block_round, + ); + // TODO: verify the block payload! // Update the payload store with the payload @@ -471,6 +495,13 @@ impl ConsensusObserver { /// Processes the commit decision message fn process_commit_decision_message(&mut self, commit_decision: CommitDecision) { + // Update the metrics for the received commit decision + metrics::set_gauge_with_label( + &metrics::OBSERVER_RECEIVED_MESSAGE_ROUNDS, + metrics::COMMIT_DECISION_LABEL, + commit_decision.round(), + ); + // If the commit decision is for the current epoch, verify it let epoch_state = self.get_epoch_state(); let commit_decision_epoch = commit_decision.epoch(); @@ -537,11 +568,8 @@ impl ConsensusObserver { // Process the pending block if let Some(pending_block) = pending_block { - // If the payload exists, add the commit decision to the pending blocks - if self - .block_payload_store - .all_payloads_exist(pending_block.blocks()) - { + // If all payloads exist, add the commit decision to the pending blocks + if self.all_payloads_exist(pending_block.blocks()) { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Adding decision to pending block: {}", @@ -612,35 +640,44 @@ impl ConsensusObserver { // Process the message based on the type match message { ConsensusObserverDirectSend::OrderedBlock(ordered_block) => { - debug!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Received ordered block: {}, from peer: {}!", - ordered_block.proof_block_info(), - peer_network_id - )) + // Log the received ordered block message + let log_message = format!( + "Received ordered block: {}, from peer: {}!", + ordered_block.proof_block_info(), + peer_network_id ); + log_received_message(log_message); + + // Process the ordered block message self.process_ordered_block_message(ordered_block).await; }, ConsensusObserverDirectSend::CommitDecision(commit_decision) => { - debug!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Received commit decision: {}, from peer: {}!", - commit_decision.proof_block_info(), - peer_network_id - )) + // Log the received commit decision message + let log_message = format!( + "Received commit decision: {}, from peer: {}!", + commit_decision.proof_block_info(), + peer_network_id ); + log_received_message(log_message); + + // Process the commit decision message self.process_commit_decision_message(commit_decision); }, ConsensusObserverDirectSend::BlockPayload(block_payload) => { - debug!( - LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Received block payload: {}, from peer: {}!", - block_payload.block, peer_network_id - )) + // Log the received block payload message + let log_message = format!( + "Received block payload: {}, from peer: {}!", + block_payload.block, peer_network_id ); + log_received_message(log_message); + + // Process the block payload message self.process_block_payload_message(block_payload).await; }, } + + // Update the metrics for the processed blocks + self.update_processed_blocks_metrics(); } /// Processes the ordered block @@ -657,12 +694,9 @@ impl ConsensusObserver { return; }; - // If all the payloads exist, process the ordered block. - // Otherwise, store the block in the missing block store. - if self - .block_payload_store - .all_payloads_exist(ordered_block.blocks()) - { + // If all payloads exist, process the block. Otherwise, store it + // in the missing block store and wait for the payloads to arrive. + if self.all_payloads_exist(ordered_block.blocks()) { self.process_ordered_block(ordered_block).await; } else { self.missing_block_store.insert_missing_block(ordered_block); @@ -884,6 +918,18 @@ impl ConsensusObserver { }); } + /// Updates the metrics for the processed blocks + fn update_processed_blocks_metrics(&self) { + // Update the missing block metrics + self.missing_block_store.update_missing_blocks_metrics(); + + // Update the payload store metrics + self.block_payload_store.update_payload_store_metrics(); + + // Update the pending block metrics + self.pending_ordered_blocks.update_pending_blocks_metrics(); + } + /// Updates the subscription creation metrics for the given peer fn update_subscription_creation_metrics(&self, peer_network_id: PeerNetworkId) { // Set the number of active subscriptions @@ -935,17 +981,18 @@ impl ConsensusObserver { panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!") }; - // Update the local epoch state + // Update the local epoch state and quorum store config self.epoch_state = Some(epoch_state.clone()); + self.quorum_store_enabled = consensus_config.quorum_store_enabled(); info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "New epoch started: {}. Updated the epoch state!", - epoch_state.epoch + "New epoch started: {:?}. Updated the epoch state! Quorum store enabled: {:?}", + epoch_state.epoch, self.quorum_store_enabled, )) ); // Create the payload manager - let payload_manager = if consensus_config.quorum_store_enabled() { + let payload_manager = if self.quorum_store_enabled { PayloadManager::ConsensusObserver( self.block_payload_store.get_block_payloads(), self.consensus_publisher.clone(), @@ -1175,6 +1222,17 @@ async fn extract_on_chain_configs( ) } +/// Logs the received message using an appropriate log level +fn log_received_message(message: String) { + // Log the message at the appropriate level + let log_schema = LogSchema::new(LogEntry::ConsensusObserver).message(&message); + if LOG_MESSAGES_AT_INFO_LEVEL { + info!(log_schema); + } else { + debug!(log_schema); + } +} + /// Spawns a task to sync to the given commit decision and notifies /// the consensus observer. Also, returns an abort handle to cancel the task. fn sync_to_commit_decision( diff --git a/consensus/src/consensus_observer/payload_store.rs b/consensus/src/consensus_observer/payload_store.rs index 30da7bc0ad0aa1..1fa8bcda6bc9c0 100644 --- a/consensus/src/consensus_observer/payload_store.rs +++ b/consensus/src/consensus_observer/payload_store.rs @@ -1,7 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::consensus_observer::logging::{LogEntry, LogSchema}; +use crate::consensus_observer::{ + logging::{LogEntry, LogSchema}, + metrics, +}; use aptos_consensus_types::pipelined_block::PipelinedBlock; use aptos_crypto::HashValue; use aptos_infallible::Mutex; @@ -105,6 +108,18 @@ impl BlockPayloadStore { block_transaction_payloads.remove(&block.id()); } } + + /// Updates the metrics for the payload store + pub fn update_payload_store_metrics(&self) { + // Update the number of block transaction payloads + let block_transaction_payloads = self.block_transaction_payloads.lock(); + let num_payloads = block_transaction_payloads.len() as u64; + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::STORED_PAYLOADS_LABEL, + num_payloads, + ); + } } impl Default for BlockPayloadStore { diff --git a/consensus/src/consensus_observer/pending_blocks.rs b/consensus/src/consensus_observer/pending_blocks.rs index 7ed959a4a8d67c..21101f8bd7c402 100644 --- a/consensus/src/consensus_observer/pending_blocks.rs +++ b/consensus/src/consensus_observer/pending_blocks.rs @@ -3,6 +3,7 @@ use crate::consensus_observer::{ logging::{LogEntry, LogSchema}, + metrics, network_message::{CommitDecision, OrderedBlock}, }; use aptos_config::config::ConsensusObserverConfig; @@ -152,6 +153,31 @@ impl PendingOrderedBlocks { } } + /// Updates the metrics for the pending blocks + pub fn update_pending_blocks_metrics(&self) { + // Update the number of pending blocks + let num_pending_blocks = self.pending_blocks.lock().len() as u64; + metrics::set_gauge_with_label( + &metrics::OBSERVER_NUM_PROCESSED_BLOCKS, + metrics::PENDING_ORDERED_BLOCKS_LABEL, + num_pending_blocks, + ); + + // Update the highest round for the pending blocks + let highest_pending_round = self + .pending_blocks + .lock() + .last_key_value() + .map(|(_, (ordered_block, _, _))| ordered_block.last_block()) + .and_then(|last_block| last_block.map(|block| block.round())) + .unwrap_or(0); + metrics::set_gauge_with_label( + &metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS, + metrics::PENDING_ORDERED_BLOCKS_LABEL, + highest_pending_round, + ); + } + /// Verifies the pending blocks against the given epoch state. /// If verification is successful, blocks are marked as verified. pub fn verify_pending_blocks(&self, epoch_state: &EpochState) {