Skip to content

Commit

Permalink
feat(sumeragi): dynamic commit time based on view change index
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara authored and mversic committed Oct 8, 2024
1 parent a37d115 commit 220ef08
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 19 deletions.
2 changes: 1 addition & 1 deletion crates/iroha/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use iroha_test_samples::{gen_account_in, load_sample_wasm, ALICE_ID};
pub fn pipeline_time() -> Duration {
let default_parameters = SumeragiParameters::default();

default_parameters.pipeline_time()
default_parameters.pipeline_time(0, 0)
}

fn curr_time() -> core::time::Duration {
Expand Down
82 changes: 67 additions & 15 deletions crates/iroha_core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,23 @@ impl Sumeragi {
})
.ok()?;

let block_vc_index = match &block_msg {
match &block_msg {
BlockMessage::BlockCreated(bc) => {
Some(bc.block.header().view_change_index as usize)
if (bc.block.header().view_change_index as usize) < current_view_change_index {
trace!(
ty="BlockCreated",
block=%bc.block.hash(),
"Discarding message due to outdated view change index",
);
// ignore block_message
continue;
}
}
// Signed and Committed contain no block.
// Block sync updates are exempt from early pruning.
BlockMessage::BlockSigned(_)
| BlockMessage::BlockCommitted(_)
| BlockMessage::BlockSyncUpdate(_) => None,
};
if let Some(block_vc_index) = block_vc_index {
if block_vc_index < current_view_change_index {
// ignore block_message
continue;
}
| BlockMessage::BlockSyncUpdate(_) => {}
}
return Some(block_msg);
}
Expand Down Expand Up @@ -538,6 +540,13 @@ impl Sumeragi {
}
}
(BlockMessage::BlockCreated(block_created), Role::ValidatingPeer) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%block_created.block.hash(),
"Block received"
);

let topology = &self
.topology
.is_consensus_required()
Expand All @@ -555,10 +564,23 @@ impl Sumeragi {
let msg = BlockSigned::from(&v_block.block);
self.broadcast_packet_to(msg, [topology.proxy_tail()]);

info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%v_block.block.as_ref().hash(),
"Voted for the block"
);
*voting_block = Some(v_block);
}
}
(BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%block_created.block.hash(),
"Block received"
);

let topology = &self
.topology
.is_consensus_required()
Expand All @@ -581,7 +603,7 @@ impl Sumeragi {
peer_id=%self.peer_id,
role=%self.role(),
block=%v_block.block.as_ref().hash(),
"Block signed and forwarded"
"Voted for the block"
);
}

Expand Down Expand Up @@ -619,6 +641,7 @@ impl Sumeragi {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%hash,
"Received block signatures"
);

Expand Down Expand Up @@ -658,6 +681,7 @@ impl Sumeragi {
?actual_hash,
"Block hash mismatch"
);
*voting_block = Some(voted_block);
} else if let Err(err) =
voted_block.block.add_signature(signature, &self.topology)
{
Expand All @@ -667,6 +691,7 @@ impl Sumeragi {
?err,
"Signature not valid"
);
*voting_block = Some(voted_block);
} else {
*voting_block =
self.try_commit_block(voted_block, is_genesis_peer);
Expand Down Expand Up @@ -698,6 +723,12 @@ impl Sumeragi {
BlockMessage::BlockCommitted(BlockCommitted { hash, signatures }),
Role::Leader | Role::ValidatingPeer | Role::ObservingPeer,
) => {
info!(
peer_id=%self.peer_id,
role=%self.role(),
block=%hash,
"Received block committed",
);
if let Some(mut voted_block) = voting_block.take() {
let actual_hash = voted_block.block.as_ref().hash();

Expand Down Expand Up @@ -799,7 +830,10 @@ impl Sumeragi {
.world
.parameters()
.sumeragi
.pipeline_time();
.pipeline_time(
self.topology.view_change_index(),
self.topology.max_faults() + 1,
);

std::thread::sleep(pipeline_time * 2);
} else {
Expand Down Expand Up @@ -1000,7 +1034,10 @@ pub(crate) fn run(
let mut should_sleep = false;
let mut view_change_proof_chain = ProofChain::default();
// Duration after which a view change is suggested
let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time();
let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time(
sumeragi.topology.view_change_index(),
sumeragi.topology.max_faults() + 1,
);
// Instant when the previous view change or round happened.
let mut last_view_change_time = Instant::now();

Expand Down Expand Up @@ -1049,7 +1086,12 @@ pub(crate) fn run(

reset_state(
&sumeragi.peer_id,
state.world.view().parameters().sumeragi.pipeline_time(),
state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1),
view_change_index,
&mut sumeragi.was_commit,
&mut sumeragi.topology,
Expand Down Expand Up @@ -1154,12 +1196,22 @@ pub(crate) fn run(

// NOTE: View change must be periodically suggested until it is accepted.
// Must be initialized to pipeline time but can increase by chosen amount
view_change_time += state.world.view().parameters().sumeragi.pipeline_time();
view_change_time += state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1);
}

reset_state(
&sumeragi.peer_id,
state.world.view().parameters().sumeragi.pipeline_time(),
state
.world
.view()
.parameters()
.sumeragi
.pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1),
view_change_index,
&mut sumeragi.was_commit,
&mut sumeragi.topology,
Expand Down
15 changes: 14 additions & 1 deletion crates/iroha_core/src/sumeragi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct SumeragiHandle {
impl SumeragiHandle {
/// Deposit a sumeragi control flow network message.
pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) {
trace!(ty = "ViewChangeProofChain", "Incoming message");
if let Err(error) = self.control_message_sender.try_send(msg) {
self.dropped_messages_metric.inc();

Expand All @@ -58,7 +59,19 @@ impl SumeragiHandle {

/// Deposit a sumeragi network message.
pub fn incoming_block_message(&self, msg: impl Into<BlockMessage>) {
if let Err(error) = self.message_sender.try_send(msg.into()) {
let msg = msg.into();
let (ty, block) = match &msg {
BlockMessage::BlockCommitted(BlockCommitted { hash, .. }) => ("BlockCommitted", *hash),
BlockMessage::BlockCreated(BlockCreated { block }) => ("BlockCreated", block.hash()),
BlockMessage::BlockSigned(BlockSigned { hash, .. }) => ("BlockSigned", *hash),
BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => {
trace!(ty="BlockSyncUpdate", block=%block.hash(), "Incoming message");
("BlockSyncUpdate", block.hash())
}
};
trace!(ty, %block, "Incoming message");

if let Err(error) = self.message_sender.try_send(msg) {
self.dropped_messages_metric.inc();

error!(
Expand Down
11 changes: 9 additions & 2 deletions crates/iroha_data_model/src/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,15 @@ impl SumeragiParameters {

/// Maximal amount of time it takes to commit a block
#[cfg(feature = "transparent_api")]
pub fn pipeline_time(&self) -> Duration {
self.block_time() + self.commit_time()
pub fn pipeline_time(&self, view_change_index: usize, shift: usize) -> Duration {
let shifted_view_change_index = view_change_index.saturating_sub(shift);
self.block_time().saturating_add(
self.commit_time().saturating_mul(
(shifted_view_change_index + 1)
.try_into()
.unwrap_or(u32::MAX),
),
)
}
}

Expand Down

0 comments on commit 220ef08

Please sign in to comment.