Skip to content

Commit

Permalink
[Consensus 2.0] re-broadcast last proposed block after recover (Myste…
Browse files Browse the repository at this point in the history
…nLabs#16751)

## Description 

To ensure liveness, we should attempt to re-broadcast the last proposed
block after recovery if no new one is created. This fixes that specific
case, but a more complete solution should be provided, as discussed
previously, to ensure the last proposed block is re-broadcast whenever
there is no new round advancement.

## Test Plan 

CI

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Mar 20, 2024
1 parent e728868 commit 0b20942
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 23 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
let (tx_client, tx_receiver) = TransactionClient::new(context.clone());
let tx_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);

let (core_signals, signals_receivers) = CoreSignals::new();
let (core_signals, signals_receivers) = CoreSignals::new(context.clone());

let mut network_manager = N::new(context.clone());
let network_client = network_manager.client();
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod test {
let (context, _keys) = Context::new_for_test(4);
let context = Arc::new(context);
let network_client = Arc::new(FakeNetworkClient::new());
let (core_signals, signals_receiver) = CoreSignals::new();
let (core_signals, signals_receiver) = CoreSignals::new(context.clone());
let _broadcaster =
Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver);

Expand Down
47 changes: 29 additions & 18 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use consensus_config::ProtocolKeyPair;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use tokio::sync::{broadcast, watch};
use tracing::warn;
use tracing::{debug, info, warn};

use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};
use crate::transaction::TransactionGuard;
use crate::{
block::{
timestamp_utc_ms, Block, BlockAPI, BlockRef, BlockTimestampMs, BlockV1, Round, SignedBlock,
Slot, VerifiedBlock,
Slot, VerifiedBlock, GENESIS_ROUND,
},
block_manager::BlockManager,
commit_observer::CommitObserver,
Expand Down Expand Up @@ -131,7 +131,14 @@ impl Core {
self.add_accepted_blocks(last_quorum);
// Try to commit and propose, since they may not have run after the last storage write.
self.try_commit().unwrap();
self.try_propose(true).unwrap();
if self.try_propose(true).unwrap().is_none() {
assert!(self.last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery");

// if no new block proposed then just re-broadcast the last proposed one to ensure liveness.
self.signals
.new_block(self.last_proposed_block.clone())
.unwrap();
}

self
}
Expand Down Expand Up @@ -197,11 +204,7 @@ impl Core {
// the minimum round delay has passed.
fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
if let Some(block) = self.try_new_block(force) {
// When there is only one authority in committee, it is unnecessary to broadcast
// the block which will fail anyway without subscribers to the signal.
if self.context.committee.size() > 1 {
self.signals.new_block(block.clone())?;
}
self.signals.new_block(block.clone())?;
// The new block may help commit.
self.try_commit()?;
return Ok(Some(block));
Expand Down Expand Up @@ -296,7 +299,7 @@ impl Core {
.into_iter()
.for_each(TransactionGuard::acknowledge);

tracing::info!("Created block {}", verified_block);
info!("Created block {}", verified_block);

Some(verified_block)
}
Expand Down Expand Up @@ -454,20 +457,22 @@ impl Core {
/// Bundles the sending halves of the channels used to publish signals such as
/// newly produced blocks.
pub(crate) struct CoreSignals {
// Sending half of the broadcast channel that carries `VerifiedBlock`s.
tx_block_broadcast: broadcast::Sender<VerifiedBlock>,
// Sending half of a watch channel holding a `Round` value.
// NOTE(review): presumably publishes the latest round — confirm against the sender's call site.
new_round_sender: watch::Sender<Round>,
// Shared context; `new_block` reads `context.committee.size()` to decide
// whether the block is actually sent on the broadcast channel.
context: Arc<Context>,
}

impl CoreSignals {
// TODO: move to Parameters.
const BROADCAST_BACKLOG_CAPACITY: usize = 1000;

pub fn new() -> (Self, CoreSignalsReceivers) {
pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
let (tx_block_broadcast, _rx_block_broadcast) =
broadcast::channel::<VerifiedBlock>(Self::BROADCAST_BACKLOG_CAPACITY);
let (new_round_sender, new_round_receiver) = watch::channel(0);

let me = Self {
tx_block_broadcast: tx_block_broadcast.clone(),
new_round_sender,
context,
};

let receivers = CoreSignalsReceivers {
Expand All @@ -481,9 +486,15 @@ impl CoreSignals {
/// Sends a signal to all the waiters that a new block has been produced. Returns
/// `Err(ConsensusError::Shutdown)` if the block could not be sent to any receiver, `Ok(())` otherwise.
pub fn new_block(&self, block: VerifiedBlock) -> ConsensusResult<()> {
if let Err(err) = self.tx_block_broadcast.send(block) {
warn!("Couldn't broadcast the block to any receiver: {err}");
return Err(ConsensusError::Shutdown);
// When there is only one authority in committee, it is unnecessary to broadcast
// the block which will fail anyway without subscribers to the signal.
if self.context.committee.size() > 1 {
if let Err(err) = self.tx_block_broadcast.send(block) {
warn!("Couldn't broadcast the block to any receiver: {err}");
return Err(ConsensusError::Shutdown);
}
} else {
debug!("Did not broadcast block {block:?} to receivers as committee size is <= 1");
}
Ok(())
}
Expand Down Expand Up @@ -588,7 +599,7 @@ mod test {
assert_eq!(dag_state.read().last_commit_index(), 0);

// Now spin up core
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let mut core = Core::new(
Expand Down Expand Up @@ -698,7 +709,7 @@ mod test {
assert_eq!(dag_state.read().last_commit_index(), 0);

// Now spin up core
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let mut core = Core::new(
Expand Down Expand Up @@ -769,7 +780,7 @@ mod test {
);
let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let mut block_receiver = signal_receivers.block_broadcast_receiver();

Expand Down Expand Up @@ -870,7 +881,7 @@ mod test {
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let _block_receiver = signal_receivers.block_broadcast_receiver();

Expand Down Expand Up @@ -1175,7 +1186,7 @@ mod test {
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
// Need at least one subscriber to the block broadcast channel.
let block_receiver = signal_receivers.block_broadcast_receiver();

Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ mod test {
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);
let (signals, signal_receivers) = CoreSignals::new();
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let _block_receiver = signal_receivers.block_broadcast_receiver();
let (sender, _receiver) = unbounded_channel();
let commit_observer = CommitObserver::new(
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
let context = Arc::new(context.with_parameters(parameters));
let start = Instant::now();

let (mut signals, signal_receivers) = CoreSignals::new();
let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

// spawn the task
let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);
Expand Down Expand Up @@ -199,7 +199,7 @@ mod tests {
let context = Arc::new(context.with_parameters(parameters));
let now = Instant::now();

let (mut signals, signal_receivers) = CoreSignals::new();
let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

// spawn the task
let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);
Expand Down

0 comments on commit 0b20942

Please sign in to comment.