Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor validators #1022

Draft
wants to merge 9 commits into
base: development
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor: refactor validators
stringhandler committed Apr 26, 2024

Verified

This commit was signed with the committer’s verified signature.
stringhandler stringhandler
commit f67caed3f5bbcb9eda90ca518fe1865870215127
3 changes: 2 additions & 1 deletion applications/tari_swarm_daemon/src/main.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use std::{future::Future, pin::Pin};
use anyhow::{anyhow, Context};
use tari_common::configuration::Network;
use tari_shutdown::Shutdown;
use tokio::{fs, signal::unix::SignalKind};
use tokio::fs;

use crate::{
cli::{Cli, Commands, InitArgs},
@@ -244,6 +244,7 @@ fn exit_signal() -> anyhow::Result<BoxFuture<()>> {

#[cfg(unix)]
fn unix_exit_signal() -> anyhow::Result<BoxFuture<()>> {
use tokio::signal::unix::SignalKind;
let mut sighup = tokio::signal::unix::signal(SignalKind::hangup())?;
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;

Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ impl Instance {
#[cfg(target_family = "windows")]
async fn terminate_win(&mut self) -> anyhow::Result<()> {
// Should probably also implement a clean exit
self.child().kill().await?;
self.child_mut().kill().await?;
Ok(())
}
}
29 changes: 29 additions & 0 deletions dan_layer/consensus/src/block_validations.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,35 @@ use crate::{
traits::{ConsensusSpec, LeaderStrategy, VoteSignatureService},
};

pub async fn check_block<TConsensusSpec: ConsensusSpec>(
candidate_block: &Block,
epoch_manager: &TConsensusSpec::EpochManager,
config: &HotstuffConfig,
network: Network,

leader_strategy: &TConsensusSpec::LeaderStrategy,
vote_signing_service: &TConsensusSpec::SignatureService,
) -> Result<(), HotStuffError> {
check_base_layer_block_hash::<TConsensusSpec>(&candidate_block, &epoch_manager, &config).await?;
check_network(candidate_block, network)?;
check_hash_and_height(&candidate_block)?;
let committee_for_block = epoch_manager
.get_committee_by_validator_public_key(candidate_block.epoch(), candidate_block.proposed_by())
.await?;
// TODO: Check shard matches
// TODO: Check base layer height rules
// TODO: Check if dummy block is correct
// TODO: Why do we have processed and committed in this struct
check_proposed_by_leader::<TConsensusSpec::Addr, TConsensusSpec::LeaderStrategy>(
&leader_strategy,
&committee_for_block,
&candidate_block,
)?;
check_signature(&candidate_block)?;
check_quorum_certificate::<TConsensusSpec>(&candidate_block, &vote_signing_service, &epoch_manager).await?;
Ok(())
}

pub fn check_network(candidate_block: &Block, network: Network) -> Result<(), ProposalValidationError> {
if candidate_block.network() != network {
return Err(ProposalValidationError::InvalidNetwork {
33 changes: 24 additions & 9 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
@@ -67,27 +67,23 @@ pub enum HotStuffError {
},
#[error("Pacemaker channel dropped: {details}")]
PacemakerChannelDropped { details: String },
#[error(
"Bad new view message: HighQC height {high_qc_height}, received new height {received_new_height}: {details}"
)]
BadNewViewMessage {
high_qc_height: NodeHeight,
received_new_height: NodeHeight,
details: String,
},

#[error("BUG Invariant error occurred: {0}")]
InvariantError(String),
#[error("Sync error: {0}")]
SyncError(anyhow::Error),
#[error("Fallen behind: local_height={local_height}, qc_height={qc_height}")]
#[error("Fallen behind: local_height={local_height}, qc_height={qc_height}, detected during: {detected_at}")]
FallenBehind {
local_height: NodeHeight,
qc_height: NodeHeight,
detected_at: String,
},
#[error("Transaction executor error: {0}")]
TransactionExecutorError(String),
#[error("Invalid sync request: {details}")]
InvalidSyncRequest { details: String },
#[error("New view validation error: {0}")]
NewViewValidationError(#[from] NewViewValidationError),
}

impl From<EpochManagerError> for HotStuffError {
@@ -96,6 +92,25 @@ impl From<EpochManagerError> for HotStuffError {
}
}

#[derive(Debug, thiserror::Error)]
pub enum NewViewValidationError {
#[error(
"NEWVIEW for height less than the locked block, locked block: {locked_height} new height: {new_view_height}"
)]
NewViewHeightLessThanLockedBlock {
locked_height: NodeHeight,
new_view_height: NodeHeight,
},
#[error(
"Bad new view message: HighQC height {high_qc_height}, received new height {received_new_height}: {details}"
)]
BadNewViewMessage {
high_qc_height: NodeHeight,
received_new_height: NodeHeight,
details: String,
},
}

#[derive(Debug, thiserror::Error)]
pub enum ProposalValidationError {
#[error("Storage error: {0}")]
38 changes: 17 additions & 21 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
@@ -20,14 +20,7 @@ use tokio::{sync::mpsc, time};

use super::config::HotstuffConfig;
use crate::{
block_validations::{
check_base_layer_block_hash,
check_hash_and_height,
check_network,
check_proposed_by_leader,
check_quorum_certificate,
check_signature,
},
block_validations::check_block,
hotstuff::error::HotStuffError,
messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
traits::{ConsensusSpec, OutboundMessaging},
@@ -86,7 +79,8 @@ where TConsensusSpec: ConsensusSpec
self.process_local_proposal(current_height, msg).await?;
},
HotstuffMessage::ForeignProposal(ref proposal) => {
self.check_proposal(proposal.block.clone()).await?;
self.check_proposal_and_handle_missing_transactions(proposal.block.clone())
.await?;
self.report_message_ready(from, msg)?;
},
msg => {
@@ -111,17 +105,19 @@ where TConsensusSpec: ConsensusSpec
self.message_buffer.clear_buffer();
}

async fn check_proposal(&mut self, block: Block) -> Result<Option<Block>, HotStuffError> {
check_base_layer_block_hash::<TConsensusSpec>(&block, &self.epoch_manager, &self.config).await?;
check_network(&block, self.network)?;
check_hash_and_height(&block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
check_signature(&block)?;
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;
async fn check_proposal_and_handle_missing_transactions(
&mut self,
block: Block,
) -> Result<Option<Block>, HotStuffError> {
check_block::<TConsensusSpec>(
&block,
&self.epoch_manager,
&self.config,
self.network,
&self.leader_strategy,
&self.vote_signing_service,
)
.await?;
self.handle_missing_transactions(block).await
}

@@ -150,7 +146,7 @@ where TConsensusSpec: ConsensusSpec
// return Ok(());
// }

let Some(ready_block) = self.check_proposal(block).await? else {
let Some(ready_block) = self.check_proposal_and_handle_missing_transactions(block).await? else {
// Block not ready
return Ok(());
};
80 changes: 24 additions & 56 deletions dan_layer/consensus/src/hotstuff/on_receive_new_view.rs
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ use super::vote_receiver::VoteReceiver;
use crate::{
hotstuff::{common::calculate_last_dummy_block, error::HotStuffError, pacemaker_handle::PaceMakerHandle},
messages::NewViewMessage,
new_view_validations::check_new_view_message,
traits::{ConsensusSpec, LeaderStrategy},
};

@@ -76,6 +77,26 @@ where TConsensusSpec: ConsensusSpec

#[allow(clippy::too_many_lines)]
pub async fn handle(&mut self, from: TConsensusSpec::Addr, message: NewViewMessage) -> Result<(), HotStuffError> {
let local_committee = self.epoch_manager.get_local_committee(message.epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(message.epoch).await?;
let locked = self.store.with_read_tx(|tx| LockedBlock::get(tx))?;
match check_new_view_message::<TConsensusSpec>(
&message,
&self.epoch_manager,
&locked,
&self.leader_strategy,
&local_committee,
&local_committee_shard,
)
.await
{
Ok(()) => {},
Err(e) => {
warn!(target: LOG_TARGET, "❌ Ignoring NEW_VIEW message because it failed validation: {}", e);
return Ok(());
},
}

let NewViewMessage {
high_qc,
new_height,
@@ -90,29 +111,6 @@ where TConsensusSpec: ConsensusSpec
from
);

if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? {
warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for epoch {} because the epoch is invalid or we are not registered for that epoch", epoch);
return Ok(());
}

// TODO: This prevents syncing the blocks from previous epoch.
// if !self.epoch_manager.is_validator_in_local_committee(&from, epoch).await? {
// return Err(HotStuffError::ReceivedMessageFromNonCommitteeMember {
// epoch,
// sender: from.to_string(),
// context: format!("Received NEWVIEW from {}", from),
// });
// }

// We can never accept NEWVIEWS for heights that are lower than the locked block height
let locked = self.store.with_read_tx(|tx| LockedBlock::get(tx))?;
if new_height < locked.height() {
warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for height less than the locked block, locked block: {} new height: {}", locked, new_height);
return Ok(());
}

self.validate_qc(&high_qc)?;

// Sync if we do not have the block for this valid QC
let exists = self
.store
@@ -126,23 +124,7 @@ where TConsensusSpec: ConsensusSpec
return Err(HotStuffError::FallenBehind {
local_height: leaf.height(),
qc_height: high_qc.block_height(),
});
}

let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
let leader = self
.leader_strategy
.get_leader_for_next_block(&local_committee, new_height);
let our_node = self.epoch_manager.get_our_validator_node(epoch).await?;

if *leader != our_node.address {
warn!(target: LOG_TARGET, "❌ New View failed, leader is {} at height:{}", leader, new_height);
return Err(HotStuffError::NotTheLeader {
details: format!(
"Received NEWVIEW height {} but this not is not the leader for that height",
new_height
),
detected_at: "NEWVIEW".to_string(),
});
}

@@ -154,15 +136,6 @@ where TConsensusSpec: ConsensusSpec
self.vote_receiver.handle(from.clone(), vote, false).await?;
}

// Are nodes requesting to create more than the minimum number of dummy blocks?
if high_qc.block_height().saturating_sub(new_height).as_u64() > local_committee.len() as u64 {
return Err(HotStuffError::BadNewViewMessage {
details: format!("Validator {from} requested an invalid number of dummy blocks"),
high_qc_height: high_qc.block_height(),
received_new_height: new_height,
});
}

// Take note of unique NEWVIEWs so that we can count them
let newview_count = self.collect_new_views(from, new_height, &high_qc);

@@ -179,7 +152,7 @@ where TConsensusSpec: ConsensusSpec

let threshold = self.epoch_manager.get_local_threshold_for_epoch(epoch).await?;

info!(
debug!(
target: LOG_TARGET,
"🌟 Received NEWVIEW for height {} (QC: {}) has {} votes out of {}",
new_height,
@@ -190,7 +163,7 @@ where TConsensusSpec: ConsensusSpec
// Once we have received enough (quorum) NEWVIEWS, we can create the dummy block(s) and propose the next block.
// Any subsequent NEWVIEWs for this height/view are ignored.
if newview_count == threshold {
info!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, high_qc.as_high_qc(), newview_count, threshold);
debug!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, high_qc.as_high_qc(), newview_count, threshold);

let high_qc_block = self.store.with_read_tx(|tx| high_qc.get_block(tx))?;
// Determine how many missing blocks we must fill without actually creating them.
@@ -220,9 +193,4 @@ where TConsensusSpec: ConsensusSpec

Ok(())
}

fn validate_qc(&self, _qc: &QuorumCertificate) -> Result<(), HotStuffError> {
// TODO
Ok(())
}
}
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
@@ -477,6 +477,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
return Err(HotStuffError::FallenBehind {
local_height,
qc_height,
detected_at: "DISPATCH_HOTSTUFF_MESSAGE".to_string(),
});
}
self.on_catch_up_sync(&from).await?;
1 change: 1 addition & 0 deletions dan_layer/consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -4,4 +4,5 @@
mod block_validations;
pub mod hotstuff;
pub mod messages;
mod new_view_validations;
pub mod traits;
Loading