From b3da33fc55a6223ce38a7aa4584647a7ffd9fe27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Mon, 6 May 2024 12:17:42 +0200 Subject: [PATCH] using UnitFinalizationHandler and its constraints (Data, Hasher) instead of seperatly providing Data and Hasher --- consensus/src/extension/mod.rs | 15 +--- consensus/src/member.rs | 3 +- consensus/src/runway/mod.rs | 139 +++++++++++++++++---------------- mock/src/network.rs | 2 +- types/src/network.rs | 2 +- 5 files changed, 79 insertions(+), 82 deletions(-) diff --git a/consensus/src/extension/mod.rs b/consensus/src/extension/mod.rs index 87b9edd2..3a1252c2 100644 --- a/consensus/src/extension/mod.rs +++ b/consensus/src/extension/mod.rs @@ -16,17 +16,12 @@ use extender::Extender; /// /// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html /// Section 5.4 for a discussion of this component. -pub struct Ordering< - MK: MultiKeychain, - UFH: UnitFinalizationHandler, -> { +pub struct Ordering { extender: Extender>, finalization_handler: UFH, } -impl - Ordering -{ +impl Ordering { pub fn new(finalization_handler: UFH) -> Self { let extender = Extender::new(); Ordering { @@ -35,13 +30,9 @@ impl } } - fn handle_batch(&mut self, batch: Vec>) { - self.finalization_handler.batch_finalized(batch); - } - pub fn add_unit(&mut self, unit: DagUnit) { for batch in self.extender.add_unit(unit) { - self.handle_batch(batch); + self.finalization_handler.batch_finalized(batch); } } } diff --git a/consensus/src/member.rs b/consensus/src/member.rs index d6ab13ad..9640c988 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -635,8 +635,7 @@ pub async fn run_session< UFH: UnitFinalizationHandler, US: AsyncWrite + Send + Sync + 'static, UL: AsyncRead + Send + Sync + 'static, - N: Network> - + 'static, + N: Network>, SH: SpawnHandle, MK: MultiKeychain, >( diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index a3dee123..ba07bb10 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -99,29 +99,27 @@ type CollectionResponse = UncheckedSigned< ::Signature, >; -struct Runway +struct Runway where - H: Hasher, - D: Data, - UFH: UnitFinalizationHandler, + FH: UnitFinalizationHandler, MK: MultiKeychain, { missing_coords: HashSet, - missing_parents: HashSet, - store: UnitStore>, + missing_parents: HashSet<::Hash>, + store: UnitStore>, keychain: MK, - dag: Dag, - ordering: Ordering, - alerts_for_alerter: Sender>, - notifications_from_alerter: Receiver>, - unit_messages_from_network: Receiver>, - unit_messages_for_network: Sender>, - responses_for_collection: Sender>, - resolved_requests: Sender>, - parents_for_creator: Sender>, - backup_units_for_saver: Sender>, - backup_units_from_saver: Receiver>, - new_units_from_creation: Receiver>, + dag: Dag, + ordering: Ordering, + alerts_for_alerter: Sender>, + notifications_from_alerter: Receiver>, + unit_messages_from_network: Receiver>, + unit_messages_for_network: Sender>, + responses_for_collection: Sender>, + resolved_requests: Sender>, + parents_for_creator: Sender>, + backup_units_for_saver: Sender>, + backup_units_from_saver: Receiver>, + new_units_from_creation: Receiver>, exiting: bool, } @@ -205,33 +203,36 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> { } } -struct RunwayConfig< - H: Hasher, - D: Data, - UFH: UnitFinalizationHandler, - MK: MultiKeychain, -> { +struct RunwayConfig { finalization_handler: UFH, - backup_units_for_saver: Sender>, - backup_units_from_saver: Receiver>, - alerts_for_alerter: Sender>, - notifications_from_alerter: Receiver>, - unit_messages_from_network: Receiver>, - unit_messages_for_network: Sender>, - responses_for_collection: Sender>, - parents_for_creator: Sender>, - resolved_requests: Sender>, - new_units_from_creation: Receiver>, + backup_units_for_saver: Sender>, + backup_units_from_saver: Receiver>, + alerts_for_alerter: Sender>, + notifications_from_alerter: + Receiver>, + unit_messages_from_network: + Receiver>, + unit_messages_for_network: Sender>, + responses_for_collection: Sender>, + parents_for_creator: Sender>, + resolved_requests: Sender>, + new_units_from_creation: Receiver>, } -impl Runway +type BackupUnits = Vec< + UncheckedSignedUnit< + ::Hasher, + ::Data, + ::Signature, + >, +>; + +impl Runway where - H: Hasher, - D: Data, - UFH: UnitFinalizationHandler, + UFH: UnitFinalizationHandler, MK: MultiKeychain, { - fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { + fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { let n_members = keychain.node_count(); let RunwayConfig { finalization_handler, @@ -275,7 +276,7 @@ where self.keychain.index() } - fn handle_dag_result(&mut self, result: DagResult) { + fn handle_dag_result(&mut self, result: DagResult) { let DagResult { units, requests, @@ -295,12 +296,18 @@ where } } - fn on_unit_received(&mut self, unit: UncheckedSignedUnit) { + fn on_unit_received( + &mut self, + unit: UncheckedSignedUnit, + ) { let result = self.dag.add_unit(unit, &self.store); self.handle_dag_result(result); } - fn on_unit_message(&mut self, message: RunwayNotificationIn) { + fn on_unit_message( + &mut self, + message: RunwayNotificationIn, + ) { match message { RunwayNotificationIn::NewUnit(u) => { trace!(target: "AlephBFT-runway", "{:?} New unit received {:?}.", self.index(), &u); @@ -364,7 +371,7 @@ where } } - fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: H::Hash) { + fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: ::Hash) { debug!(target: "AlephBFT-runway", "{:?} Received parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); match self.store.unit(&u_hash) { @@ -416,8 +423,8 @@ where fn on_parents_response( &mut self, - u_hash: H::Hash, - parents: Vec>, + u_hash: ::Hash, + parents: Vec>, ) { if self.store.unit(&u_hash).is_some() { trace!(target: "AlephBFT-runway", "{:?} We got parents response but already imported the unit.", self.index()); @@ -427,20 +434,23 @@ where self.handle_dag_result(result); } - fn on_forking_notification(&mut self, notification: ForkingNotification) { + fn on_forking_notification( + &mut self, + notification: ForkingNotification, + ) { let result = self .dag .process_forking_notification(notification, &self.store); self.handle_dag_result(result); } - fn resolve_missing_parents(&mut self, u_hash: &H::Hash) { + fn resolve_missing_parents(&mut self, u_hash: &::Hash) { if self.missing_parents.remove(u_hash) { self.send_resolved_request_notification(Request::Parents(*u_hash)); } } - fn on_reconstruction_request(&mut self, request: ReconstructionRequest) { + fn on_reconstruction_request(&mut self, request: ReconstructionRequest) { use ReconstructionRequest::*; match request { Coord(coord) => { @@ -452,7 +462,7 @@ where } } - fn on_unit_reconstructed(&mut self, unit: DagUnit) { + fn on_unit_reconstructed(&mut self, unit: DagUnit) { let unit_hash = unit.hash(); trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord()); if self.backup_units_for_saver.unbounded_send(unit).is_err() { @@ -460,7 +470,7 @@ where } } - fn on_unit_backup_saved(&mut self, unit: DagUnit) { + fn on_unit_backup_saved(&mut self, unit: DagUnit) { let unit_hash = unit.hash(); self.store.insert(unit.clone()); self.dag.finished_processing(&unit_hash); @@ -498,7 +508,7 @@ where } } - fn on_wrong_control_hash(&mut self, u_hash: H::Hash) { + fn on_wrong_control_hash(&mut self, u_hash: ::Hash) { trace!(target: "AlephBFT-runway", "{:?} Dealing with wrong control hash notification {:?}.", self.index(), u_hash); if self.missing_parents.insert(u_hash) { self.send_message_for_network(RunwayNotificationOut::Request(Request::Parents(u_hash))); @@ -507,7 +517,7 @@ where fn send_message_for_network( &mut self, - notification: RunwayNotificationOut, + notification: RunwayNotificationOut, ) { if self .unit_messages_for_network @@ -519,14 +529,14 @@ where } } - fn send_resolved_request_notification(&mut self, notification: Request) { + fn send_resolved_request_notification(&mut self, notification: Request) { if self.resolved_requests.unbounded_send(notification).is_err() { warn!(target: "AlephBFT-runway", "{:?} resolved_requests channel should be open", self.index()); self.exiting = true; } } - fn status(&self) -> RunwayStatus<'_, H> { + fn status(&self) -> RunwayStatus<'_, UFH::Hasher> { RunwayStatus { missing_coords: &self.missing_coords, missing_parents: &self.missing_parents, @@ -541,7 +551,7 @@ where async fn run( mut self, - data_from_backup: oneshot::Receiver>>, + data_from_backup: oneshot::Receiver>, mut terminator: Terminator, ) { let index = self.index(); @@ -668,28 +678,26 @@ fn trivial_start( } pub struct RunwayIO< - H: Hasher, MK: MultiKeychain, W: AsyncWrite + Send + Sync + 'static, R: AsyncRead + Send + Sync + 'static, DP: DataProvider, - UFH: UnitFinalizationHandler, + UFH: UnitFinalizationHandler, > { pub data_provider: DP, pub finalization_handler: UFH, pub backup_write: W, pub backup_read: R, - _phantom: PhantomData<(H, MK::Signature)>, + _phantom: PhantomData, } impl< - H: Hasher, MK: MultiKeychain, W: AsyncWrite + Send + Sync + 'static, R: AsyncRead + Send + Sync + 'static, DP: DataProvider, - UFH: UnitFinalizationHandler, - > RunwayIO + UFH: UnitFinalizationHandler, + > RunwayIO { pub fn new( data_provider: DP, @@ -707,19 +715,18 @@ impl< } } -pub(crate) async fn run( +pub(crate) async fn run( config: Config, - runway_io: RunwayIO, + runway_io: RunwayIO, keychain: MK, spawn_handle: SH, - network_io: NetworkIO, + network_io: NetworkIO, mut terminator: Terminator, ) where - H: Hasher, US: AsyncWrite + Send + Sync + 'static, UL: AsyncRead + Send + Sync + 'static, DP: DataProvider, - UFH: UnitFinalizationHandler, + UFH: UnitFinalizationHandler, MK: MultiKeychain, SH: SpawnHandle, { diff --git a/mock/src/network.rs b/mock/src/network.rs index 55448b1e..22bd5f30 100644 --- a/mock/src/network.rs +++ b/mock/src/network.rs @@ -51,7 +51,7 @@ impl Network { } #[async_trait::async_trait] -impl NetworkT for Network { +impl NetworkT for Network { fn send(&self, data: D, recipient: Recipient) { use Recipient::*; match recipient { diff --git a/types/src/network.rs b/types/src/network.rs index 9b76e8ce..13fc1095 100644 --- a/types/src/network.rs +++ b/types/src/network.rs @@ -31,7 +31,7 @@ pub enum Recipient { /// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/aleph_bft_api.html /// Section 3.1.2 for a discussion of the required guarantees of this trait's implementation. #[async_trait::async_trait] -pub trait Network: Send { +pub trait Network: Send + 'static{ /// Send a message to a single node or everyone, depending on the value of the recipient /// argument. ///