From 9374b6c2bc4a36cc96e6fa4e76398ef32edf7fbc Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Sun, 7 Jul 2024 10:54:02 -0400 Subject: [PATCH] [Consensus Observer] Add support for disabling QS. --- consensus/src/consensus_observer/observer.rs | 36 +++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/consensus/src/consensus_observer/observer.rs b/consensus/src/consensus_observer/observer.rs index 8aeac9d5003d14..b606a8f05f2552 100644 --- a/consensus/src/consensus_observer/observer.rs +++ b/consensus/src/consensus_observer/observer.rs @@ -76,6 +76,8 @@ pub struct ConsensusObserver { // The current epoch state epoch_state: Option>, + // Whether quorum store is enabled (updated on epoch changes) + quorum_store_enabled: bool, // The latest ledger info (updated via a callback) root: Arc>, @@ -127,6 +129,7 @@ impl ConsensusObserver { consensus_observer_config, consensus_observer_client, epoch_state: None, + quorum_store_enabled: false, // Updated on epoch changes root: Arc::new(Mutex::new(root)), block_payload_store: BlockPayloadStore::new(), missing_block_store: MissingBlockStore::new(consensus_observer_config), @@ -142,6 +145,17 @@ impl ConsensusObserver { } } + /// Returns true iff all payloads exist for the given blocks + fn all_payloads_exist(&self, blocks: &[Arc]) -> bool { + // If quorum store is disabled, all payloads exist (they're already in the blocks) + if !self.quorum_store_enabled { + return true; + } + + // Otherwise, check if all the payloads exist in the payload store + self.block_payload_store.all_payloads_exist(blocks) + } + /// Checks the progress of the consensus observer async fn check_progress(&mut self) { debug!(LogSchema::new(LogEntry::ConsensusObserver) @@ -551,11 +565,8 @@ impl ConsensusObserver { .pending_ordered_blocks .get_verified_pending_block(commit_decision.epoch(), commit_decision.round()); if let Some(pending_block) = pending_block { - // If the payload exists, add the commit decision to the pending blocks - if self - .block_payload_store - .all_payloads_exist(&pending_block.blocks) - { + // If all payloads exist, add the commit decision to the pending blocks + if self.all_payloads_exist(&pending_block.blocks) { debug!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( "Adding decision to pending block: {}", @@ -695,9 +706,9 @@ impl ConsensusObserver { return; }; - // If all the payloads exist, process the ordered block. - // Otherwise, store the block in the missing block store. - if self.block_payload_store.all_payloads_exist(&blocks) { + // If all payloads exist, process the block. Otherwise, store it + // in the missing block store and wait for the payloads to arrive. + if self.all_payloads_exist(&blocks) { self.process_ordered_block(ordered_block).await; } else { self.missing_block_store.insert_missing_block(ordered_block); @@ -1067,17 +1078,18 @@ impl ConsensusObserver { panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!") }; - // Update the local epoch state + // Update the local epoch state and quorum store config self.epoch_state = Some(epoch_state.clone()); + self.quorum_store_enabled = consensus_config.quorum_store_enabled(); info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "New epoch started: {}. Updated the epoch state!", - epoch_state.epoch + "New epoch started: {:?}. Updated the epoch state! Quorum store enabled: {:?}", + epoch_state.epoch, self.quorum_store_enabled, )) ); // Create the payload manager - let payload_manager = if consensus_config.quorum_store_enabled() { + let payload_manager = if self.quorum_store_enabled { PayloadManager::ConsensusObserver( self.block_payload_store.get_block_payloads(), self.consensus_publisher.clone(),