Skip to content

Commit

Permalink
fix: Leader as transaction ordering service (#4967)
Browse files Browse the repository at this point in the history
Signed-off-by: Sam H. Smith <sam.henning.smith@protonmail.com>
  • Loading branch information
SamHSmith authored Oct 8, 2024
1 parent 5e45ade commit 242d857
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 78 deletions.
102 changes: 60 additions & 42 deletions crates/iroha_core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,31 @@ mod pending {
Self(Pending { transactions })
}

/// Create new BlockPayload
pub fn new_unverified(
prev_block: Option<&SignedBlock>,
view_change_index: usize,
transactions_a: Vec<AcceptedTransaction>,
consensus_estimation: Duration,
) -> BlockPayload {
let transactions = transactions_a
.into_iter()
.map(|tx| CommittedTransaction {
value: tx.clone().into(),
error: None,
})
.collect::<Vec<_>>();
BlockPayload {
header: Self::make_header(
prev_block,
view_change_index,
&transactions,
consensus_estimation,
),
transactions,
}
}

fn make_header(
prev_block: Option<&SignedBlock>,
view_change_index: usize,
Expand Down Expand Up @@ -264,7 +289,7 @@ mod chained {
pub struct Chained(pub(super) BlockPayload);

impl BlockBuilder<Chained> {
/// Sign this block and get [`SignedBlock`].
/// Sign this block as Leader and get [`SignedBlock`].
pub fn sign(self, private_key: &PrivateKey) -> WithEvents<ValidBlock> {
WithEvents::new(ValidBlock(self.0 .0.sign(private_key)))
}
Expand Down Expand Up @@ -415,7 +440,8 @@ mod valid {
Ok(())
}

/// Validate a block against the current state of the world.
/// Validate a block against the current state of the world. Individual transaction
/// errors will be updated.
///
/// # Errors
///
Expand All @@ -430,7 +456,7 @@ mod valid {
/// - Error during validation of individual transactions
/// - Transaction in the genesis block is not signed by the genesis public key
pub fn validate(
block: SignedBlock,
mut block: SignedBlock,
topology: &Topology,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
Expand All @@ -442,9 +468,12 @@ mod valid {
return WithEvents::new(Err((block, error)));
}

if let Err(error) =
Self::validate_transactions(&block, expected_chain_id, genesis_account, state_block)
{
if let Err(error) = Self::validate_transactions(
&mut block,
expected_chain_id,
genesis_account,
state_block,
) {
return WithEvents::new(Err((block, error.into())));
}

Expand All @@ -456,7 +485,7 @@ mod valid {
/// * If block header is valid, `voting_block` will be released,
/// and transactions will be validated with write state
pub fn validate_keep_voting_block<'state>(
block: SignedBlock,
mut block: SignedBlock,
topology: &Topology,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
Expand All @@ -480,7 +509,7 @@ mod valid {
};

if let Err(error) = Self::validate_transactions(
&block,
&mut block,
expected_chain_id,
genesis_account,
&mut state_block,
Expand Down Expand Up @@ -577,7 +606,7 @@ mod valid {
}

fn validate_transactions(
block: &SignedBlock,
block: &mut SignedBlock,
expected_chain_id: &ChainId,
genesis_account: &AccountId,
state_block: &mut StateBlock<'_>,
Expand All @@ -589,40 +618,29 @@ mod valid {
(params.sumeragi().max_clock_drift(), params.transaction)
};

block
.transactions()
// TODO: Unnecessary clone?
.cloned()
.try_for_each(|CommittedTransaction { value, error }| {
let tx = if is_genesis {
AcceptedTransaction::accept_genesis(
value,
expected_chain_id,
max_clock_drift,
genesis_account,
)
} else {
AcceptedTransaction::accept(
value,
expected_chain_id,
max_clock_drift,
tx_limits,
)
}?;

if error.is_some() {
match state_block.validate(tx) {
Err(rejected_transaction) => Ok(rejected_transaction),
Ok(_) => Err(TransactionValidationError::RejectedIsValid),
}?;
} else {
state_block
.validate(tx)
.map_err(|(_tx, error)| TransactionValidationError::NotValid(error))?;
}
for CommittedTransaction { value, error } in block.transactions_mut() {
let tx = if is_genesis {
AcceptedTransaction::accept_genesis(
value.clone(),
expected_chain_id,
max_clock_drift,
genesis_account,
)
} else {
AcceptedTransaction::accept(
value.clone(),
expected_chain_id,
max_clock_drift,
tx_limits,
)
}?;

Ok(())
})
*error = match state_block.validate(tx) {
Ok(_) => None,
Err((_tx, error)) => Some(Box::new(error)),
};
}
Ok(())
}

/// Add additional signature for [`Self`]
Expand Down
84 changes: 48 additions & 36 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ impl Sumeragi {
block=%block_created.block.hash(),
"Block received"
);

if let Some(mut valid_block) = self.validate_block(
state,
&self.topology,
Expand All @@ -605,7 +604,6 @@ impl Sumeragi {
) {
// NOTE: Up until this point it was unknown which block is expected to be received,
// therefore all the signatures (of any hash) were collected and will now be pruned

for signature in core::mem::take(voting_signatures) {
if let Err(error) =
valid_block.block.add_signature(signature, &self.topology)
Expand Down Expand Up @@ -827,6 +825,7 @@ impl Sumeragi {
fn try_create_block<'state>(
&mut self,
state: &'state State,
genesis_account: &AccountId,
voting_block: &mut Option<VotingBlock<'state>>,
) {
assert_eq!(self.role(), Role::Leader);
Expand All @@ -839,7 +838,11 @@ impl Sumeragi {
.max_transactions
.try_into()
.expect("INTERNAL BUG: transactions in block exceed usize::MAX");
let block_time = state.world.view().parameters.sumeragi.block_time();
let block_time = if self.topology.view_change_index() > 0 {
Duration::from_secs(0)
} else {
state.world.view().parameters.sumeragi.block_time()
};
let tx_cache_full = self.transaction_cache.len() >= max_transactions.get();
let deadline_reached = self.round_start_time.elapsed() > block_time;
let tx_cache_non_empty = !self.transaction_cache.is_empty();
Expand All @@ -851,47 +854,57 @@ impl Sumeragi {
.map(|tx| tx.deref().clone())
.collect::<Vec<_>>();

let mut state_block = state.block();
let create_block_start_time = Instant::now();
let new_block = BlockBuilder::new(transactions)
.chain(self.topology.view_change_index(), &mut state_block)
.sign(self.key_pair.private_key())
.unpack(|e| self.send_event(e));

let created_in = create_block_start_time.elapsed();
let pipeline_time = state.world.view().parameters().sumeragi.pipeline_time();
if created_in > pipeline_time / 2 {
warn!(
role=%self.role(),
peer_id=%self.peer_id,
"Creating block takes too much time. \
This might prevent consensus from operating. \
Consider increasing `commit_time` or decreasing `max_transactions_in_block`"
);
}
let pre_signed_block = BlockBuilder::new_unverified(
state.view().latest_block().as_deref(),
self.topology.view_change_index(),
transactions,
state
.view()
.world
.parameters()
.sumeragi
.consensus_estimation(),
)
.sign(self.key_pair.private_key());

let block_created_msg = BlockCreated {
block: pre_signed_block,
};
if self.topology.is_consensus_required().is_some() {
info!(
peer_id=%self.peer_id,
block=%new_block.as_ref().hash(),
view_change_index=%self.topology.view_change_index(),
txns=%new_block.as_ref().transactions().len(),
created_in_ms=%created_in.as_millis(),
"Block created"
);
self.broadcast_packet(block_created_msg.clone());
}

let msg = BlockCreated::from(&new_block);
*voting_block = Some(VotingBlock::new(new_block, state_block));
self.broadcast_packet(msg);
info!(
peer_id=%self.peer_id,
view_change_index=%self.topology.view_change_index(),
block_hash=%block_created_msg.block.hash(),
txns=%block_created_msg.block.transactions().len(),
"Block created"
);

let new_voting_block = self
.validate_block(
state,
&self.topology,
genesis_account,
block_created_msg,
voting_block,
)
.expect("We just created this block ourselves, it has to be valid.");

if self.topology.is_consensus_required().is_some() {
*voting_block = Some(new_voting_block);
} else {
let committed_block = new_block
let committed_block = new_voting_block
.block
.commit(&self.topology)
.unpack(|e| self.send_event(e))
.expect("INTERNAL BUG: Leader failed to commit created block");

let msg = BlockCommitted::from(&committed_block);
self.broadcast_packet(msg);
self.commit_block(committed_block, state_block);
self.commit_block(committed_block, new_voting_block.state_block);
*voting_block = None;
}
}
}
Expand Down Expand Up @@ -1009,7 +1022,6 @@ pub(crate) fn run(
let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter();

let state_view = state.view();

sumeragi
.transaction_cache
// Checking if transactions are in the blockchain is costly
Expand Down Expand Up @@ -1167,7 +1179,7 @@ pub(crate) fn run(
.set(sumeragi.topology.view_change_index() as u64);

if sumeragi.role() == Role::Leader && voting_block.is_none() {
sumeragi.try_create_block(&state, &mut voting_block);
sumeragi.try_create_block(&state, &genesis_account, &mut voting_block);
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/iroha_data_model/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl SignedBlock {
block.payload.transactions.iter()
}

/// Block transactions with mutable access
#[inline]
pub fn transactions_mut(&mut self) -> &mut [CommittedTransaction] {
let SignedBlock::V1(block) = self;
block.payload.transactions.as_mut_slice()
}

/// Signatures of peers which approved this block.
#[inline]
pub fn signatures(
Expand Down

0 comments on commit 242d857

Please sign in to comment.