[CATCHUP] Repro Restart Bug + Fix (#3686)
* create restart context, and allow restarting after a number of views

* repros the issue

* Fix the issue by saving the timeout vote as an action

* cleanup/lint

* revert some logging, remove unused file

* logging

* Rename UpDown enum
bfish713 authored Sep 19, 2024
1 parent 51b8a98 commit 4016292
Showing 13 changed files with 203 additions and 222 deletions.
3 changes: 2 additions & 1 deletion crates/task-impls/src/consensus/handlers.rs
@@ -140,8 +140,9 @@ pub async fn create_and_send_proposal<TYPES: NodeType, V: Versions>(
};

debug!(
"Sending proposal for view {:?}",
"Sending proposal for view {:?} ID: {}",
proposed_leaf.view_number(),
id,
);

async_sleep(Duration::from_millis(round_start_delay)).await;
143 changes: 0 additions & 143 deletions crates/task-impls/src/consensus/view_change.rs

This file was deleted.

17 changes: 10 additions & 7 deletions crates/task-impls/src/network.rs
@@ -488,13 +488,16 @@ impl<
)),
TransmitType::Broadcast,
)),
HotShotEvent::TimeoutVoteSend(vote) => Some((
vote.signing_key(),
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::TimeoutVote(vote.clone()),
)),
TransmitType::Direct(membership.leader(vote.view_number() + 1)),
)),
HotShotEvent::TimeoutVoteSend(vote) => {
*maybe_action = Some(HotShotAction::Vote);
Some((
vote.signing_key(),
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::TimeoutVote(vote.clone()),
)),
TransmitType::Direct(membership.leader(vote.view_number() + 1)),
))
}
HotShotEvent::UpgradeProposalSend(proposal, sender) => Some((
sender,
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
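The hunk above is the core of the fix: when handling `HotShotEvent::TimeoutVoteSend`, the network task now sets `*maybe_action = Some(HotShotAction::Vote)`, so the timeout vote is recorded as an action before the message is transmitted, just like a quorum vote. A minimal, self-contained sketch of why recording that action matters for restarts is below; the type and function names are illustrative assumptions, not HotShot's real storage API.

```rust
// Hypothetical sketch: recording votes (including timeout votes) as storage "actions"
// lets a restarted node recover the last view it actually acted in, instead of
// rejoining from a stale view and voting or proposing against old state.

#[derive(Clone, Copy, Debug)]
enum Action {
    Vote,
}

#[derive(Default)]
struct Storage {
    // Highest view in which this node took an action (voted, proposed, ...).
    last_actioned_view: u64,
}

impl Storage {
    fn record_action(&mut self, view: u64, _action: Action) {
        self.last_actioned_view = self.last_actioned_view.max(view);
    }
}

// Before the fix a timeout vote was transmitted without being recorded; afterwards it
// is saved as an action first, so `last_actioned_view` also covers views that timed out.
fn send_timeout_vote(storage: &mut Storage, view: u64) {
    storage.record_action(view, Action::Vote);
    // ... transmit the vote to the leader of `view + 1` here ...
}

fn main() {
    let mut storage = Storage::default();
    send_timeout_vote(&mut storage, 42);
    assert_eq!(storage.last_actioned_view, 42);
}
```

In the spinning task below, the restarted node's `HotShotInitializer` is then built from `read_storage.last_actioned_view()` instead of the test's current `view_number`, so the vote saved here determines where the node resumes.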
84 changes: 64 additions & 20 deletions crates/testing/src/spinning_task.rs
@@ -10,10 +10,13 @@ use std::{
};

use anyhow::Result;
use async_broadcast::broadcast;
use async_lock::RwLock;
use async_trait::async_trait;
use futures::future::join_all;
use hotshot::{traits::TestableNodeImplementation, types::EventType, HotShotInitializer};
use hotshot::{
traits::TestableNodeImplementation, types::EventType, HotShotInitializer, SystemContext,
};
use hotshot_example_types::{
auction_results_provider_types::TestAuctionResultsProvider,
block_types::TestBlockHeader,
@@ -22,6 +25,7 @@ use hotshot_example_types::{
testable_delay::DelayConfig,
};
use hotshot_types::{
constants::EVENT_CHANNEL_SIZE,
data::Leaf,
event::Event,
simple_certificate::QuorumCertificate,
@@ -47,7 +51,12 @@ pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>);
pub struct SpinningTaskErr {}

/// Spinning task state
pub struct SpinningTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> {
pub struct SpinningTask<
TYPES: NodeType,
N: ConnectedNetwork<TYPES::SignatureKey>,
I: TestableNodeImplementation<TYPES>,
V: Versions,
> {
/// handle to the nodes
pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>,
/// late start nodes
@@ -62,6 +71,8 @@ pub struct SpinningTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V
pub(crate) high_qc: QuorumCertificate<TYPES>,
/// Add specified delay to async calls
pub(crate) async_delay_config: DelayConfig,
/// Context stored for nodes to be restarted with
pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I, V>>,
}

#[async_trait]
@@ -74,7 +85,7 @@ impl<
I: TestableNodeImplementation<TYPES>,
N: ConnectedNetwork<TYPES::SignatureKey>,
V: Versions,
> TestTaskState for SpinningTask<TYPES, I, V>
> TestTaskState for SpinningTask<TYPES, N, I, V>
where
I: TestableNodeImplementation<TYPES>,
I: NodeImplementation<
@@ -117,7 +128,7 @@ where
if let Some(operations) = self.changes.remove(&view_number) {
for ChangeNode { idx, updown } in operations {
match updown {
UpDown::Up => {
NodeAction::Up => {
let node_id = idx.try_into().unwrap();
if let Some(node) = self.late_start.remove(&node_id) {
tracing::error!("Node {} spinning up late", idx);
@@ -187,13 +198,13 @@ where
self.handles.write().await.push(node);
}
}
UpDown::Down => {
NodeAction::Down => {
if let Some(node) = self.handles.write().await.get_mut(idx) {
tracing::error!("Node {} shutting down", idx);
node.handle.shut_down().await;
}
}
UpDown::Restart => {
NodeAction::RestartDown(delay_views) => {
let node_id = idx.try_into().unwrap();
if let Some(node) = self.handles.write().await.get_mut(idx) {
tracing::error!("Node {} shutting down", idx);
@@ -217,7 +228,7 @@ where
self.last_decided_leaf.clone(),
TestInstanceState::new(self.async_delay_config.clone()),
None,
view_number,
read_storage.last_actioned_view().await,
read_storage.last_actioned_view().await,
read_storage.proposals_cloned().await,
read_storage.high_qc_cloned().await.unwrap_or(
@@ -238,6 +249,7 @@ where
// For tests, make the node DA based on its index
node_id < config.da_staked_committee_size as u64,
);
let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
let context =
TestRunner::<TYPES, I, V, N>::add_node_with_config_and_channels(
node_id,
@@ -248,27 +260,44 @@ where
validator_config,
(*read_storage).clone(),
marketplace_config.clone(),
(
node.handle.internal_channel_sender(),
node.handle.internal_event_stream_receiver_known_impl(),
),
internal_chan,
(
node.handle.external_channel_sender(),
node.handle.event_stream_known_impl(),
node.handle.event_stream_known_impl().new_receiver(),
),
)
.await;
new_nodes.push((context, idx));
new_networks.push(network.clone());
if delay_views == 0 {
new_nodes.push((context, idx));
new_networks.push(network.clone());
} else {
let up_view = view_number + delay_views;
let change = ChangeNode {
idx,
updown: NodeAction::RestartUp,
};
self.changes.entry(up_view).or_default().push(change);
let new_ctx = RestartContext {
context,
network: network.clone(),
};
self.restart_contexts.insert(idx, new_ctx);
}
}
}
UpDown::NetworkUp => {
NodeAction::RestartUp => {
if let Some(ctx) = self.restart_contexts.remove(&idx) {
new_nodes.push((ctx.context, idx));
new_networks.push(ctx.network.clone());
}
}
NodeAction::NetworkUp => {
if let Some(handle) = self.handles.write().await.get(idx) {
tracing::error!("Node {} networks resuming", idx);
handle.network.resume();
}
}
UpDown::NetworkDown => {
NodeAction::NetworkDown => {
if let Some(handle) = self.handles.write().await.get(idx) {
tracing::error!("Node {} networks pausing", idx);
handle.network.pause();
Expand All @@ -286,6 +315,7 @@ where
join_all(ready_futs).await;

while let Some((node, id)) = new_nodes.pop() {
tracing::error!("Starting node {} back up", id);
let handle = node.run_tasks().await;

// Create the node and add it to the state, so we can shut them
@@ -312,9 +342,20 @@
}
}

#[derive(Clone)]
pub(crate) struct RestartContext<
TYPES: NodeType,
N: ConnectedNetwork<TYPES::SignatureKey>,
I: TestableNodeImplementation<TYPES>,
V: Versions,
> {
context: Arc<SystemContext<TYPES, I, V>>,
network: Arc<N>,
}

/// Spin the node up or down
#[derive(Clone, Debug)]
pub enum UpDown {
pub enum NodeAction {
/// spin the node up
Up,
/// spin the node down
@@ -323,8 +364,11 @@ pub enum UpDown {
NetworkUp,
/// spin the node's network down
NetworkDown,
/// restart the node
Restart,
/// Take a node down to be restarted after a number of views
RestartDown(u64),
/// Start a node up again after it has been shut down for a restart. This
/// should only be created following a `RestartDown`
RestartUp,
}

/// denotes a change in node state
@@ -333,7 +377,7 @@ pub struct ChangeNode {
/// the index of the node
pub idx: usize,
/// spin the node or node's network up or down
pub updown: UpDown,
pub updown: NodeAction,
}

/// description of the spinning task
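Taken together, the spinning task now handles a restart in two phases: `NodeAction::RestartDown(delay_views)` shuts the node down, rebuilds its context from storage (starting from `last_actioned_view` rather than the current view), and stashes it in `restart_contexts`; a `NodeAction::RestartUp` entry queued `delay_views` views later pops that context and starts the node. A minimal sketch of that scheduling pattern, using simplified illustrative types rather than the crate's real ones, is:

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified mirror of the spinning task's bookkeeping: node actions are
// keyed by the view at which they should fire.
#[derive(Clone, Debug)]
enum NodeAction {
    RestartDown(u64),
    RestartUp,
}

#[derive(Clone, Debug)]
struct ChangeNode {
    idx: usize,
    updown: NodeAction,
}

// Take node `idx` down at `current_view` and queue its restart `delay_views` views later.
fn schedule_restart(
    changes: &mut BTreeMap<u64, Vec<ChangeNode>>,
    current_view: u64,
    idx: usize,
    delay_views: u64,
) {
    changes.entry(current_view).or_default().push(ChangeNode {
        idx,
        updown: NodeAction::RestartDown(delay_views),
    });
    // A zero delay means "restart immediately"; otherwise a RestartUp entry is queued
    // for the view at which the stored context should be brought back up.
    if delay_views > 0 {
        changes
            .entry(current_view + delay_views)
            .or_default()
            .push(ChangeNode {
                idx,
                updown: NodeAction::RestartUp,
            });
    }
}

fn main() {
    let mut changes: BTreeMap<u64, Vec<ChangeNode>> = BTreeMap::new();
    schedule_restart(&mut changes, 10, 3, 5);
    // Node 3 goes down at view 10 and comes back up at view 15.
    println!("{changes:#?}");
}
```

With a `delay_views` of zero the node is pushed straight back into `new_nodes`, matching the `if delay_views == 0` branch in the diff.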