From 0b20942ca504d70901ab844427987bdd63287c46 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Wed, 20 Mar 2024 12:02:53 +0000 Subject: [PATCH] [Consensus 2.0] re-broadcast last proposed block after recover (#16751) ## Description To ensure liveness we should attempt to re-broadcast the last proposed block after recovery if no new one is created. This is a fix for that specific case, but a more proper solution should be provided, as discussed in the past, to ensure re-broadcast of the last proposed block when there is no new round advancement. ## Test Plan CI --- If your changes are not user-facing and do not break anything, you can skip the following section. Otherwise, please briefly describe what has changed under the Release Notes section. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- consensus/core/src/authority_node.rs | 2 +- consensus/core/src/broadcaster.rs | 2 +- consensus/core/src/core.rs | 47 +++++++++++++++++----------- consensus/core/src/core_thread.rs | 2 +- consensus/core/src/leader_timeout.rs | 4 +-- 5 files changed, 34 insertions(+), 23 deletions(-) diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index dcbe93bb31980..5035afebbe44c 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -124,7 +124,7 @@ where let (tx_client, tx_receiver) = TransactionClient::new(context.clone()); let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - let (core_signals, signals_receivers) = CoreSignals::new(); + let (core_signals, signals_receivers) = CoreSignals::new(context.clone()); let mut network_manager = N::new(context.clone()); let 
network_client = network_manager.client(); diff --git a/consensus/core/src/broadcaster.rs b/consensus/core/src/broadcaster.rs index dba4f563f431f..866ce26a20d18 100644 --- a/consensus/core/src/broadcaster.rs +++ b/consensus/core/src/broadcaster.rs @@ -207,7 +207,7 @@ mod test { let (context, _keys) = Context::new_for_test(4); let context = Arc::new(context); let network_client = Arc::new(FakeNetworkClient::new()); - let (core_signals, signals_receiver) = CoreSignals::new(); + let (core_signals, signals_receiver) = CoreSignals::new(context.clone()); let _broadcaster = Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index b8e2f33b6811e..c8bdfda1720ee 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -11,14 +11,14 @@ use consensus_config::ProtocolKeyPair; use mysten_metrics::monitored_scope; use parking_lot::RwLock; use tokio::sync::{broadcast, watch}; -use tracing::warn; +use tracing::{debug, info, warn}; use crate::stake_aggregator::{QuorumThreshold, StakeAggregator}; use crate::transaction::TransactionGuard; use crate::{ block::{ timestamp_utc_ms, Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock, - Slot, VerifiedBlock, + Slot, VerifiedBlock, GENESIS_ROUND, }, block_manager::BlockManager, commit_observer::CommitObserver, @@ -131,7 +131,14 @@ impl Core { self.add_accepted_blocks(last_quorum); // Try to commit and propose, since they may not have run after the last storage write. self.try_commit().unwrap(); - self.try_propose(true).unwrap(); + if self.try_propose(true).unwrap().is_none() { + assert!(self.last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery"); + + // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. 
+ self.signals + .new_block(self.last_proposed_block.clone()) + .unwrap(); + } self } @@ -197,11 +204,7 @@ impl Core { // the minimum round delay has passed. fn try_propose(&mut self, force: bool) -> ConsensusResult> { if let Some(block) = self.try_new_block(force) { - // When there is only one authority in committee, it is unnecessary to broadcast - // the block which will fail anyway without subscribers to the signal. - if self.context.committee.size() > 1 { - self.signals.new_block(block.clone())?; - } + self.signals.new_block(block.clone())?; // The new block may help commit. self.try_commit()?; return Ok(Some(block)); @@ -296,7 +299,7 @@ impl Core { .into_iter() .for_each(TransactionGuard::acknowledge); - tracing::info!("Created block {}", verified_block); + info!("Created block {}", verified_block); Some(verified_block) } @@ -454,13 +457,14 @@ impl Core { pub(crate) struct CoreSignals { tx_block_broadcast: broadcast::Sender, new_round_sender: watch::Sender, + context: Arc, } impl CoreSignals { // TODO: move to Parameters. const BROADCAST_BACKLOG_CAPACITY: usize = 1000; - pub fn new() -> (Self, CoreSignalsReceivers) { + pub fn new(context: Arc) -> (Self, CoreSignalsReceivers) { let (tx_block_broadcast, _rx_block_broadcast) = broadcast::channel::(Self::BROADCAST_BACKLOG_CAPACITY); let (new_round_sender, new_round_receiver) = watch::channel(0); @@ -468,6 +472,7 @@ impl CoreSignals { let me = Self { tx_block_broadcast: tx_block_broadcast.clone(), new_round_sender, + context, }; let receivers = CoreSignalsReceivers { @@ -481,9 +486,15 @@ impl CoreSignals { /// Sends a signal to all the waiters that a new block has been produced. The method will return /// true if block has reached even one subscriber, false otherwise. 
pub fn new_block(&self, block: VerifiedBlock) -> ConsensusResult<()> { - if let Err(err) = self.tx_block_broadcast.send(block) { - warn!("Couldn't broadcast the block to any receiver: {err}"); - return Err(ConsensusError::Shutdown); + // When there is only one authority in committee, it is unnecessary to broadcast + // the block which will fail anyway without subscribers to the signal. + if self.context.committee.size() > 1 { + if let Err(err) = self.tx_block_broadcast.send(block) { + warn!("Couldn't broadcast the block to any receiver: {err}"); + return Err(ConsensusError::Shutdown); + } + } else { + debug!("Did not broadcast block {block:?} to receivers as committee size is <= 1"); } Ok(()) } @@ -588,7 +599,7 @@ mod test { assert_eq!(dag_state.read().last_commit_index(), 0); // Now spin up core - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); // Need at least one subscriber to the block broadcast channel. let mut block_receiver = signal_receivers.block_broadcast_receiver(); let mut core = Core::new( @@ -698,7 +709,7 @@ mod test { assert_eq!(dag_state.read().last_commit_index(), 0); // Now spin up core - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); // Need at least one subscriber to the block broadcast channel. let mut block_receiver = signal_receivers.block_broadcast_receiver(); let mut core = Core::new( @@ -769,7 +780,7 @@ mod test { ); let (transaction_client, tx_receiver) = TransactionClient::new(context.clone()); let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); // Need at least one subscriber to the block broadcast channel. 
let mut block_receiver = signal_receivers.block_broadcast_receiver(); @@ -870,7 +881,7 @@ mod test { ); let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); // Need at least one subscriber to the block broadcast channel. let _block_receiver = signal_receivers.block_broadcast_receiver(); @@ -1175,7 +1186,7 @@ mod test { ); let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); // Need at least one subscriber to the block broadcast channel. let block_receiver = signal_receivers.block_broadcast_receiver(); diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index 8257760888316..1fc4dbac91f1f 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -205,7 +205,7 @@ mod test { ); let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - let (signals, signal_receivers) = CoreSignals::new(); + let (signals, signal_receivers) = CoreSignals::new(context.clone()); let _block_receiver = signal_receivers.block_broadcast_receiver(); let (sender, _receiver) = unbounded_channel(); let commit_observer = CommitObserver::new( diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index 292035b4bd21f..e4ac5c0a49f48 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -157,7 +157,7 @@ mod tests { let context = 
Arc::new(context.with_parameters(parameters)); let start = Instant::now(); - let (mut signals, signal_receivers) = CoreSignals::new(); + let (mut signals, signal_receivers) = CoreSignals::new(context.clone()); // spawn the task let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context); @@ -199,7 +199,7 @@ mod tests { let context = Arc::new(context.with_parameters(parameters)); let now = Instant::now(); - let (mut signals, signal_receivers) = CoreSignals::new(); + let (mut signals, signal_receivers) = CoreSignals::new(context.clone()); // spawn the task let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);