Skip to content

Commit

Permalink
A0-4153: Factor unit validation out of runway (#429)
Browse files Browse the repository at this point in the history
* Factor unit validation out of runway

This required quite a lot of other changes as well, so this is quite
large. It also looks... not ideal and in places _more_ convoluted than
beforehand, but completely untangling all this in a single commit would
require it to be even larger.

* Readability improvements from review

* Better canon explanation
  • Loading branch information
timorleph authored Apr 3, 2024
1 parent 4795e10 commit 1a33470
Show file tree
Hide file tree
Showing 29 changed files with 1,574 additions and 1,291 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.1"
version = "0.36.2"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand Down
1 change: 1 addition & 0 deletions consensus/src/alerts/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
alerts::{Alert, AlertMessage, ForkProof, ForkingNotification},
units::Unit,
Data, Hasher, Keychain, MultiKeychain, Multisigned, NodeIndex, PartialMultisignature,
Recipient, SessionId, Signature, Signed, UncheckedSigned,
};
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
units::UncheckedSignedUnit, Data, Hasher, Index, Keychain, MultiKeychain, Multisigned,
NodeIndex, PartialMultisignature, Signable, Signature, UncheckedSigned,
units::{UncheckedSignedUnit, Unit},
Data, Hasher, Index, Keychain, MultiKeychain, Multisigned, NodeIndex, PartialMultisignature,
Signable, Signature, UncheckedSigned,
};
use aleph_bft_rmc::Message as RmcMessage;
use codec::{Decode, Encode};
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/alerts/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage};
use futures::{FutureExt, StreamExt};
use log::{debug, error, warn};
use log::{debug, error, trace, warn};
use std::time::Duration;

const LOG_TARGET: &str = "AlephBFT-alerter";
Expand Down Expand Up @@ -147,6 +147,7 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}

fn handle_alert_from_runway(&mut self, alert: Alert<H, D, MK::Signature>) {
trace!(target: LOG_TARGET, "Handling alert {:?}.", alert);
let (message, recipient, hash) = self.handler.on_own_alert(alert.clone());
self.send_message_for_network(message, recipient);
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{channel::oneshot, AsyncRead, AsyncReadExt};
use log::{error, info, warn};

use crate::{
units::{UncheckedSignedUnit, UnitCoord},
units::{UncheckedSignedUnit, Unit, UnitCoord},
Data, Hasher, NodeIndex, Round, SessionId, Signature,
};

Expand Down Expand Up @@ -246,7 +246,7 @@ mod tests {
use crate::{
backup::BackupLoader,
units::{
create_preunits, creator_set, preunit_to_unchecked_signed_unit, preunit_to_unit,
create_preunits, creator_set, preunit_to_full_unit, preunit_to_unchecked_signed_unit,
UncheckedSignedUnit as GenericUncheckedSignedUnit,
},
NodeCount, NodeIndex, Round, SessionId,
Expand Down Expand Up @@ -277,14 +277,14 @@ mod tests {

let units: Vec<_> = pre_units
.iter()
.map(|(pre_unit, _)| preunit_to_unit(pre_unit.clone(), session_id))
.map(|pre_unit| preunit_to_full_unit(pre_unit.clone(), session_id))
.collect();
for creator in creators.iter_mut() {
creator.add_units(&units);
}

let mut unchecked_signed_units = Vec::with_capacity(pre_units.len());
for ((pre_unit, _), keychain) in pre_units.into_iter().zip(keychains.iter()) {
for (pre_unit, keychain) in pre_units.into_iter().zip(keychains.iter()) {
unchecked_signed_units.push(preunit_to_unchecked_signed_unit(
pre_unit, session_id, keychain,
))
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod tests {
let units: Vec<TestUnit> = (0..5)
.map(|k| {
preunit_to_unchecked_signed_unit(
creators[k].create_unit(0).unwrap().0,
creators[k].create_unit(0).unwrap(),
0,
&keychains[k],
)
Expand Down
47 changes: 18 additions & 29 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use log::{debug, error};

use crate::{
config::Config,
creation::{self, SignedUnitWithParents},
extension::Service as Extender,
handle_task_termination,
reconstruction::Service as ReconstructionService,
runway::{NotificationIn, NotificationOut},
creation, handle_task_termination,
reconstruction::{ReconstructedUnit, Request, Service as ReconstructionService},
runway::ExplicitParents,
units::SignedUnit,
Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, SpawnHandle, Terminator,
};

pub struct IO<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>> {
pub incoming_notifications: Receiver<NotificationIn<H>>,
pub outgoing_notifications: Sender<NotificationOut<H>>,
pub units_for_runway: Sender<SignedUnitWithParents<H, D, MK>>,
pub units_from_runway: Receiver<SignedUnit<H, D, MK>>,
pub parents_from_runway: Receiver<ExplicitParents<H>>,
pub units_for_runway: Sender<ReconstructedUnit<SignedUnit<H, D, MK>>>,
pub requests_for_runway: Sender<Request<H>>,
pub new_units_for_runway: Sender<SignedUnit<H, D, MK>>,
pub data_provider: DP,
pub ordered_batch_tx: Sender<Vec<H::Hash>>,
pub starting_round: oneshot::Receiver<Option<Round>>,
}

Expand All @@ -33,30 +33,22 @@ pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
) {
debug!(target: "AlephBFT", "{:?} Starting all services...", conf.node_ix());
let IO {
incoming_notifications,
outgoing_notifications,
units_from_runway,
parents_from_runway,
units_for_runway,
requests_for_runway,
new_units_for_runway,
data_provider,
ordered_batch_tx,
starting_round,
} = io;

let index = conf.node_ix();

let (electors_tx, electors_rx) = mpsc::unbounded();
let extender = Extender::<H>::new(index, electors_rx, ordered_batch_tx);
let extender_terminator = terminator.add_offspring_connection("AlephBFT-extender");
let mut extender_handle = spawn_handle
.spawn_essential("consensus/extender", async move {
extender.run(extender_terminator).await
})
.fuse();

let (parents_for_creator, parents_from_dag) = mpsc::unbounded();

let creator_terminator = terminator.add_offspring_connection("creator");
let io = creation::IO {
outgoing_units: units_for_runway,
outgoing_units: new_units_for_runway,
incoming_parents: parents_from_dag,
data_provider,
};
Expand All @@ -75,10 +67,11 @@ pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
};

let reconstruction = ReconstructionService::new(
incoming_notifications,
outgoing_notifications,
units_from_runway,
parents_from_runway,
requests_for_runway,
units_for_runway,
parents_for_creator,
electors_tx,
);

let reconstruction_terminator = terminator.add_offspring_connection("reconstruction");
Expand All @@ -97,9 +90,6 @@ pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
_ = creator_panic_handle.fuse() => {
error!(target: "AlephBFT-consensus", "{:?} creator task terminated early with its task being dropped.", index);
},
_ = extender_handle => {
debug!(target: "AlephBFT-consensus", "{:?} extender task terminated early.", index);
}
}
debug!(target: "AlephBFT", "{:?} All services stopping.", index);

Expand All @@ -114,7 +104,6 @@ pub async fn run<H: Hasher, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
)
.await;
handle_task_termination(creator_handle, "AlephBFT-consensus", "Creator", index).await;
handle_task_termination(extender_handle, "AlephBFT-consensus", "Extender", index).await;

debug!(target: "AlephBFT", "{:?} All services stopped.", index);
}
Loading

0 comments on commit 1a33470

Please sign in to comment.