From 220ef088924cf4af4d3cc75ab57cbd89a793edb8 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Mon, 5 Aug 2024 18:35:00 +0700 Subject: [PATCH] feat(sumeragi): dynamic commit time based on view change index Signed-off-by: Shanin Roman --- .../integration/triggers/time_trigger.rs | 2 +- crates/iroha_core/src/sumeragi/main_loop.rs | 82 +++++++++++++++---- crates/iroha_core/src/sumeragi/mod.rs | 15 +++- crates/iroha_data_model/src/parameter.rs | 11 ++- 4 files changed, 91 insertions(+), 19 deletions(-) diff --git a/crates/iroha/tests/integration/triggers/time_trigger.rs b/crates/iroha/tests/integration/triggers/time_trigger.rs index bc64181297..c70778f176 100644 --- a/crates/iroha/tests/integration/triggers/time_trigger.rs +++ b/crates/iroha/tests/integration/triggers/time_trigger.rs @@ -18,7 +18,7 @@ use iroha_test_samples::{gen_account_in, load_sample_wasm, ALICE_ID}; pub fn pipeline_time() -> Duration { let default_parameters = SumeragiParameters::default(); - default_parameters.pipeline_time() + default_parameters.pipeline_time(0, 0) } fn curr_time() -> core::time::Duration { diff --git a/crates/iroha_core/src/sumeragi/main_loop.rs b/crates/iroha_core/src/sumeragi/main_loop.rs index 2dfb4341c6..d77bd70eeb 100644 --- a/crates/iroha_core/src/sumeragi/main_loop.rs +++ b/crates/iroha_core/src/sumeragi/main_loop.rs @@ -178,21 +178,23 @@ impl Sumeragi { }) .ok()?; - let block_vc_index = match &block_msg { + match &block_msg { BlockMessage::BlockCreated(bc) => { - Some(bc.block.header().view_change_index as usize) + if (bc.block.header().view_change_index as usize) < current_view_change_index { + trace!( + ty="BlockCreated", + block=%bc.block.hash(), + "Discarding message due to outdated view change index", + ); + // ignore block_message + continue; + } } // Signed and Committed contain no block. // Block sync updates are exempt from early pruning. BlockMessage::BlockSigned(_) | BlockMessage::BlockCommitted(_) - | BlockMessage::BlockSyncUpdate(_) => None, - }; - if let Some(block_vc_index) = block_vc_index { - if block_vc_index < current_view_change_index { - // ignore block_message - continue; - } + | BlockMessage::BlockSyncUpdate(_) => {} } return Some(block_msg); } @@ -538,6 +540,13 @@ impl Sumeragi { } } (BlockMessage::BlockCreated(block_created), Role::ValidatingPeer) => { + info!( + peer_id=%self.peer_id, + role=%self.role(), + block=%block_created.block.hash(), + "Block received" + ); + let topology = &self .topology .is_consensus_required() @@ -555,10 +564,23 @@ impl Sumeragi { let msg = BlockSigned::from(&v_block.block); self.broadcast_packet_to(msg, [topology.proxy_tail()]); + info!( + peer_id=%self.peer_id, + role=%self.role(), + block=%v_block.block.as_ref().hash(), + "Voted for the block" + ); *voting_block = Some(v_block); } } (BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => { + info!( + peer_id=%self.peer_id, + role=%self.role(), + block=%block_created.block.hash(), + "Block received" + ); + let topology = &self .topology .is_consensus_required() @@ -581,7 +603,7 @@ impl Sumeragi { peer_id=%self.peer_id, role=%self.role(), block=%v_block.block.as_ref().hash(), - "Block signed and forwarded" + "Voted for the block" ); } @@ -619,6 +641,7 @@ impl Sumeragi { info!( peer_id=%self.peer_id, role=%self.role(), + block=%hash, "Received block signatures" ); @@ -658,6 +681,7 @@ impl Sumeragi { ?actual_hash, "Block hash mismatch" ); + *voting_block = Some(voted_block); } else if let Err(err) = voted_block.block.add_signature(signature, &self.topology) { @@ -667,6 +691,7 @@ impl Sumeragi { ?err, "Signature not valid" ); + *voting_block = Some(voted_block); } else { *voting_block = self.try_commit_block(voted_block, is_genesis_peer); @@ -698,6 +723,12 @@ impl Sumeragi { BlockMessage::BlockCommitted(BlockCommitted { hash, signatures }), Role::Leader | Role::ValidatingPeer | Role::ObservingPeer, ) => { + info!( + peer_id=%self.peer_id, + role=%self.role(), + block=%hash, + "Received block committed", + ); if let Some(mut voted_block) = voting_block.take() { let actual_hash = voted_block.block.as_ref().hash(); @@ -799,7 +830,10 @@ impl Sumeragi { .world .parameters() .sumeragi - .pipeline_time(); + .pipeline_time( + self.topology.view_change_index(), + self.topology.max_faults() + 1, + ); std::thread::sleep(pipeline_time * 2); } else { @@ -1000,7 +1034,10 @@ pub(crate) fn run( let mut should_sleep = false; let mut view_change_proof_chain = ProofChain::default(); // Duration after which a view change is suggested - let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time(); + let mut view_change_time = state.world.view().parameters().sumeragi.pipeline_time( + sumeragi.topology.view_change_index(), + sumeragi.topology.max_faults() + 1, + ); // Instant when the previous view change or round happened. let mut last_view_change_time = Instant::now(); @@ -1049,7 +1086,12 @@ pub(crate) fn run( reset_state( &sumeragi.peer_id, - state.world.view().parameters().sumeragi.pipeline_time(), + state + .world + .view() + .parameters() + .sumeragi + .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1), view_change_index, &mut sumeragi.was_commit, &mut sumeragi.topology, @@ -1154,12 +1196,22 @@ pub(crate) fn run( // NOTE: View change must be periodically suggested until it is accepted. // Must be initialized to pipeline time but can increase by chosen amount - view_change_time += state.world.view().parameters().sumeragi.pipeline_time(); + view_change_time += state + .world + .view() + .parameters() + .sumeragi + .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1); } reset_state( &sumeragi.peer_id, - state.world.view().parameters().sumeragi.pipeline_time(), + state + .world + .view() + .parameters() + .sumeragi + .pipeline_time(view_change_index, sumeragi.topology.max_faults() + 1), view_change_index, &mut sumeragi.was_commit, &mut sumeragi.topology, diff --git a/crates/iroha_core/src/sumeragi/mod.rs b/crates/iroha_core/src/sumeragi/mod.rs index 25797b20d8..568ba67da4 100644 --- a/crates/iroha_core/src/sumeragi/mod.rs +++ b/crates/iroha_core/src/sumeragi/mod.rs @@ -44,6 +44,7 @@ pub struct SumeragiHandle { impl SumeragiHandle { /// Deposit a sumeragi control flow network message. pub fn incoming_control_flow_message(&self, msg: ControlFlowMessage) { + trace!(ty = "ViewChangeProofChain", "Incoming message"); if let Err(error) = self.control_message_sender.try_send(msg) { self.dropped_messages_metric.inc(); @@ -58,7 +59,19 @@ impl SumeragiHandle { /// Deposit a sumeragi network message. pub fn incoming_block_message(&self, msg: impl Into) { - if let Err(error) = self.message_sender.try_send(msg.into()) { + let msg = msg.into(); + let (ty, block) = match &msg { + BlockMessage::BlockCommitted(BlockCommitted { hash, .. }) => ("BlockCommitted", *hash), + BlockMessage::BlockCreated(BlockCreated { block }) => ("BlockCreated", block.hash()), + BlockMessage::BlockSigned(BlockSigned { hash, .. }) => ("BlockSigned", *hash), + BlockMessage::BlockSyncUpdate(BlockSyncUpdate { block }) => { + trace!(ty="BlockSyncUpdate", block=%block.hash(), "Incoming message"); + ("BlockSyncUpdate", block.hash()) + } + }; + trace!(ty, %block, "Incoming message"); + + if let Err(error) = self.message_sender.try_send(msg) { self.dropped_messages_metric.inc(); error!( diff --git a/crates/iroha_data_model/src/parameter.rs b/crates/iroha_data_model/src/parameter.rs index 42c37f9fa2..320bb86ae6 100644 --- a/crates/iroha_data_model/src/parameter.rs +++ b/crates/iroha_data_model/src/parameter.rs @@ -347,8 +347,15 @@ impl SumeragiParameters { /// Maximal amount of time it takes to commit a block #[cfg(feature = "transparent_api")] - pub fn pipeline_time(&self) -> Duration { - self.block_time() + self.commit_time() + pub fn pipeline_time(&self, view_change_index: usize, shift: usize) -> Duration { + let shifted_view_change_index = view_change_index.saturating_sub(shift); + self.block_time().saturating_add( + self.commit_time().saturating_mul( + (shifted_view_change_index + 1) + .try_into() + .unwrap_or(u32::MAX), + ), + ) } }