Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sumeragi): dynamic commit time based on view change index #4957

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iroha/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use iroha_test_samples::{gen_account_in, load_sample_wasm, ALICE_ID};
pub fn pipeline_time() -> Duration {
let default_parameters = SumeragiParameters::default();

default_parameters.pipeline_time()
default_parameters.pipeline_time(0, 0)
}

fn curr_time() -> core::time::Duration {
Expand Down
82 changes: 67 additions & 15 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,23 @@ impl Sumeragi {
})
.ok()?;

let block_vc_index = match &block_msg {
match &block_msg {
BlockMessage::BlockCreated(bc) => {
Some(bc.block.header().view_change_index as usize)
if (bc.block.header().view_change_index as usize) < current_view_change_index {
trace!(
ty="BlockCreated",
block=%bc.block.hash(),
"Discarding message due to outdated view change index",
);
// ignore block_message
continue;
}
}
// Signed and Committed contain no block.
// Block sync updates are exempt from early pruning.
BlockMessage::BlockSigned(_)
| BlockMessage::BlockCommitted(_)
| BlockMessage::BlockSyncUpdate(_) => None,
};
if let Some(block_vc_index) = block_vc_index {
if block_vc_index < current_view_change_index {
// ignore block_message
continue;
}
| BlockMessage::BlockSyncUpdate(_) => {}
}
return Some(block_msg);
}
Expand Down Expand Up @@ -538,6 +540,13 @@ impl Sumeragi {
}
}
(BlockMessage::BlockCreated(block_created), Role::ValidatingPeer) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%block_created.block.hash(),
"Block received"
);

let topology = &self
.topology
.is_consensus_required()
Expand All @@ -555,10 +564,23 @@ impl Sumeragi {
let msg = BlockSigned::from(&v_block.block);
self.broadcast_packet_to(msg, [topology.proxy_tail()]);

info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%v_block.block.as_ref().hash(),
"Voted for the block"
);
*voting_block = Some(v_block);
}
}
(BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%block_created.block.hash(),
"Block received"
);

let topology = &self
.topology
.is_consensus_required()
Expand All @@ -581,7 +603,7 @@ impl Sumeragi {
peer_id=%self.peer_id,
role=%self.role(),
block=%v_block.block.as_ref().hash(),
"Block signed and forwarded"
"Voted for the block"
);
}

Expand Down Expand Up @@ -619,6 +641,7 @@ impl Sumeragi {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%hash,
"Received block signatures"
);

Expand Down Expand Up @@ -658,6 +681,7 @@ impl Sumeragi {
?actual_hash,
"Block hash mismatch"
);
*voting_block = Some(voted_block);
} else if let Err(err) =
voted_block.block.add_signature(signature, &self.topology)
{
Expand All @@ -667,6 +691,7 @@ impl Sumeragi {
?err,
"Signature not valid"
);
*voting_block = Some(voted_block);
} else {
*voting_block =
self.try_commit_block(voted_block, is_genesis_peer);
Expand Down Expand Up @@ -698,6 +723,12 @@ impl Sumeragi {
BlockMessage::BlockCommitted(BlockCommitted { hash, signatures }),
Role::Leader | Role::ValidatingPeer | Role::ObservingPeer,
) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%hash,
"Received block committed",
);
if let Some(mut voted_block) = voting_block.take() {
let actual_hash = voted_block.block.as_ref().hash();

Expand Down Expand Up @@ -799,7 +830,10 @@ impl Sumeragi {
.world
.parameters()
.sumeragi
.pipeline_time();
.pipeline_time(
self.topology.view_change_index(),
self.topology.max_faults() + 1,
);

std::thread::sleep(pipeline_time * 2);
} else {
Expand Down Expand Up @@ -1000,7 +1034,10 @@ pub(crate) fn run(
let mut should_sleep = false;
let mut view_change_proof_chain = ProofChain::default();
// Duration after which a view change is suggested
let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time();
let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time(
sumeragi.topology.view_change_index(),
sumeragi.topology.max_faults() + 1,
);
// Instant when the previous view change or round happened.
let mut last_view_change_time = Instant::now();

Expand Down Expand Up @@ -1049,7 +1086,12 @@ pub(crate) fn run(

reset_state(
&sumeragi.peer_id,
state.world.view().parameters().sumeragi.pipeline_time(),
state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1),
view_change_index,
&mut sumeragi.was_commit,
&mut sumeragi.topology,
Expand Down Expand Up @@ -1154,12 +1196,22 @@ pub(crate) fn run(

// NOTE: View change must be periodically suggested until it is accepted.
// Must be initialized to pipeline time but can increase by chosen amount
view_change_time += state.world.view().parameters().sumeragi.pipeline_time();
view_change_time += state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1);
}

reset_state(
&sumeragi.peer_id,
state.world.view().parameters().sumeragi.pipeline_time(),
state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1),
view_change_index,
&mut sumeragi.was_commit,
&mut sumeragi.topology,
Expand Down
15 changes: 14 additions & 1 deletion crates/iroha_core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct SumeragiHandle {
impl SumeragiHandle {
/// Deposit a sumeragi control flow network message.
pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) {
trace!(ty = "ViewChangeProofChain", "Incoming message");
if let Err(error) = self.control_message_sender.try_send(msg) {
self.dropped_messages_metric.inc();

Expand All @@ -58,7 +59,19 @@ impl SumeragiHandle {

/// Deposit a sumeragi network message.
pub fn incoming_block_message(&self, msg: impl Into<BlockMessage>) {
if let Err(error) = self.message_sender.try_send(msg.into()) {
let msg = msg.into();
let (ty, block) = match &msg {
BlockMessage::BlockCommitted(BlockCommitted { hash, .. }) => ("BlockCommitted", *hash),
BlockMessage::BlockCreated(BlockCreated { block }) => ("BlockCreated", block.hash()),
BlockMessage::BlockSigned(BlockSigned { hash, .. }) => ("BlockSigned", *hash),
BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => {
trace!(ty="BlockSyncUpdate", block=%block.hash(), "Incoming message");
("BlockSyncUpdate", block.hash())
}
};
trace!(ty, %block, "Incoming message");

if let Err(error) = self.message_sender.try_send(msg) {
self.dropped_messages_metric.inc();

error!(
Expand Down
11 changes: 9 additions & 2 deletions crates/iroha_data_model/src/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,15 @@ impl SumeragiParameters {

/// Maximal amount of time it takes to commit a block
#[cfg(feature = "transparent_api")]
pub fn pipeline_time(&self) -> Duration {
self.block_time() + self.commit_time()
pub fn pipeline_time(&self, view_change_index: usize, shift: usize) -> Duration {
let shifted_view_change_index = view_change_index.saturating_sub(shift);
self.block_time().saturating_add(
self.commit_time().saturating_mul(
(shifted_view_change_index + 1)
.try_into()
.unwrap_or(u32::MAX),
),
)
}
}

Expand Down
Loading