Skip to content

Commit

Permalink
review changes for the Unit-Batch Api
Browse files Browse the repository at this point in the history
  • Loading branch information
fixxxedpoint committed Apr 22, 2024
1 parent 276c915 commit c83c1ae
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 62 deletions.
6 changes: 3 additions & 3 deletions consensus/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod election;
mod extender;
mod units;

use aleph_bft_types::{FinalizationHandler, OrderedUnit};
use aleph_bft_types::{BatchOfUnits, FinalizationHandler};
use extender::Extender;

/// A struct responsible for executing the Consensus protocol on a local copy of the Dag.
Expand All @@ -20,13 +20,13 @@ pub struct Ordering<
H: Hasher,
D: Data,
MK: MultiKeychain,
FH: FinalizationHandler<Vec<OrderedUnit<D, H>>>,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
> {
extender: Extender<DagUnit<H, D, MK>>,
finalization_handler: FH,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<Vec<OrderedUnit<D, H>>>>
impl<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<BatchOfUnits<D, H>>>
Ordering<H, D, MK, FH>
{
pub fn new(finalization_handler: FH) -> Self {
Expand Down
74 changes: 23 additions & 51 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
Config, Data, DataProvider, Hasher, MultiKeychain, Network, NodeIndex, Receiver, Recipient,
Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::{FinalizationHandler, NodeMap, OrderedUnit};
use aleph_bft_types::{BatchOfUnits, FinalizationHandler, NodeMap};
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures_timer::Delay;
Expand All @@ -23,7 +23,6 @@ use std::{
collections::HashSet,
convert::TryInto,
fmt::{self, Debug},
marker::PhantomData,
time::Duration,
};

Expand Down Expand Up @@ -107,29 +106,44 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D, DP: DataProvider, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead> {
pub struct LocalIO<DP: DataProvider, FH, US: AsyncWrite, UL: AsyncRead> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D, DP: DataProvider, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
LocalIO<D, DP, FH, US, UL>
impl<DP: DataProvider, FH: FinalizationHandler<DP::Output>, US: AsyncWrite, UL: AsyncRead>
LocalIO<DP, FH, US, UL>
{
pub fn new(
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
) -> LocalIO<D, DP, FH, US, UL> {
) -> Self {
LocalIO {
data_provider,
finalization_handler,
unit_saver,
unit_loader,
}
}

pub fn new_with_unit_finalization_handler<
H: Hasher,
UFH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
>(
data_provider: DP,
finalization_handler: UFH,
unit_saver: US,
unit_loader: UL,
) -> LocalIO<DP, UFH, US, UL> {
LocalIO {
data_provider,
finalization_handler,
unit_saver,
unit_loader,
_phantom: PhantomData,
}
}
}
Expand Down Expand Up @@ -568,48 +582,6 @@ where
/// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html)
/// or the [original paper](https://arxiv.org/abs/1908.05156).
pub async fn run_session<
H: Hasher,
DP: DataProvider,
FH: FinalizationHandler<DP::Output>,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, DP::Output, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
>(
config: Config,
local_io: LocalIO<DP::Output, DP, FH, US, UL>,
network: N,
keychain: MK,
spawn_handle: SH,
terminator: Terminator,
) {
let local_io: LocalIO<BatchOfUnits<DP::Output, H>, DP, FH, US, UL> = LocalIO {
data_provider: local_io.data_provider,
finalization_handler: local_io.finalization_handler,
unit_saver: local_io.unit_saver,
unit_loader: local_io.unit_loader,
_phantom: PhantomData,
};
run_session_for_units(
config,
local_io,
network,
keychain,
spawn_handle,
terminator,
)
.await
}

pub type BatchOfUnits<D, H> = Vec<OrderedUnit<D, H>>;

/// Starts the consensus algorithm as an async task. It stops establishing consensus for new data items after
/// reaching the threshold specified in [`Config::max_round`] or upon receiving a stop signal from `exit`.
/// Please note that this interface is less stable than [`run_session`] as it exposes intrinsics (i.e. units)
/// which migh be subject to change.
#[doc(hidden)]
pub async fn run_session_for_units<
H: Hasher,
DP: DataProvider,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
Expand All @@ -620,7 +592,7 @@ pub async fn run_session_for_units<
MK: MultiKeychain,
>(
config: Config,
local_io: LocalIO<BatchOfUnits<DP::Output, H>, DP, FH, US, UL>,
local_io: LocalIO<DP, FH, US, UL>,
network: N,
keychain: MK,
spawn_handle: SH,
Expand Down
14 changes: 7 additions & 7 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
Config, Data, DataProvider, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round,
Sender, Signature, Signed, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::{FinalizationHandler, OrderedUnit, Recipient};
use aleph_bft_types::{BatchOfUnits, FinalizationHandler, Recipient};
use futures::{
channel::{mpsc, oneshot},
future::pending,
Expand Down Expand Up @@ -103,7 +103,7 @@ struct Runway<H, D, FH, MK>
where
H: Hasher,
D: Data,
FH: FinalizationHandler<Vec<OrderedUnit<D, H>>>,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
MK: MultiKeychain,
{
missing_coords: HashSet<UnitCoord>,
Expand Down Expand Up @@ -208,7 +208,7 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {
struct RunwayConfig<
H: Hasher,
D: Data,
FH: FinalizationHandler<Vec<OrderedUnit<D, H>>>,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
MK: MultiKeychain,
> {
finalization_handler: FH,
Expand All @@ -228,7 +228,7 @@ impl<H, D, FH, MK> Runway<H, D, FH, MK>
where
H: Hasher,
D: Data,
FH: FinalizationHandler<Vec<OrderedUnit<D, H>>>,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
MK: MultiKeychain,
{
fn new(config: RunwayConfig<H, D, FH, MK>, keychain: MK, validator: Validator<MK>) -> Self {
Expand Down Expand Up @@ -673,7 +673,7 @@ pub struct RunwayIO<
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<Vec<OrderedUnit<DP::Output, H>>>,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
> {
pub data_provider: DP,
pub finalization_handler: FH,
Expand All @@ -688,7 +688,7 @@ impl<
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<Vec<OrderedUnit<DP::Output, H>>>,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
> RunwayIO<H, MK, W, R, DP, FH>
{
pub fn new(
Expand Down Expand Up @@ -719,7 +719,7 @@ pub(crate) async fn run<H, US, UL, MK, DP, FH, SH>(
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<Vec<OrderedUnit<DP::Output, H>>>,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
MK: MultiKeychain,
SH: SpawnHandle,
{
Expand Down
2 changes: 1 addition & 1 deletion types/src/dataio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub type BatchOfUnits<D, H> = Vec<OrderedUnit<D, H>>;
impl<D: Data, H: Hasher, FH: FinalizationHandler<D>> FinalizationHandler<BatchOfUnits<D, H>>
for FH
{
fn data_finalized(&mut self, batch: Vec<OrderedUnit<D, H>>) {
fn data_finalized(&mut self, batch: BatchOfUnits<D, H>) {
for unit in batch {
if let Some(data) = unit.data {
self.data_finalized(data)
Expand Down

0 comments on commit c83c1ae

Please sign in to comment.