Skip to content

Commit

Permalink
Merge pull request AleoNet#5 from ljedrz/full_consensus_test
Browse files Browse the repository at this point in the history
Add a full consensus integration test
  • Loading branch information
ljedrz authored Mar 14, 2023
2 parents 45bfa1d + a2258ff commit db5383b
Show file tree
Hide file tree
Showing 33 changed files with 557 additions and 90 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ edition = "2021"

[features]
default = [ ]
test = []
timer = [ "aleo-std/timer", "snarkos-node-ledger/timer" ]

[dependencies] # added for BFT
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion .parameters.json → node/bft-consensus/.parameters.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"consensus_api_grpc": {
"get_collections_timeout": "5_000ms",
"remove_collections_timeout": "5_000ms",
"socket_addr": "/ip4/0.0.0.0/tcp/0/http"
"socket_addr": "/ip4/127.0.0.1/tcp/0/http"
},
"gc_depth": 50,
"header_num_of_batches_threshold": 32,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 2 additions & 1 deletion node/bft-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1.36"
serde_json = "1"
snarkos-account = { path = "../../account" }
snarkos-node-consensus = { path = "../consensus" }
snarkos-node-messages = { path = "../messages" }
snarkos-node-router = { path = "../router" }
snarkos-node-tcp = { path = "../tcp" }
snarkvm = { workspace = true }

