Skip to content

Commit

Permalink
[Consensus Observer] Add support for disabling QS.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 11, 2024
1 parent f679e56 commit 9374b6c
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct ConsensusObserver {

// The current epoch state
epoch_state: Option<Arc<EpochState>>,
// Whether quorum store is enabled (updated on epoch changes)
quorum_store_enabled: bool,
// The latest ledger info (updated via a callback)
root: Arc<Mutex<LedgerInfoWithSignatures>>,

Expand Down Expand Up @@ -127,6 +129,7 @@ impl ConsensusObserver {
consensus_observer_config,
consensus_observer_client,
epoch_state: None,
quorum_store_enabled: false, // Updated on epoch changes
root: Arc::new(Mutex::new(root)),
block_payload_store: BlockPayloadStore::new(),
missing_block_store: MissingBlockStore::new(consensus_observer_config),
Expand All @@ -142,6 +145,17 @@ impl ConsensusObserver {
}
}

/// Returns true iff all payloads exist for the given blocks
fn all_payloads_exist(&self, blocks: &[Arc<PipelinedBlock>]) -> bool {
// If quorum store is disabled, all payloads exist (they're already in the blocks)
if !self.quorum_store_enabled {
return true;
}

// Otherwise, check if all the payloads exist in the payload store
self.block_payload_store.all_payloads_exist(blocks)
}

/// Checks the progress of the consensus observer
async fn check_progress(&mut self) {
debug!(LogSchema::new(LogEntry::ConsensusObserver)
Expand Down Expand Up @@ -551,11 +565,8 @@ impl ConsensusObserver {
.pending_ordered_blocks
.get_verified_pending_block(commit_decision.epoch(), commit_decision.round());
if let Some(pending_block) = pending_block {
// If the payload exists, add the commit decision to the pending blocks
if self
.block_payload_store
.all_payloads_exist(&pending_block.blocks)
{
// If all payloads exist, add the commit decision to the pending blocks
if self.all_payloads_exist(&pending_block.blocks) {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Adding decision to pending block: {}",
Expand Down Expand Up @@ -695,9 +706,9 @@ impl ConsensusObserver {
return;
};

// If all the payloads exist, process the ordered block.
// Otherwise, store the block in the missing block store.
if self.block_payload_store.all_payloads_exist(&blocks) {
// If all payloads exist, process the block. Otherwise, store it
// in the missing block store and wait for the payloads to arrive.
if self.all_payloads_exist(&blocks) {
self.process_ordered_block(ordered_block).await;
} else {
self.missing_block_store.insert_missing_block(ordered_block);
Expand Down Expand Up @@ -1067,17 +1078,18 @@ impl ConsensusObserver {
panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!")
};

// Update the local epoch state
// Update the local epoch state and quorum store config
self.epoch_state = Some(epoch_state.clone());
self.quorum_store_enabled = consensus_config.quorum_store_enabled();
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"New epoch started: {}. Updated the epoch state!",
epoch_state.epoch
"New epoch started: {:?}. Updated the epoch state! Quorum store enabled: {:?}",
epoch_state.epoch, self.quorum_store_enabled,
))
);

// Create the payload manager
let payload_manager = if consensus_config.quorum_store_enabled() {
let payload_manager = if self.quorum_store_enabled {
PayloadManager::ConsensusObserver(
self.block_payload_store.get_block_payloads(),
self.consensus_publisher.clone(),
Expand Down

0 comments on commit 9374b6c

Please sign in to comment.