Skip to content

Commit

Permalink
using UnitFinalizationHandler and its constraints (Data, Hasher) inst…
Browse files Browse the repository at this point in the history
…ead of seperatly providing Data and Hasher
  • Loading branch information
fixxxedpoint committed May 6, 2024
1 parent 06e9f67 commit b3da33f
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 82 deletions.
15 changes: 3 additions & 12 deletions consensus/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@ use extender::Extender;
///
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html
/// Section 5.4 for a discussion of this component.
pub struct Ordering<
MK: MultiKeychain,
UFH: UnitFinalizationHandler,
> {
pub struct Ordering<MK: MultiKeychain, UFH: UnitFinalizationHandler> {
extender: Extender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
finalization_handler: UFH,
}

impl<MK: MultiKeychain, UFH: UnitFinalizationHandler>
Ordering<MK, UFH>
{
impl<MK: MultiKeychain, UFH: UnitFinalizationHandler> Ordering<MK, UFH> {
pub fn new(finalization_handler: UFH) -> Self {
let extender = Extender::new();
Ordering {
Expand All @@ -35,13 +30,9 @@ impl<MK: MultiKeychain, UFH: UnitFinalizationHandler>
}
}

fn handle_batch(&mut self, batch: Vec<DagUnit<UFH::Hasher, UFH::Data, MK>>) {
self.finalization_handler.batch_finalized(batch);
}

pub fn add_unit(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
for batch in self.extender.add_unit(unit) {
self.handle_batch(batch);
self.finalization_handler.batch_finalized(batch);
}
}
}
3 changes: 1 addition & 2 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,7 @@ pub async fn run_session<
UFH: UnitFinalizationHandler<Data = DP::Output, Hasher = H>,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, DP::Output, MK::Signature, MK::PartialMultisignature>>
+ 'static,
N: Network<NetworkData<H, DP::Output, MK::Signature, MK::PartialMultisignature>>,
SH: SpawnHandle,
MK: MultiKeychain,
>(
Expand Down
139 changes: 73 additions & 66 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,27 @@ type CollectionResponse<H, D, MK> = UncheckedSigned<
<MK as Keychain>::Signature,
>;

struct Runway<H, D, UFH, MK>
struct Runway<FH, MK>
where
H: Hasher,
D: Data,
UFH: UnitFinalizationHandler<Data = D, Hasher = H>,
FH: UnitFinalizationHandler,
MK: MultiKeychain,
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<DagUnit<H, D, MK>>,
missing_parents: HashSet<<FH::Hasher as Hasher>::Hash>,
store: UnitStore<DagUnit<FH::Hasher, FH::Data, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
ordering: Ordering<MK, UFH>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
dag: Dag<FH::Hasher, FH::Data, MK>,
ordering: Ordering<MK, FH>,
alerts_for_alerter: Sender<Alert<FH::Hasher, FH::Data, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<FH::Hasher, FH::Data, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<FH::Hasher, FH::Data, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<FH::Hasher, FH::Data, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<FH::Hasher, FH::Data, MK>>,
resolved_requests: Sender<Request<FH::Hasher>>,
parents_for_creator: Sender<DagUnit<FH::Hasher, FH::Data, MK>>,
backup_units_for_saver: Sender<DagUnit<FH::Hasher, FH::Data, MK>>,
backup_units_from_saver: Receiver<DagUnit<FH::Hasher, FH::Data, MK>>,
new_units_from_creation: Receiver<SignedUnit<FH::Hasher, FH::Data, MK>>,
exiting: bool,
}

Expand Down Expand Up @@ -205,33 +203,36 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {
}
}

struct RunwayConfig<
H: Hasher,
D: Data,
UFH: UnitFinalizationHandler<Data = D, Hasher = H>,
MK: MultiKeychain,
> {
struct RunwayConfig<UFH: UnitFinalizationHandler, MK: MultiKeychain> {
finalization_handler: UFH,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<H, D, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<H, D, MK>>,
parents_for_creator: Sender<DagUnit<H, D, MK>>,
resolved_requests: Sender<Request<H>>,
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
backup_units_for_saver: Sender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
backup_units_from_saver: Receiver<DagUnit<UFH::Hasher, UFH::Data, MK>>,
alerts_for_alerter: Sender<Alert<UFH::Hasher, UFH::Data, MK::Signature>>,
notifications_from_alerter:
Receiver<ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>>,
unit_messages_from_network:
Receiver<RunwayNotificationIn<UFH::Hasher, UFH::Data, MK::Signature>>,
unit_messages_for_network: Sender<RunwayNotificationOut<UFH::Hasher, UFH::Data, MK::Signature>>,
responses_for_collection: Sender<CollectionResponse<UFH::Hasher, UFH::Data, MK>>,
parents_for_creator: Sender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
resolved_requests: Sender<Request<UFH::Hasher>>,
new_units_from_creation: Receiver<SignedUnit<UFH::Hasher, UFH::Data, MK>>,
}

impl<H, D, UFH, MK> Runway<H, D, UFH, MK>
type BackupUnits<UFH, MK> = Vec<
UncheckedSignedUnit<
<UFH as UnitFinalizationHandler>::Hasher,
<UFH as UnitFinalizationHandler>::Data,
<MK as Keychain>::Signature,
>,
>;

impl<UFH, MK> Runway<UFH, MK>
where
H: Hasher,
D: Data,
UFH: UnitFinalizationHandler<Data = D, Hasher = H>,
UFH: UnitFinalizationHandler,
MK: MultiKeychain,
{
fn new(config: RunwayConfig<H, D, UFH, MK>, keychain: MK, validator: Validator<MK>) -> Self {
fn new(config: RunwayConfig<UFH, MK>, keychain: MK, validator: Validator<MK>) -> Self {
let n_members = keychain.node_count();
let RunwayConfig {
finalization_handler,
Expand Down Expand Up @@ -275,7 +276,7 @@ where
self.keychain.index()
}

fn handle_dag_result(&mut self, result: DagResult<H, D, MK>) {
fn handle_dag_result(&mut self, result: DagResult<UFH::Hasher, UFH::Data, MK>) {
let DagResult {
units,
requests,
Expand All @@ -295,12 +296,18 @@ where
}
}

fn on_unit_received(&mut self, unit: UncheckedSignedUnit<H, D, MK::Signature>) {
fn on_unit_received(
&mut self,
unit: UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>,
) {
let result = self.dag.add_unit(unit, &self.store);
self.handle_dag_result(result);
}

fn on_unit_message(&mut self, message: RunwayNotificationIn<H, D, MK::Signature>) {
fn on_unit_message(
&mut self,
message: RunwayNotificationIn<UFH::Hasher, UFH::Data, MK::Signature>,
) {
match message {
RunwayNotificationIn::NewUnit(u) => {
trace!(target: "AlephBFT-runway", "{:?} New unit received {:?}.", self.index(), &u);
Expand Down Expand Up @@ -364,7 +371,7 @@ where
}
}

fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: H::Hash) {
fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: <UFH::Hasher as Hasher>::Hash) {
debug!(target: "AlephBFT-runway", "{:?} Received parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id);

match self.store.unit(&u_hash) {
Expand Down Expand Up @@ -416,8 +423,8 @@ where

fn on_parents_response(
&mut self,
u_hash: H::Hash,
parents: Vec<UncheckedSignedUnit<H, D, MK::Signature>>,
u_hash: <UFH::Hasher as Hasher>::Hash,
parents: Vec<UncheckedSignedUnit<UFH::Hasher, UFH::Data, MK::Signature>>,
) {
if self.store.unit(&u_hash).is_some() {
trace!(target: "AlephBFT-runway", "{:?} We got parents response but already imported the unit.", self.index());
Expand All @@ -427,20 +434,23 @@ where
self.handle_dag_result(result);
}

fn on_forking_notification(&mut self, notification: ForkingNotification<H, D, MK::Signature>) {
fn on_forking_notification(
&mut self,
notification: ForkingNotification<UFH::Hasher, UFH::Data, MK::Signature>,
) {
let result = self
.dag
.process_forking_notification(notification, &self.store);
self.handle_dag_result(result);
}

fn resolve_missing_parents(&mut self, u_hash: &H::Hash) {
fn resolve_missing_parents(&mut self, u_hash: &<UFH::Hasher as Hasher>::Hash) {
if self.missing_parents.remove(u_hash) {
self.send_resolved_request_notification(Request::Parents(*u_hash));
}
}

fn on_reconstruction_request(&mut self, request: ReconstructionRequest<H>) {
fn on_reconstruction_request(&mut self, request: ReconstructionRequest<UFH::Hasher>) {
use ReconstructionRequest::*;
match request {
Coord(coord) => {
Expand All @@ -452,15 +462,15 @@ where
}
}

fn on_unit_reconstructed(&mut self, unit: DagUnit<H, D, MK>) {
fn on_unit_reconstructed(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
let unit_hash = unit.hash();
trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord());
if self.backup_units_for_saver.unbounded_send(unit).is_err() {
error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash);
}
}

fn on_unit_backup_saved(&mut self, unit: DagUnit<H, D, MK>) {
fn on_unit_backup_saved(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
let unit_hash = unit.hash();
self.store.insert(unit.clone());
self.dag.finished_processing(&unit_hash);
Expand Down Expand Up @@ -498,7 +508,7 @@ where
}
}

fn on_wrong_control_hash(&mut self, u_hash: H::Hash) {
fn on_wrong_control_hash(&mut self, u_hash: <UFH::Hasher as Hasher>::Hash) {
trace!(target: "AlephBFT-runway", "{:?} Dealing with wrong control hash notification {:?}.", self.index(), u_hash);
if self.missing_parents.insert(u_hash) {
self.send_message_for_network(RunwayNotificationOut::Request(Request::Parents(u_hash)));
Expand All @@ -507,7 +517,7 @@ where

fn send_message_for_network(
&mut self,
notification: RunwayNotificationOut<H, D, MK::Signature>,
notification: RunwayNotificationOut<UFH::Hasher, UFH::Data, MK::Signature>,
) {
if self
.unit_messages_for_network
Expand All @@ -519,14 +529,14 @@ where
}
}

fn send_resolved_request_notification(&mut self, notification: Request<H>) {
fn send_resolved_request_notification(&mut self, notification: Request<UFH::Hasher>) {
if self.resolved_requests.unbounded_send(notification).is_err() {
warn!(target: "AlephBFT-runway", "{:?} resolved_requests channel should be open", self.index());
self.exiting = true;
}
}

fn status(&self) -> RunwayStatus<'_, H> {
fn status(&self) -> RunwayStatus<'_, UFH::Hasher> {
RunwayStatus {
missing_coords: &self.missing_coords,
missing_parents: &self.missing_parents,
Expand All @@ -541,7 +551,7 @@ where

async fn run(
mut self,
data_from_backup: oneshot::Receiver<Vec<UncheckedSignedUnit<H, D, MK::Signature>>>,
data_from_backup: oneshot::Receiver<BackupUnits<UFH, MK>>,
mut terminator: Terminator,
) {
let index = self.index();
Expand Down Expand Up @@ -668,28 +678,26 @@ fn trivial_start(
}

pub struct RunwayIO<
H: Hasher,
MK: MultiKeychain,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
UFH: UnitFinalizationHandler<Data = DP::Output, Hasher = H>,
UFH: UnitFinalizationHandler,
> {
pub data_provider: DP,
pub finalization_handler: UFH,
pub backup_write: W,
pub backup_read: R,
_phantom: PhantomData<(H, MK::Signature)>,
_phantom: PhantomData<MK::Signature>,
}

impl<
H: Hasher,
MK: MultiKeychain,
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
UFH: UnitFinalizationHandler<Data = DP::Output, Hasher = H>,
> RunwayIO<H, MK, W, R, DP, UFH>
UFH: UnitFinalizationHandler,
> RunwayIO<MK, W, R, DP, UFH>
{
pub fn new(
data_provider: DP,
Expand All @@ -707,19 +715,18 @@ impl<
}
}

pub(crate) async fn run<H, US, UL, MK, DP, UFH, SH>(
pub(crate) async fn run<US, UL, MK, DP, UFH, SH>(
config: Config,
runway_io: RunwayIO<H, MK, US, UL, DP, UFH>,
runway_io: RunwayIO<MK, US, UL, DP, UFH>,
keychain: MK,
spawn_handle: SH,
network_io: NetworkIO<H, DP::Output, MK>,
network_io: NetworkIO<UFH::Hasher, DP::Output, MK>,
mut terminator: Terminator,
) where
H: Hasher,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
UFH: UnitFinalizationHandler<Data = DP::Output, Hasher = H>,
UFH: UnitFinalizationHandler<Data = DP::Output>,
MK: MultiKeychain,
SH: SpawnHandle,
{
Expand Down
2 changes: 1 addition & 1 deletion mock/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<D: Debug> Network<D> {
}

#[async_trait::async_trait]
impl<D: Clone + Send + Debug> NetworkT<D> for Network<D> {
impl<D: Clone + Send + Debug + 'static> NetworkT<D> for Network<D> {
fn send(&self, data: D, recipient: Recipient) {
use Recipient::*;
match recipient {
Expand Down
2 changes: 1 addition & 1 deletion types/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Recipient {
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/aleph_bft_api.html
/// Section 3.1.2 for a discussion of the required guarantees of this trait's implementation.
#[async_trait::async_trait]
pub trait Network<D>: Send {
pub trait Network<D>: Send + 'static{
/// Send a message to a single node or everyone, depending on the value of the recipient
/// argument.
///
Expand Down

0 comments on commit b3da33f

Please sign in to comment.