Make HotShot generic over ViewRunnerType, remove ViewRunner
shenkeyao committed Dec 21, 2022
1 parent 81f6079 commit db9ef10
Showing 9 changed files with 262 additions and 379 deletions.
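This commit replaces the standalone `ViewRunner` struct with a `ViewRunnerType` trait that `HotShot` itself implements, once per `ConsensusType`. A minimal sketch of the trait's shape, inferred from the `run_view` signatures and trait bounds in the diff below (the actual definition lives in `src/tasks.rs`, which is not shown in this hunk):

#[async_trait]
pub trait ViewRunnerType<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Drive one consensus view to completion.
    async fn run_view(hotshot: HotShot<TYPES::ConsensusType, TYPES, I>) -> Result<(), ()>;
}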
233 changes: 218 additions & 15 deletions src/lib.rs
@@ -41,26 +41,25 @@ pub mod tasks;

use crate::{
certificate::QuorumCertificate,
tasks::ViewRunnerType,
traits::{NetworkingImplementation, NodeImplementation, Storage},
types::{Event, HotShotHandle},
};
use async_compatibility_layer::{
art::async_spawn,
art::{async_sleep, async_spawn, async_spawn_local},
async_primitives::{broadcast::BroadcastSender, subscribable_rwlock::SubscribableRwLock},
};
use async_compatibility_layer::{
art::async_spawn_local,
channel::{unbounded, UnboundedReceiver, UnboundedSender},
};
use async_lock::{Mutex, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard};
use async_trait::async_trait;
use bincode::Options;
use commit::{Commitment, Committable};
use hotshot_consensus::{
Consensus, ConsensusApi, ConsensusMetrics, SendToTasks, View, ViewInner, ViewQueue,
Consensus, ConsensusApi, ConsensusMetrics, NextValidatingLeader, Replica, SendToTasks,
ValidatingLeader, View, ViewInner, ViewQueue,
};
use hotshot_types::{
data::LeafType,
data::{LeafType, ValidatingLeaf, ValidatingProposal},
error::StorageSnafu,
message::{ConsensusMessage, DataMessage, Message},
traits::{
@@ -72,22 +71,27 @@ use hotshot_types::{
network::{NetworkChange, NetworkError},
node_implementation::NodeType,
signature_key::{EncodedPublicKey, EncodedSignature, SignatureKey},
state::ConsensusTime,
state::{
ConsensusTime, ConsensusType, SequencingConsensus, TestableBlock, TestableState,
ValidatingConsensus,
},
storage::StoredView,
State,
},
HotShotConfig,
};
use hotshot_types::{message::MessageKind, traits::election::VoteToken};
use hotshot_utils::bincode::bincode_opts;
#[allow(deprecated)]
use nll::nll_todo::nll_todo;
use snafu::ResultExt;
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
num::{NonZeroU64, NonZeroUsize},
sync::{atomic::Ordering, Arc},
time::Duration,
time::{Duration, Instant},
};
use tasks::ViewRunner;
use tracing::{debug, error, info, instrument, trace, warn};

// -- Re-exports
@@ -135,7 +139,7 @@ pub struct HotShotInner<TYPES: NodeType, I: NodeImplementation<TYPES>> {

/// Thread safe, shared view of a `HotShot`
#[derive(Clone)]
pub struct HotShot<TYPES: NodeType, I: NodeImplementation<TYPES>> {
pub struct HotShot<CONSENSUS: ConsensusType, TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// Handle to internal hotshot implementation
inner: Arc<HotShotInner<TYPES, I>>,

@@ -161,12 +165,12 @@ pub struct HotShot<TYPES: NodeType, I: NodeImplementation<TYPES>> {

/// uid for instrumentation
id: u64,

/// Phantom data for consensus type
_pd: PhantomData<CONSENSUS>,
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> HotShot<TYPES, I>
where
ViewRunner<<TYPES as NodeType>::ConsensusType>: tasks::ViewRunnerType<TYPES, I>,
{
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> HotShot<TYPES::ConsensusType, TYPES, I> {
/// Creates a new hotshot with the given configuration options and sets it up with the given
/// genesis block
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -250,6 +254,7 @@ where
next_leader_channel_map: Arc::new(RwLock::new(SendToTasks::new(start_view))),
send_network_lookup,
recv_network_lookup: Arc::new(Mutex::new(recv_network_lookup)),
_pd: PhantomData,
})
}

@@ -343,7 +348,10 @@ where
election: I::Election,
initializer: HotShotInitializer<TYPES, I::Leaf>,
metrics: Box<dyn Metrics>,
) -> Result<HotShotHandle<TYPES, I>, HotShotError<TYPES>> {
) -> Result<HotShotHandle<TYPES, I>, HotShotError<TYPES>>
where
HotShot<TYPES::ConsensusType, TYPES, I>: ViewRunnerType<TYPES, I>,
{
// Save a clone of the storage for the handle
let hotshot = Self::new(
public_key,
@@ -670,6 +678,201 @@ where
}
}

#[allow(clippy::too_many_lines)]
#[async_trait]
impl<
TYPES: NodeType<ConsensusType = ValidatingConsensus>,
ELECTION: Election<TYPES, LeafType = ValidatingLeaf<TYPES>>,
I: NodeImplementation<
TYPES,
Leaf = ValidatingLeaf<TYPES>,
Proposal = ValidatingProposal<TYPES, ELECTION>,
>,
> ViewRunnerType<TYPES, I> for HotShot<ValidatingConsensus, TYPES, I>
where
TYPES::StateType: TestableState,
TYPES::BlockType: TestableBlock,
{
#[instrument(skip(hotshot), fields(id = hotshot.id), name = "Validating View Runner Task", level = "error")]
async fn run_view(hotshot: HotShot<TYPES::ConsensusType, TYPES, I>) -> Result<(), ()> {
let c_api = HotShotConsensusApi {
inner: hotshot.inner.clone(),
};
let start = Instant::now();
let metrics = Arc::clone(&hotshot.hotstuff.read().await.metrics);

// do bookkeeping on the channel map
// TODO probably cleaner to separate this into a function
// e.g. insert the view and remove the last view
let mut send_to_replica = hotshot.replica_channel_map.write().await;
let replica_last_view: TYPES::Time = send_to_replica.cur_view;
// gc previous view's channel map
send_to_replica.channel_map.remove(&replica_last_view);
send_to_replica.cur_view += 1;
let replica_cur_view = send_to_replica.cur_view;
let ViewQueue {
sender_chan: send_replica,
receiver_chan: recv_replica,
has_received_proposal: _,
} = HotShot::<ValidatingConsensus, TYPES, I>::create_or_obtain_chan_from_write(
replica_cur_view,
send_to_replica,
)
.await;

let mut send_to_next_leader = hotshot.next_leader_channel_map.write().await;
let next_leader_last_view = send_to_next_leader.cur_view;
// gc previous view's channel map
send_to_next_leader
.channel_map
.remove(&next_leader_last_view);
send_to_next_leader.cur_view += 1;
let next_leader_cur_view = send_to_next_leader.cur_view;
let (send_next_leader, recv_next_leader) =
if c_api.is_leader(next_leader_cur_view + 1).await {
let vq =
HotShot::<ValidatingConsensus, TYPES, I>::create_or_obtain_chan_from_write(
next_leader_cur_view,
send_to_next_leader,
)
.await;
(Some(vq.sender_chan), Some(vq.receiver_chan))
} else {
(None, None)
};

// increment the consensus view and start tasks

let (cur_view, high_qc, txns) = {
// OBTAIN write lock on consensus
let mut consensus = hotshot.hotstuff.write().await;
let cur_view = consensus.increment_view();
// make sure consistent
assert_eq!(cur_view, next_leader_cur_view);
assert_eq!(cur_view, replica_cur_view);
let high_qc = consensus.high_qc.clone();
let txns = consensus.transactions.clone();
// DROP write lock on consensus
drop(consensus);
(cur_view, high_qc, txns)
};

// notify networking to look up the leader of view `cur_view + LOOK_AHEAD` ahead of the current view
if hotshot
.send_network_lookup
.send(Some(cur_view))
.await
.is_err()
{
error!("Failed to initiate network lookup");
};

info!("Starting tasks for View {:?}!", cur_view);
metrics.current_view.set(*cur_view as usize);

let mut task_handles = Vec::new();

// replica always runs? TODO this will change once vrf integration is added
let replica = Replica {
id: hotshot.id,
consensus: hotshot.hotstuff.clone(),
proposal_collection_chan: recv_replica,
cur_view,
high_qc: high_qc.clone(),
api: c_api.clone(),
};
let replica_handle = async_spawn(async move { replica.run_view().await });
task_handles.push(replica_handle);

if c_api.is_leader(cur_view).await {
let leader = ValidatingLeader {
id: hotshot.id,
consensus: hotshot.hotstuff.clone(),
high_qc: high_qc.clone(),
cur_view,
transactions: txns,
api: c_api.clone(),
_pd: PhantomData,
};
let leader_handle = async_spawn(async move { leader.run_view().await });
task_handles.push(leader_handle);
}

if c_api.is_leader(cur_view + 1).await {
let next_leader = NextValidatingLeader {
id: hotshot.id,
generic_qc: high_qc,
// should be fine to unwrap here since the view numbers must be the same
vote_collection_chan: recv_next_leader.unwrap(),
cur_view,
api: c_api.clone(),
metrics,
};
let next_leader_handle = async_spawn(async move {
NextValidatingLeader::<HotShotConsensusApi<TYPES, I>, TYPES, _>::run_view(
next_leader,
)
.await
});
task_handles.push(next_leader_handle);
}

let children_finished = futures::future::join_all(task_handles);

async_spawn({
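// Watchdog for this view: after `next_view_timeout` milliseconds,
// signal a timeout to the replica task and, if one was spawned, the
// next-leader task.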
let next_view_timeout = hotshot.inner.config.next_view_timeout;
let hotshot: HotShot<TYPES::ConsensusType, TYPES, I> = hotshot.clone();
async move {
async_sleep(Duration::from_millis(next_view_timeout)).await;
hotshot
.timeout_view(cur_view, send_replica, send_next_leader)
.await;
}
});

let results = children_finished.await;

// unwrap is fine since `results` always contains at least one item (the replica task)
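// async-std's JoinHandle resolves to the task's return value directly,
// whereas tokio's resolves to Result<T, JoinError>, hence the extra
// `filter_map` in the tokio branch.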
#[cfg(feature = "async-std-executor")]
let high_qc = results
.into_iter()
.max_by_key(|qc: &QuorumCertificate<TYPES, I::Leaf>| qc.view_number)
.unwrap();
#[cfg(feature = "tokio-executor")]
let high_qc = results
.into_iter()
.filter_map(std::result::Result::ok)
.max_by_key(|qc| qc.view_number)
.unwrap();

#[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))]
compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."}

let mut consensus = hotshot.hotstuff.write().await;
consensus.high_qc = high_qc;
consensus
.metrics
.view_duration
.add_point(start.elapsed().as_secs_f64());
c_api.send_view_finished(consensus.cur_view).await;

info!("Returning from view {:?}!", cur_view);
Ok(())
}
}

#[async_trait]
impl<TYPES: NodeType<ConsensusType = SequencingConsensus>, I: NodeImplementation<TYPES>>
ViewRunnerType<TYPES, I> for HotShot<SequencingConsensus, TYPES, I>
{
// #[instrument]
async fn run_view(_hotshot: HotShot<TYPES::ConsensusType, TYPES, I>) -> Result<(), ()> {
#[allow(deprecated)]
nll_todo()
}
}

/// A handle that is passed to [`hotshot_hotstuff`] to expose the interface that hotstuff needs to interact with [`HotShot`]
#[derive(Clone)]
struct HotShotConsensusApi<TYPES: NodeType, I: NodeImplementation<TYPES>> {
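For reference, a hedged sketch of how the new bound is consumed: `init` (see the `where` clause added above) only compiles when `HotShot` implements `ViewRunnerType` for the node's consensus type, after which a per-view driver can call the trait method directly. The `view_loop` helper below is illustrative only and does not appear in this commit:

// Illustrative only: a caller-side loop under the new trait bound.
// `view_loop` is an assumed helper, not part of this commit; the `Clone`
// bound matches the `#[derive(Clone)]` on `HotShot`.
async fn view_loop<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    hotshot: HotShot<TYPES::ConsensusType, TYPES, I>,
) where
    HotShot<TYPES::ConsensusType, TYPES, I>: ViewRunnerType<TYPES, I> + Clone,
{
    loop {
        // Each iteration drives one consensus view; stop on error.
        if HotShot::run_view(hotshot.clone()).await.is_err() {
            break;
        }
    }
}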