# below are the dependencies for BFT
Expand Down
141 changes: 87 additions & 54 deletions node/bft-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_trait::async_trait;
use bytes::BytesMut;
use eyre::Context;
use fastcrypto::{
bls12381::min_sig::BLS12381KeyPair,
bls12381::min_sig::{BLS12381KeyPair, BLS12381PublicKey},
ed25519::Ed25519KeyPair,
encoding::{Base64, Encoding},
traits::{EncodeDecodeBase64, KeyPair, ToFromBytes},
Expand All @@ -34,9 +34,10 @@ use std::sync::Arc;
use thiserror::Error;
use tracing::*;

use snarkos_account::Account;
use snarkos_node_consensus::Consensus as AleoConsensus;
use snarkos_node_messages::Message;
use snarkos_node_messages::{Data, Message, NewBlock};
use snarkos_node_router::Router;
use snarkos_node_tcp::protocols::Writing;
use snarkvm::prelude::{ConsensusStorage, Network};

pub struct BftConsensus<N: Network, C: ConsensusStorage<N>> {
Expand All @@ -50,7 +51,7 @@ pub struct BftConsensus<N: Network, C: ConsensusStorage<N>> {
committee: Arc<ArcSwap<Committee>>,
worker_cache: Arc<ArcSwap<WorkerCache>>,
aleo_consensus: AleoConsensus<N, C>,
aleo_account: Account<N>,
aleo_router: Router<N>,
}

#[derive(Error, Debug)]
Expand All @@ -60,41 +61,41 @@ pub enum BftError {
}

impl<N: Network, C: ConsensusStorage<N>> BftConsensus<N, C> {
pub fn new(id: u32, aleo_account: Account<N>, aleo_consensus: AleoConsensus<N, C>) -> Result<Self> {
let primary_key_file = format!(".primary-{id}-key.json");
pub fn new(id: u32, aleo_consensus: AleoConsensus<N, C>, aleo_router: Router<N>) -> Result<Self> {
let primary_key_file = format!("{}/.primary-{id}-key.json", env!("CARGO_MANIFEST_DIR"));
let primary_keypair =
read_authority_keypair_from_file(primary_key_file).expect("Failed to load the node's primary keypair");
let primary_network_key_file = format!(".primary-{id}-network-key.json");
let primary_network_key_file = format!("{}/.primary-{id}-network-key.json", env!("CARGO_MANIFEST_DIR"));
let network_keypair = read_network_keypair_from_file(primary_network_key_file)
.expect("Failed to load the node's primary network keypair");
let worker_key_file = format!(".worker-{id}-key.json");
let worker_key_file = format!("{}/.worker-{id}-key.json", env!("CARGO_MANIFEST_DIR"));
let worker_keypair =
read_network_keypair_from_file(worker_key_file).expect("Failed to load the node's worker keypair");
debug!("creating task {}", id);
// Read the committee, workers and node's keypair from file.
let committee_file = ".committee.json";
let committee_file = format!("{}/.committee.json", env!("CARGO_MANIFEST_DIR"));
let committee = Arc::new(ArcSwap::from_pointee(
Committee::import(committee_file)
Committee::import(&committee_file)
.context("Failed to load the committee information")
.map_err(|e| BftError::EyreReport(e.to_string()))?,
));
let workers_file = ".workers.json";
let workers_file = format!("{}/.workers.json", env!("CARGO_MANIFEST_DIR"));
let worker_cache = Arc::new(ArcSwap::from_pointee(
WorkerCache::import(workers_file)
WorkerCache::import(&workers_file)
.context("Failed to load the worker information")
.map_err(|e| BftError::EyreReport(e.to_string()))?,
));

// Load default parameters if none are specified.
let filename = ".parameters.json";
let parameters = Parameters::import(filename)
let filename = format!("{}/.parameters.json", env!("CARGO_MANIFEST_DIR"));
let parameters = Parameters::import(&filename)
.context("Failed to load the node's parameters")
.map_err(|e| BftError::EyreReport(e.to_string()))?;

// Make the data store.
let store_path = format!(".db-{id}-key.json");
let store_path = format!("{}/.db-{id}-key.json", env!("CARGO_MANIFEST_DIR"));
let p_store = NodeStorage::reopen(store_path);
let store_path = format!(".db-{id}-0-key.json");
let store_path = format!("{}/.db-{id}-0-key.json", env!("CARGO_MANIFEST_DIR"));
let w_store = NodeStorage::reopen(store_path);
Ok(Self {
id,
Expand All @@ -107,7 +108,7 @@ impl<N: Network, C: ConsensusStorage<N>> BftConsensus<N, C> {
committee,
worker_cache,
aleo_consensus,
aleo_account,
aleo_router,
})
}

Expand All @@ -117,20 +118,24 @@ impl<N: Network, C: ConsensusStorage<N>> BftConsensus<N, C> {
pub async fn start(self) -> Result<(PrimaryNode, WorkerNode)> {
let primary_pub = self.primary_keypair.public().clone();
let primary = PrimaryNode::new(self.parameters.clone(), true);
let bft_execution_state =
BftExecutionState::new(primary_pub.clone(), self.aleo_router.clone(), self.aleo_consensus.clone());

primary
.start(
self.primary_keypair,
self.network_keypair,
self.committee.clone(),
self.worker_cache.clone(),
&self.p_store,
Arc::new(MyExecutionState::new(self.id, self.aleo_account, self.aleo_consensus.clone())),
Arc::new(bft_execution_state),
)
.await?;

info!("created primary id {}", self.id);
info!("Created a primary with id {} and public key {}", self.id, primary_pub);

let worker = WorkerNode::new(0, self.parameters.clone());
let worker_pub = self.worker_keypair.public().clone();
worker
.start(
primary_pub,
Expand All @@ -141,46 +146,55 @@ impl<N: Network, C: ConsensusStorage<N>> BftConsensus<N, C> {
TransactionValidator(self.aleo_consensus),
)
.await?;
info!("created worker id {}", self.id);
info!("Created a worker with id 0 and public key {}", worker_pub);

Ok((primary, worker))
}
}

pub struct MyExecutionState<N: Network, C: ConsensusStorage<N>> {
id: u32,
account: Account<N>,
pub struct BftExecutionState<N: Network, C: ConsensusStorage<N>> {
primary_pub: BLS12381PublicKey,
router: Router<N>,
consensus: AleoConsensus<N, C>,
}

impl<N: Network, C: ConsensusStorage<N>> MyExecutionState<N, C> {
pub(crate) fn new(id: u32, account: Account<N>, consensus: AleoConsensus<N, C>) -> Self {
Self { id, account, consensus }
impl<N: Network, C: ConsensusStorage<N>> BftExecutionState<N, C> {
pub(crate) fn new(primary_pub: BLS12381PublicKey, router: Router<N>, consensus: AleoConsensus<N, C>) -> Self {
Self { primary_pub, router, consensus }
}
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> ExecutionState for MyExecutionState<N, C> {
impl<N: Network, C: ConsensusStorage<N>> ExecutionState for BftExecutionState<N, C> {
/// Receive the consensus result with the ordered transactions in `ConsensusOutupt`
async fn handle_consensus_output(&self, consensus_output: ConsensusOutput) {
if !consensus_output.batches.is_empty() {
info!(
"Node {} consensus output for round {}: {:?} batches, leader: {:?}",
self.id,
consensus_output.sub_dag.leader.header.round,
consensus_output.batches.len(),
consensus_output.sub_dag.leader.header.author,
);

/*
TODO: the following generally works, but there are some open points
1. currently, only blocks produced by the beacon are considered valid
2. can the Aleo mempool have a different set of transactions than ones accepted by the consensus?
3. we can't fail here, i.e. the checks by the TransactionValidator must be final
4. every validator can create a block, but should they? the downstream can create it on its own too
*/
let leader = &consensus_output.sub_dag.leader.header.author;
let mut leader_id = leader.to_string();
leader_id.truncate(8);

let mut validator_id = self.primary_pub.to_string();
validator_id.truncate(8);

info!(
"Consensus (id: {}) output for round {}: {} batches, leader: {}",
validator_id,
consensus_output.sub_dag.leader.header.round,
consensus_output.sub_dag.num_batches(),
leader_id,
);

if consensus_output.batches.is_empty() {
info!("There are no batches to process; not attempting to create a block.");
} else {
if self.primary_pub != *leader {
info!("I'm not the current leader (id: {}), yielding block production.", validator_id);
return;
} else {
info!("I'm the current leader (id: {}); producing a block.", validator_id);
}

let consensus = self.consensus.clone();
let account = self.account.clone();
let private_key = *self.router.private_key();
let next_block = tokio::task::spawn_blocking(move || {
// Collect all the transactions contained in the agreed upon batches.
let mut transactions = Vec::new();
Expand Down Expand Up @@ -223,7 +237,7 @@ impl<N: Network, C: ConsensusStorage<N>> ExecutionState for MyExecutionState<N,
}

// Propose a new block.
let next_block = match consensus.propose_next_block(account.private_key(), &mut rand::thread_rng()) {
let next_block = match consensus.propose_next_block(&private_key, &mut rand::thread_rng()) {
Ok(block) => block,
Err(error) => bail!("Failed to propose the next block: {error}"),
};
Expand Down Expand Up @@ -251,6 +265,8 @@ impl<N: Network, C: ConsensusStorage<N>> ExecutionState for MyExecutionState<N,
}
}

info!("Produced a block with {num_valid_txs} transactions.");

Ok(Some(next_block))
})
.await;
Expand All @@ -264,17 +280,31 @@ impl<N: Network, C: ConsensusStorage<N>> ExecutionState for MyExecutionState<N,
}
};

info!(
"Produced a block with the following txs: {:?}",
next_block.transactions().iter().map(|tx| tx.id()).collect::<Vec<_>>()
);
let next_block_round = next_block.round();
let next_block_height = next_block.height();
let next_block_hash = next_block.hash();

// Serialize the block ahead of time to not do it for each peer.
let serialized_block = match Data::Object(next_block).serialize().await {
Ok(serialized_block) => Data::Buffer(serialized_block),
Err(error) => unreachable!("Failed to serialize own block: {error}"),
};

// Prepare the block to be sent to all peers.
let message = Message::<N>::NewBlock(NewBlock::new(
next_block_round,
next_block_height,
next_block_hash,
serialized_block,
));

// Broadcast the new block.
self.router.broadcast(message).unwrap();
}
}

async fn last_executed_sub_dag_index(&self) -> u64 {
info!("Node {} last_executed_sub_dag_index() called", self.id);
// TODO: get this info from storage somewhere
// self.node.last_executed_sub_dag_index().await
// TODO: this seems like a potential optimization, but shouldn't be needed
0
}
}
Expand All @@ -299,7 +329,7 @@ impl<N: Network, C: ConsensusStorage<N>> narwhal_worker::TransactionValidator fo
/// Determines if a transaction valid for the worker to consider putting in a batch
fn validate(&self, transaction: &[u8]) -> Result<(), Self::Error> {
let bytes = BytesMut::from(transaction);
let message = Message::deserialize(bytes)?;
let message = Message::<N>::deserialize(bytes)?;

let unconfirmed_transaction = if let Message::UnconfirmedTransaction(unconfirmed_transaction) = message {
unconfirmed_transaction
Expand All @@ -312,7 +342,10 @@ impl<N: Network, C: ConsensusStorage<N>> narwhal_worker::TransactionValidator fo
Err(error) => bail!("[UnconfirmedTransaction] {error}"),
};

self.0.check_transaction_basic(&transaction)?;
if let Err(err) = self.0.check_transaction_basic(&transaction) {
error!("Failed to validate a transaction: {err}");
return Err(err);
}

Ok(())
}
Expand Down
30 changes: 30 additions & 0 deletions node/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,35 @@ version = "0.3"
[dependencies.tracing]
version = "0.1"

[dev-dependencies.bytes]
version = "1"

[dev-dependencies.multiaddr]
version = "0.17"

[dev-dependencies.narwhal-types]
git = "https://github.com/eqlabs/bullshark-bft/"
package = "narwhal-types"

[dev-dependencies.snarkos-account]
path = "../../account"

[dev-dependencies.snarkos-node]
path = ".."
features = ["test"]

[dev-dependencies.snarkos-node-messages]
path = "../messages"

[dev-dependencies.tokio]
version = "1"
features = ["macros", "rt-multi-thread"]

[dev-dependencies.tonic]
version = "0.8"

[dev-dependencies.tracing-subscriber]
version = "0.3"

[dev-dependencies.tracing-test]
version = "0.2"
6 changes: 2 additions & 4 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,9 @@ impl<N: Network, C: ConsensusStorage<N>> Consensus<N, C> {

/* Signature */

// Ensure the block is signed by an authorized beacon.
// Ensure the block is signed by an authorized entity.
let signer = block.signature().to_address();
if !self.beacons.read().contains_key(&signer) {
bail!("Block {} ({}) is signed by an unauthorized beacon ({})", block.height(), block.hash(), signer);
}
// TODO: check against the committee.

// Check the signature.
if !block.signature().verify(&signer, &[*block.hash()]) {
Expand Down
Loading

0 comments on commit db5383b

Please sign in to comment.