From 68f0b2db0e92a62d2d9f22a960b00b3bce867480 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Thu, 4 May 2023 11:37:32 +0300 Subject: [PATCH] [refactor]: Add `SumeragiHandle` Signed-off-by: Shanin Roman --- cli/src/lib.rs | 41 ++-- cli/src/torii/mod.rs | 4 +- cli/src/torii/routing.rs | 53 +++--- client/benches/tps/utils.rs | 3 +- core/src/block_sync.rs | 18 +- core/src/gossiper.rs | 18 +- core/src/lib.rs | 4 +- core/src/sumeragi/main_loop.rs | 334 +++++++++++++-------------------- core/src/sumeragi/mod.rs | 221 +++++++++++----------- telemetry/src/metrics.rs | 2 +- 10 files changed, 312 insertions(+), 386 deletions(-) diff --git a/cli/src/lib.rs b/cli/src/lib.rs index cfd60eb2b40..280c7a1bf2b 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -26,7 +26,7 @@ use iroha_core::{ prelude::{World, WorldStateView}, queue::Queue, smartcontracts::isi::Registrable as _, - sumeragi::Sumeragi, + sumeragi::SumeragiHandle, tx::{PeerId, TransactionValidator}, IrohaNetwork, }; @@ -91,7 +91,7 @@ pub struct Iroha { /// Queue of transactions pub queue: Arc, /// Sumeragi consensus - pub sumeragi: Arc, + pub sumeragi: SumeragiHandle, /// Kura — block storage pub kura: Arc, /// Torii web server @@ -115,7 +115,7 @@ impl Drop for Iroha { } struct NetworkRelay { - sumeragi: Arc, + sumeragi: SumeragiHandle, block_sync: BlockSynchronizerHandle, gossiper: TransactionGossiperHandle, network: IrohaNetwork, @@ -271,25 +271,22 @@ impl Iroha { let kura_thread_handler = Kura::start(Arc::clone(&kura)); - let sumeragi = Arc::new( - // TODO: No function needs 10 parameters. It should accept one struct. - Sumeragi::new( - &config.sumeragi, - events_sender.clone(), - wsv, - transaction_validator, - Arc::clone(&queue), - Arc::clone(&kura), - network.clone(), - ), + // TODO: No function needs 10 parameters. It should accept one struct. 
+ let sumeragi = SumeragiHandle::start( + &config.sumeragi, + events_sender.clone(), + wsv, + transaction_validator, + Arc::clone(&queue), + Arc::clone(&kura), + network.clone(), + genesis, + &block_hashes, ); - let sumeragi_thread_handler = - Sumeragi::initialize_and_start_thread(Arc::clone(&sumeragi), genesis, &block_hashes); - let block_sync = BlockSynchronizer::from_configuration( &config.block_sync, - Arc::clone(&sumeragi), + sumeragi.clone(), Arc::clone(&kura), PeerId::new(&config.torii.p2p_addr, &config.public_key), network.clone(), @@ -300,14 +297,14 @@ impl Iroha { &config.sumeragi, network.clone(), Arc::clone(&queue), - Arc::clone(&sumeragi), + sumeragi.clone(), ) .start(); let freeze_status = Arc::new(AtomicBool::new(false)); NetworkRelay { - sumeragi: Arc::clone(&sumeragi), + sumeragi: sumeragi.clone(), block_sync, gossiper, network: network.clone(), @@ -322,7 +319,7 @@ impl Iroha { Arc::clone(&queue), events_sender, Arc::clone(¬ify_shutdown), - Arc::clone(&sumeragi), + sumeragi.clone(), Arc::clone(&kura), ); @@ -336,7 +333,7 @@ impl Iroha { sumeragi, kura, torii, - thread_handlers: vec![sumeragi_thread_handler, kura_thread_handler], + thread_handlers: vec![kura_thread_handler], #[cfg(debug_assertions)] freeze_status, }) diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs index 777a6aa59f3..dd4089550de 100644 --- a/cli/src/torii/mod.rs +++ b/cli/src/torii/mod.rs @@ -14,7 +14,7 @@ use iroha_core::{ kura::Kura, prelude::*, queue::{self, Queue}, - sumeragi::Sumeragi, + sumeragi::SumeragiHandle, EventsSender, }; use thiserror::Error; @@ -38,7 +38,7 @@ pub struct Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - sumeragi: Arc, + sumeragi: SumeragiHandle, kura: Arc, } diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 89c75260eef..b16a4906758 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -16,7 +16,7 @@ use iroha_config::{ torii::uri, GetConfiguration, PostConfiguration, }; -use 
iroha_core::smartcontracts::isi::query::ValidQueryRequest; +use iroha_core::{smartcontracts::isi::query::ValidQueryRequest, sumeragi::SumeragiHandle}; use iroha_crypto::SignatureOf; use iroha_data_model::{ block::{ @@ -103,17 +103,15 @@ impl TryFrom for VerifiedQuery { pub(crate) async fn handle_instructions( iroha_cfg: Configuration, queue: Arc, - sumeragi: Arc, + sumeragi: SumeragiHandle, transaction: VersionedSignedTransaction, ) -> Result { let transaction: SignedTransaction = transaction.into_v1(); - let transaction = - AcceptedTransaction::accept::(transaction, &iroha_cfg.sumeragi.transaction_limits) - .map_err(Error::AcceptTransaction)? - .into(); - #[allow(clippy::map_err_ignore)] - queue - .push(transaction, &sumeragi.wsv_mutex_access()) + let transaction = AcceptedTransaction::accept::(transaction, &iroha_cfg.sumeragi.transaction_limits) + .map_err(Error::AcceptTransaction)? + .into(); + sumeragi + .wsv(|wsv| queue.push(transaction, wsv)) .map_err(|queue::Failure { tx, err }| { iroha_logger::warn!( tx_hash=%tx.hash(), ?err, @@ -128,7 +126,7 @@ pub(crate) async fn handle_instructions( #[iroha_futures::telemetry_future] pub(crate) async fn handle_queries( - sumeragi: Arc, + sumeragi: SumeragiHandle, pagination: Pagination, sorting: Sorting, request: VersionedSignedQuery, @@ -137,7 +135,7 @@ pub(crate) async fn handle_queries( let request: VerifiedQuery = request.try_into()?; let (result, filter) = { - let wsv = sumeragi.wsv_mutex_access().clone(); + let wsv = sumeragi.wsv(Clone::clone); let (valid_request, filter) = request.validate(&wsv)?; let original_result = valid_request.execute(&wsv)?; (filter.filter(original_result), filter) @@ -226,12 +224,12 @@ async fn handle_schema() -> Json { #[iroha_futures::telemetry_future] async fn handle_pending_transactions( queue: Arc, - sumeragi: Arc, + sumeragi: SumeragiHandle, pagination: Pagination, ) -> Result> { Ok(Scale( - queue - .all_transactions(&sumeragi.wsv_mutex_access()) + sumeragi + .wsv(|wsv| 
queue.all_transactions(wsv)) .into_iter() .map(VersionedAcceptedTransaction::into_v1) .map(SignedTransaction::from) @@ -412,13 +410,12 @@ mod subscription { #[iroha_futures::telemetry_future] #[cfg(feature = "telemetry")] -async fn handle_version(sumeragi: Arc) -> Json { +async fn handle_version(sumeragi: SumeragiHandle) -> Json { use iroha_version::Version; #[allow(clippy::expect_used)] let string = sumeragi - .wsv_mutex_access() - .latest_block_ref() + .wsv(WorldStateView::latest_block_ref) .expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.") .version() .to_string(); @@ -426,7 +423,7 @@ async fn handle_version(sumeragi: Arc) -> Json { } #[cfg(feature = "telemetry")] -fn handle_metrics(sumeragi: &Sumeragi) -> Result { +fn handle_metrics(sumeragi: &SumeragiHandle) -> Result { if let Err(error) = sumeragi.update_metrics() { iroha_logger::error!(%error, "Error while calling sumeragi::update_metrics."); } @@ -438,7 +435,7 @@ fn handle_metrics(sumeragi: &Sumeragi) -> Result { #[cfg(feature = "telemetry")] #[allow(clippy::unnecessary_wraps)] -fn handle_status(sumeragi: &Sumeragi) -> Result { +fn handle_status(sumeragi: &SumeragiHandle) -> Result { if let Err(error) = sumeragi.update_metrics() { iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`."); } @@ -448,7 +445,7 @@ fn handle_status(sumeragi: &Sumeragi) -> Result { #[cfg(feature = "telemetry")] #[allow(clippy::unused_async)] -async fn handle_status_precise(sumeragi: Arc, segment: String) -> Result { +async fn handle_status_precise(sumeragi: SumeragiHandle, segment: String) -> Result { if let Err(error) = sumeragi.update_metrics() { iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`."); } @@ -477,7 +474,7 @@ impl Torii { queue: Arc, events: EventsSender, notify_shutdown: Arc, - sumeragi: Arc, + sumeragi: SumeragiHandle, kura: Arc, ) -> Self { Self { @@ -500,25 +497,23 @@ impl Torii { let get_router_status_precise = endpoint2( 
handle_status_precise, status_path - .and(add_state!(self.sumeragi)) + .and(add_state!(self.sumeragi.clone())) .and(warp::path::param()), ); let get_router_status_bare = status_path - .and(add_state!(self.sumeragi)) - .and_then(|sumeragi: Arc<_>| async move { + .and(add_state!(self.sumeragi.clone())) + .and_then(|sumeragi| async move { Ok::<_, Infallible>(WarpResult(handle_status(&sumeragi))) }); let get_router_metrics = warp::path(uri::METRICS) .and(add_state!(self.sumeragi)) - .and_then(|sumeragi: Arc<_>| async move { + .and_then(|sumeragi| async move { Ok::<_, Infallible>(WarpResult(handle_metrics(&sumeragi))) }); let get_api_version = warp::path(uri::API_VERSION) - .and(add_state!(self.sumeragi)) - .and_then(|sumeragi: Arc<_>| async { - Ok::<_, Infallible>(handle_version(sumeragi).await) - }); + .and(add_state!(self.sumeragi.clone())) + .and_then(|sumeragi| async { Ok::<_, Infallible>(handle_version(sumeragi).await) }); warp::get() .and(get_router_status_precise.or(get_router_status_bare)) diff --git a/client/benches/tps/utils.rs b/client/benches/tps/utils.rs index 98994235951..4c196634368 100644 --- a/client/benches/tps/utils.rs +++ b/client/benches/tps/utils.rs @@ -115,8 +115,7 @@ impl Config { .as_ref() .expect("Must be some") .sumeragi - .wsv_mutex_access() - .clone(); + .wsv(Clone::clone); let mut blocks = blocks_wsv .all_blocks_by_value() .into_iter() diff --git a/core/src/block_sync.rs b/core/src/block_sync.rs index 10f8e0c43a1..c913eedc586 100644 --- a/core/src/block_sync.rs +++ b/core/src/block_sync.rs @@ -16,7 +16,7 @@ use iroha_version::prelude::*; use parity_scale_codec::{Decode, Encode}; use tokio::sync::mpsc; -use crate::{kura::Kura, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage}; +use crate::{kura::Kura, sumeragi::SumeragiHandle, IrohaNetwork, NetworkMessage}; /// [`BlockSynchronizer`] actor handle. #[derive(Clone)] @@ -37,9 +37,8 @@ impl BlockSynchronizerHandle { } /// Structure responsible for block synchronization between peers. 
-#[derive(Debug)] pub struct BlockSynchronizer { - sumeragi: Arc, + sumeragi: SumeragiHandle, kura: Arc, peer_id: PeerId, gossip_period: Duration, @@ -91,10 +90,9 @@ impl BlockSynchronizer { /// Sends request for latest blocks to a chosen peer async fn request_latest_blocks_from_peer(&mut self, peer_id: PeerId) { - let (latest_hash, previous_hash) = { - let wsv = self.sumeragi.wsv_mutex_access(); - (wsv.latest_block_hash(), wsv.previous_block_hash()) - }; + let (latest_hash, previous_hash) = self + .sumeragi + .wsv(|wsv| (wsv.latest_block_hash(), wsv.previous_block_hash())); message::Message::GetBlocksAfter(message::GetBlocksAfter::new( latest_hash, previous_hash, @@ -107,7 +105,7 @@ impl BlockSynchronizer { /// Create [`Self`] from [`Configuration`] pub fn from_configuration( config: &Configuration, - sumeragi: Arc, + sumeragi: SumeragiHandle, kura: Arc, peer_id: PeerId, network: IrohaNetwork, @@ -126,7 +124,7 @@ impl BlockSynchronizer { pub mod message { //! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer). 
use super::*; - use crate::sumeragi::view_change::ProofChain; + use crate::{sumeragi::view_change::ProofChain, wsv::WorldStateView}; declare_versioned_with_scale!(VersionedMessage 1..2, Debug, Clone, iroha_macro::FromVariant); @@ -220,7 +218,7 @@ pub mod message { return; } let local_latest_block_hash = - block_sync.sumeragi.wsv_mutex_access().latest_block_hash(); + block_sync.sumeragi.wsv(WorldStateView::latest_block_hash); if *latest_hash == local_latest_block_hash || *previous_hash == local_latest_block_hash { diff --git a/core/src/gossiper.rs b/core/src/gossiper.rs index 0e20eed2161..d894f692448 100644 --- a/core/src/gossiper.rs +++ b/core/src/gossiper.rs @@ -11,7 +11,7 @@ use iroha_p2p::Broadcast; use parity_scale_codec::{Decode, Encode}; use tokio::sync::mpsc; -use crate::{queue::Queue, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage}; +use crate::{queue::Queue, sumeragi::SumeragiHandle, IrohaNetwork, NetworkMessage}; /// [`Gossiper`] actor handle. #[derive(Clone)] @@ -42,7 +42,7 @@ pub struct TransactionGossiper { /// [`iroha_p2p::Network`] actor handle network: IrohaNetwork, /// Sumearagi - sumeragi: Arc, + sumeragi: SumeragiHandle, /// Limits that all transactions need to obey, in terms of size /// of WASM blob and number of instructions. 
transaction_limits: TransactionLimits, @@ -62,7 +62,7 @@ impl TransactionGossiper { configuartion: &Configuration, network: IrohaNetwork, queue: Arc, - sumeragi: Arc, + sumeragi: SumeragiHandle, ) -> Self { Self { queue, @@ -93,9 +93,10 @@ impl TransactionGossiper { } fn gossip_transactions(&self) { - let txs = self - .queue - .n_random_transactions(self.gossip_batch_size, &self.sumeragi.wsv_mutex_access()); + let txs = self.sumeragi.wsv(|wsv| { + self.queue + .n_random_transactions(self.gossip_batch_size, wsv) + }); if txs.is_empty() { iroha_logger::debug!("Nothing to gossip"); @@ -112,10 +113,7 @@ impl TransactionGossiper { iroha_logger::trace!(size = txs.len(), "Received new transaction gossip"); for tx in txs { match AcceptedTransaction::accept::(tx.into_v1(), &self.transaction_limits) { - Ok(tx) => match self - .queue - .push(tx.into(), &self.sumeragi.wsv_mutex_access()) - { + Ok(tx) => match self.sumeragi.wsv(|wsv| self.queue.push(tx.into(), wsv)) { Ok(_) => {} Err(crate::queue::Failure { tx, diff --git a/core/src/lib.rs b/core/src/lib.rs index 5311eee91b6..51f90412d93 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -80,7 +80,7 @@ pub mod handler { /// Call shutdown function and join thread on drop pub struct ThreadHandler { /// Shutdown function: after calling it, the thread must terminate in finite amount of time - shutdown: Option>, + shutdown: Option>, handle: Option>, } @@ -88,7 +88,7 @@ pub mod handler { /// [`Self`] constructor #[must_use] #[inline] - pub fn new(shutdown: Box, handle: JoinHandle<()>) -> Self { + pub fn new(shutdown: Box, handle: JoinHandle<()>) -> Self { Self { shutdown: Some(shutdown), handle: Some(handle), diff --git a/core/src/sumeragi/main_loop.rs b/core/src/sumeragi/main_loop.rs index 57d3da39e0e..9534ae94d4a 100644 --- a/core/src/sumeragi/main_loop.rs +++ b/core/src/sumeragi/main_loop.rs @@ -31,7 +31,7 @@ pub struct Sumeragi { /// An actor that sends events pub events_sender: EventsSender, /// The world state view instance 
that is used in public contexts - pub wsv: Mutex, + pub public_wsv: Arc>, /// Time by which a newly created block should be committed. Prevents malicious nodes /// from stalling the network by not participating in consensus pub commit_time: Duration, @@ -47,24 +47,10 @@ pub struct Sumeragi { /// [`iroha_p2p::Network`] actor address pub network: IrohaNetwork, /// Receiver channel. - // TODO: Mutex shouldn't be required and must be removed - pub message_receiver: Mutex>, + pub message_receiver: mpsc::Receiver, /// Only used in testing. Causes the genesis peer to withhold blocks when it /// is the proxy tail. pub debug_force_soft_fork: bool, -} - -impl Debug for Sumeragi { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("Sumeragi") - .field("public_key", &self.key_pair.public_key()) - .field("peer_id", &self.peer_id) - .finish() - } -} - -/// Internal structure that retains the state. -pub struct State { /// The view change index of latest [`VersionedCommittedBlock`] pub latest_block_view_change_index: u64, /// The hash of the latest [`VersionedCommittedBlock`] @@ -94,40 +80,16 @@ pub struct State { pub transaction_cache: Vec, } -impl Sumeragi { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - configuration: &Configuration, - queue: Arc, - events_sender: EventsSender, - wsv: WorldStateView, - transaction_validator: TransactionValidator, - kura: Arc, - network: IrohaNetwork, - message_receiver: mpsc::Receiver, - ) -> Self { - #[cfg(debug_assertions)] - let soft_fork = configuration.debug_force_soft_fork; - #[cfg(not(debug_assertions))] - let soft_fork = false; - - Self { - key_pair: configuration.key_pair.clone(), - queue, - peer_id: configuration.peer_id.clone(), - events_sender, - wsv: Mutex::new(wsv), - commit_time: Duration::from_millis(configuration.commit_time_limit_ms), - block_time: Duration::from_millis(configuration.block_time_ms), - transaction_validator, - kura, - network, - message_receiver: 
Mutex::new(message_receiver), - max_txs_in_block: configuration.max_transactions_in_block as usize, - debug_force_soft_fork: soft_fork, - } +impl Debug for Sumeragi { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Sumeragi") + .field("public_key", &self.key_pair.public_key()) + .field("peer_id", &self.peer_id) + .finish() } +} +impl Sumeragi { /// Send a sumeragi packet over the network to the specified `peer`. /// # Errors /// Fails if network sending fails @@ -186,19 +148,15 @@ impl Sumeragi { } #[allow(clippy::panic)] - fn receive_network_packet( - &self, - state: &State, - view_change_proof_chain: &mut ProofChain, - ) -> Option { - let current_topology = &state.current_topology; - match self.message_receiver.lock().try_recv() { + fn receive_network_packet(&self, view_change_proof_chain: &mut ProofChain) -> Option { + let current_topology = &self.current_topology; + match self.message_receiver.try_recv() { Ok(packet) => { if let Err(error) = view_change_proof_chain.merge( packet.view_change_proofs, ¤t_topology.sorted_peers, current_topology.max_faults(), - state.latest_block_hash, + self.latest_block_hash, ) { trace!(%error, "Failed to add proofs into view change proof chain") } @@ -216,13 +174,12 @@ impl Sumeragi { #[allow(clippy::panic, clippy::panic_in_result_fn)] fn init_listen_for_genesis( - &self, - state: &mut State, + &mut self, shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>, ) -> Result<(), EarlyReturn> { trace!("Listen for genesis"); assert!( - state.current_topology.is_consensus_required(), + self.current_topology.is_consensus_required(), "Only peer in network, yet required to receive genesis topology. This is a configuration error." ); loop { @@ -233,7 +190,7 @@ impl Sumeragi { })?; // we must connect to peers so that our block_sync can find us // the genesis block. 
- match self.message_receiver.lock().try_recv() { + match self.message_receiver.try_recv() { Ok(packet) => { let block = match packet.message { Message::BlockCreated(block_created) => { @@ -250,9 +207,9 @@ impl Sumeragi { let _enter = span.enter(); match block_created.validate_and_extract_block::( &self.transaction_validator, - state.wsv.clone(), - state.latest_block_hash, - state.latest_block_height, + self.wsv.clone(), + self.latest_block_hash, + self.latest_block_height, ) { Ok(block) => block, Err(error) => { @@ -268,9 +225,9 @@ impl Sumeragi { // Omit signature verification during genesis round match block_sync_update.validate_and_extract_block::( &self.transaction_validator, - state.wsv.clone(), - state.latest_block_hash, - state.latest_block_height, + self.wsv.clone(), + self.latest_block_hash, + self.latest_block_height, ) { Ok(block) => block, Err(error) => { @@ -286,7 +243,7 @@ impl Sumeragi { }; if block.header().is_genesis() { - commit_block(self, state, block); + commit_block(self, block); return Err(EarlyReturn::GenesisBlockReceivedAndCommitted); } debug!("Received a block that was not genesis."); @@ -298,59 +255,51 @@ impl Sumeragi { } } -fn commit_block(sumeragi: &Sumeragi, state: &mut State, block: impl Into) { +fn commit_block(sumeragi: &mut Sumeragi, block: impl Into) { let committed_block = block.into(); - state.finalized_wsv = state.wsv.clone(); - update_state(state, sumeragi, &committed_block); - state.previous_block_hash = state.latest_block_hash; + sumeragi.finalized_wsv = sumeragi.wsv.clone(); + update_state(sumeragi, &committed_block); + sumeragi.previous_block_hash = sumeragi.latest_block_hash; info!( addr=%sumeragi.peer_id.address, - role=%state.current_topology.role(&sumeragi.peer_id), - block_height=%state.latest_block_height, + role=%sumeragi.current_topology.role(&sumeragi.peer_id), + block_height=%sumeragi.latest_block_height, block_hash=%committed_block.hash(), "Committing block" ); - update_topology(state, sumeragi, 
&committed_block); + update_topology(sumeragi, &committed_block); sumeragi.kura.store_block(committed_block); - cache_transaction(state, sumeragi); + cache_transaction(sumeragi); } -fn replace_top_block( - sumeragi: &Sumeragi, - state: &mut State, - block: impl Into, -) { +fn replace_top_block(sumeragi: &mut Sumeragi, block: impl Into) { let committed_block = block.into(); - state.wsv = state.finalized_wsv.clone(); - update_state(state, sumeragi, &committed_block); + sumeragi.wsv = sumeragi.finalized_wsv.clone(); + update_state(sumeragi, &committed_block); // state.previous_block_hash stays the same. info!( addr=%sumeragi.peer_id.address, - role=%state.current_topology.role(&sumeragi.peer_id), - block_height=%state.latest_block_height, + role=%sumeragi.current_topology.role(&sumeragi.peer_id), + block_height=%sumeragi.latest_block_height, block_hash=%committed_block.hash(), "Replacing top block" ); - update_topology(state, sumeragi, &committed_block); + update_topology(sumeragi, &committed_block); sumeragi.kura.replace_top_block(committed_block); - cache_transaction(state, sumeragi) + cache_transaction(sumeragi) } -fn update_topology( - state: &mut State, - sumeragi: &Sumeragi, - committed_block: &VersionedCommittedBlock, -) { +fn update_topology(sumeragi: &mut Sumeragi, committed_block: &VersionedCommittedBlock) { let mut topology = Topology { sorted_peers: committed_block.header().committed_with_topology.clone(), }; @@ -363,52 +312,51 @@ fn update_topology( ); topology.rotate_set_a(); topology.update_peer_list( - &state + &sumeragi .wsv .peers_ids() .iter() .map(|id| id.clone()) .collect::>(), ); - state.current_topology = topology; - sumeragi.connect_peers(&state.current_topology); + sumeragi.current_topology = topology; + sumeragi.connect_peers(&sumeragi.current_topology); } -fn update_state(state: &mut State, sumeragi: &Sumeragi, committed_block: &VersionedCommittedBlock) { - state +fn update_state(sumeragi: &mut Sumeragi, committed_block: 
&VersionedCommittedBlock) { + sumeragi .wsv .apply(committed_block) .expect("Failed to apply block on WSV. Bailing."); - sumeragi.send_events(state.wsv.events_buffer.replace(Vec::new())); + sumeragi.send_events(sumeragi.wsv.events_buffer.replace(Vec::new())); // Update WSV copy that is public facing - *sumeragi.wsv.lock() = state.wsv.clone(); + *sumeragi.public_wsv.lock() = sumeragi.wsv.clone(); // This sends "Block committed" event, so it should be done // AFTER public facing WSV update sumeragi.send_events(committed_block); - state.latest_block_height = committed_block.header().height; - state.latest_block_hash = Some(committed_block.hash()); - state.latest_block_view_change_index = committed_block.header().view_change_index; + sumeragi.latest_block_height = committed_block.header().height; + sumeragi.latest_block_hash = Some(committed_block.hash()); + sumeragi.latest_block_view_change_index = committed_block.header().view_change_index; } -fn cache_transaction(state: &mut State, sumeragi: &Sumeragi) { - state.transaction_cache.retain(|tx| { - !tx.is_in_blockchain(&state.wsv) && !tx.is_expired(sumeragi.queue.tx_time_to_live) +fn cache_transaction(sumeragi: &mut Sumeragi) { + sumeragi.transaction_cache.retain(|tx| { + !tx.is_in_blockchain(&sumeragi.wsv) && !tx.is_expired(sumeragi.queue.tx_time_to_live) }); } fn suggest_view_change( sumeragi: &Sumeragi, - state: &State, view_change_proof_chain: &mut ProofChain, current_view_change_index: u64, ) { let suspect_proof = { let mut proof = Proof { - latest_block_hash: state.latest_block_hash, + latest_block_hash: sumeragi.latest_block_hash, view_change_index: current_view_change_index, signatures: Vec::new(), }; @@ -420,9 +368,9 @@ fn suggest_view_change( view_change_proof_chain .insert_proof( - &state.current_topology.sorted_peers, - state.current_topology.max_faults(), - state.latest_block_hash, + &sumeragi.current_topology.sorted_peers, + sumeragi.current_topology.max_faults(), + sumeragi.latest_block_hash, 
suspect_proof, ) .unwrap_or_else(|err| error!("{err}")); @@ -435,28 +383,27 @@ fn suggest_view_change( } fn prune_view_change_proofs_and_calculate_current_index( - state: &State, + sumeargi: &Sumeragi, view_change_proof_chain: &mut ProofChain, ) -> u64 { - view_change_proof_chain.prune(state.latest_block_hash); + view_change_proof_chain.prune(sumeargi.latest_block_hash); view_change_proof_chain.verify_with_state( - &state.current_topology.sorted_peers, - state.current_topology.max_faults(), - state.latest_block_hash, + &sumeargi.current_topology.sorted_peers, + sumeargi.current_topology.max_faults(), + sumeargi.latest_block_hash, ) as u64 } #[allow(clippy::too_many_lines)] fn handle_message( message: Message, - sumeragi: &Sumeragi, - state: &mut State, + sumeragi: &mut Sumeragi, voting_block: &mut Option, current_view_change_index: u64, view_change_proof_chain: &mut ProofChain, voting_signatures: &mut Vec>, ) { - let current_topology = &state.current_topology; + let current_topology = &sumeragi.current_topology; let role = current_topology.role(&sumeragi.peer_id); let addr = &sumeragi.peer_id.address; @@ -472,9 +419,9 @@ fn handle_message( .clone() .validate_and_extract_block::( &sumeragi.transaction_validator, - state.wsv.clone(), - state.latest_block_hash, - state.latest_block_height, + sumeragi.wsv.clone(), + sumeragi.latest_block_hash, + sumeragi.latest_block_height, ) .or_else(|_| /* If the block fails validation we must check again using the finaziled wsv. @@ -482,9 +429,9 @@ fn handle_message( wsv but not the current one. 
*/ block_sync_update.validate_and_extract_block::( &sumeragi.transaction_validator, - state.finalized_wsv.clone(), - state.previous_block_hash, - state.latest_block_height.saturating_sub(1), + sumeragi.finalized_wsv.clone(), + sumeragi.previous_block_hash, + sumeragi.latest_block_height.saturating_sub(1), )) { Ok(block) => block, Err(error) => { @@ -493,42 +440,42 @@ fn handle_message( } }; - if state.previous_block_hash == block.header().previous_block_hash - && state.latest_block_height == block.header().height - && state.latest_block_hash != Some(block.hash()) - && state.latest_block_view_change_index < block.header().view_change_index + if sumeragi.previous_block_hash == block.header().previous_block_hash + && sumeragi.latest_block_height == block.header().height + && sumeragi.latest_block_hash != Some(block.hash()) + && sumeragi.latest_block_view_change_index < block.header().view_change_index { error!( %addr, %role, - peer_latest_block_hash=?state.latest_block_hash, - peer_latest_block_view_change_index=?state.latest_block_view_change_index, + peer_latest_block_hash=?sumeragi.latest_block_hash, + peer_latest_block_view_change_index=?sumeragi.latest_block_view_change_index, consensus_latest_block_hash=%block.hash(), consensus_latest_block_view_change_index=%block.header().view_change_index, "Soft fork occurred: peer in inconsistent state. Rolling back and replacing top block." ); - replace_top_block(sumeragi, state, block); + replace_top_block(sumeragi, block); return; } - if state.latest_block_hash != block.header().previous_block_hash { + if sumeragi.latest_block_hash != block.header().previous_block_hash { error!( %addr, %role, actual = ?block.header().previous_block_hash, - expected = ?state.latest_block_hash, + expected = ?sumeragi.latest_block_hash, "Mismatch between the actual and expected hashes of the latest block." 
); return; } - if state.latest_block_height + 1 != block.header().height { + if sumeragi.latest_block_height + 1 != block.header().height { error!( %addr, %role, actual = block.header().height, - expected = state.latest_block_height + 1, + expected = sumeragi.latest_block_height + 1, "Mismatch between the actual and expected height of the block." ); return; } - commit_block(sumeragi, state, block); + commit_block(sumeragi, block); } (Message::BlockCommitted(BlockCommitted { hash, signatures }), _) => { if role == Role::ProxyTail && current_topology.is_consensus_required() @@ -545,7 +492,7 @@ fn handle_message( add_signatures::(&mut voted_block, signatures.transmute()); match voted_block.block.commit(current_topology) { - Ok(committed_block) => commit_block(sumeragi, state, committed_block), + Ok(committed_block) => commit_block(sumeragi, committed_block), Err((_, err)) => { error!(%addr, %role, %hash, ?err, "Block failed to be committed") } @@ -563,7 +510,7 @@ fn handle_message( } } (Message::BlockCreated(block_created), Role::ValidatingPeer) => { - if let Some(block) = vote_for_block(sumeragi, state, block_created) { + if let Some(block) = vote_for_block(sumeragi, block_created) { let block_hash = block.block.hash(); let msg = MessagePacket::new( @@ -578,7 +525,7 @@ fn handle_message( } } (Message::BlockCreated(block_created), Role::ObservingPeer) => { - if let Some(block) = vote_for_block(sumeragi, state, block_created) { + if let Some(block) = vote_for_block(sumeragi, block_created) { if current_view_change_index >= 1 { let block_hash = block.block.hash(); @@ -596,7 +543,7 @@ fn handle_message( (Message::BlockCreated(block_created), Role::ProxyTail) => { // NOTE: False positive from nursery #[allow(clippy::iter_with_drain)] - if let Some(mut new_block) = vote_for_block(sumeragi, state, block_created) { + if let Some(mut new_block) = vote_for_block(sumeragi, block_created) { // NOTE: Up until this point it was unknown which block is expected to be received, // 
therefore all the signatures (of any hash) were collected and will now be pruned add_signatures::(&mut new_block, voting_signatures.drain(..)); @@ -634,27 +581,26 @@ fn handle_message( } fn process_message_independent( - sumeragi: &Sumeragi, - state: &mut State, + sumeragi: &mut Sumeragi, voting_block: &mut Option, current_view_change_index: u64, view_change_proof_chain: &mut ProofChain, round_start_time: &Instant, is_genesis_peer: bool, ) { - let current_topology = &state.current_topology; + let current_topology = &sumeragi.current_topology; let role = current_topology.role(&sumeragi.peer_id); let addr = &sumeragi.peer_id.address; match role { Role::Leader => { if voting_block.is_none() { - let cache_full = state.transaction_cache.len() >= sumeragi.max_txs_in_block; + let cache_full = sumeragi.transaction_cache.len() >= sumeragi.max_txs_in_block; let deadline_reached = round_start_time.elapsed() > sumeragi.block_time; - let cache_non_empty = !state.transaction_cache.is_empty(); + let cache_non_empty = !sumeragi.transaction_cache.is_empty(); if cache_full || (deadline_reached && cache_non_empty) { - let transactions = state.transaction_cache.clone(); + let transactions = sumeragi.transaction_cache.clone(); info!(txns=%transactions.len(), "Creating block..."); // TODO: properly process triggers! 
@@ -662,13 +608,13 @@ fn process_message_independent( let new_block = BlockBuilder { transactions, event_recommendations, - height: state.latest_block_height + 1, - previous_block_hash: state.latest_block_hash, + height: sumeragi.latest_block_height + 1, + previous_block_hash: sumeragi.latest_block_hash, view_change_index: current_view_change_index, - committed_with_topology: state.current_topology.clone(), + committed_with_topology: sumeragi.current_topology.clone(), key_pair: sumeragi.key_pair.clone(), transaction_validator: &sumeragi.transaction_validator, - wsv: state.wsv.clone(), + wsv: sumeragi.wsv.clone(), } .build(); @@ -693,7 +639,7 @@ fn process_message_independent( ); sumeragi.broadcast_packet(msg); - commit_block(sumeragi, state, committed_block); + commit_block(sumeragi, committed_block); } Err(err) => error!(%addr, role=%Role::Leader, ?err), } @@ -727,7 +673,7 @@ fn process_message_independent( { sumeragi.broadcast_packet(msg); } - commit_block(sumeragi, state, committed_block); + commit_block(sumeragi, committed_block); } Err((block, err)) => { // Restore the current voting block and continue the round @@ -802,21 +748,22 @@ fn should_terminate(shutdown_receiver: &mut tokio::sync::oneshot::Receiver<()>) /// Execute the main loop of [`Sumeragi`] pub(crate) fn run( genesis_network: Option, - sumeragi: &Sumeragi, - mut state: State, + mut sumeragi: Sumeragi, mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>, ) { // Connect peers with initial topology - sumeragi.connect_peers(&state.current_topology); + sumeragi.connect_peers(&sumeragi.current_topology); let span = span!(tracing::Level::TRACE, "genesis").entered(); - let is_genesis_peer = if state.latest_block_height == 0 || state.latest_block_hash.is_none() { + let is_genesis_peer = if sumeragi.latest_block_height == 0 + || sumeragi.latest_block_hash.is_none() + { if let Some(genesis_network) = genesis_network { - sumeragi_init_commit_genesis(sumeragi, &mut state, genesis_network); + 
sumeragi_init_commit_genesis(&mut sumeragi, genesis_network); true } else { sumeragi - .init_listen_for_genesis(&mut state, &mut shutdown_receiver) + .init_listen_for_genesis(&mut shutdown_receiver) .unwrap_or_else(|err| assert_ne!(EarlyReturn::Disconnected, err, "Disconnected")); false } @@ -826,10 +773,10 @@ pub(crate) fn run( span.exit(); // Assert initialization was done properly. - assert_eq!(state.latest_block_hash, state.wsv.latest_block_hash()); + assert_eq!(sumeragi.latest_block_hash, sumeragi.wsv.latest_block_hash()); trace!( me=%sumeragi.peer_id.public_key, - role_in_next_round=%state.current_topology.role(&sumeragi.peer_id), + role_in_next_round=%sumeragi.current_topology.role(&sumeragi.peer_id), "Finished sumeragi init.", ); @@ -857,7 +804,7 @@ pub(crate) fn run( let span_for_sumeragi_cycle = span!(Level::TRACE, "main_thread_cycle"); let _enter_for_sumeragi_cycle = span_for_sumeragi_cycle.enter(); - state + sumeragi .transaction_cache // Checking if transactions are in the blockchain is costly .retain(|tx| { @@ -870,12 +817,12 @@ pub(crate) fn run( let mut expired_transactions = Vec::new(); sumeragi.queue.get_transactions_for_block( - &state.wsv, + &sumeragi.wsv, sumeragi.max_txs_in_block, - &mut state.transaction_cache, + &mut sumeragi.transaction_cache, &mut expired_transactions, ); - debug!("Transaction cache: {:?}", state.transaction_cache); + debug!("Transaction cache: {:?}", sumeragi.transaction_cache); sumeragi.send_events( expired_transactions .iter() @@ -884,7 +831,7 @@ pub(crate) fn run( ); let current_view_change_index = prune_view_change_proofs_and_calculate_current_index( - &state, + &sumeragi, &mut view_change_proof_chain, ); @@ -893,9 +840,9 @@ pub(crate) fn run( sumeragi.pipeline_time(), current_view_change_index, &mut old_view_change_index, - state.latest_block_height, + sumeragi.latest_block_height, &mut old_latest_block_height, - &mut state.current_topology, + &mut sumeragi.current_topology, &mut voting_block, &mut voting_signatures, 
&mut round_start_time, @@ -903,9 +850,9 @@ pub(crate) fn run( &mut view_change_time, ); - let node_expects_block = !state.transaction_cache.is_empty(); + let node_expects_block = !sumeragi.transaction_cache.is_empty(); if node_expects_block && last_view_change_time.elapsed() > view_change_time { - let role = state.current_topology.role(&sumeragi.peer_id); + let role = sumeragi.current_topology.role(&sumeragi.peer_id); if let Some(VotingBlock { block, .. }) = voting_block.as_ref() { // NOTE: Suspecting the tail node because it hasn't yet committed a block produced by leader @@ -917,8 +864,7 @@ pub(crate) fn run( } suggest_view_change( - sumeragi, - &state, + &sumeragi, &mut view_change_proof_chain, current_view_change_index, ); @@ -929,7 +875,7 @@ pub(crate) fn run( } sumeragi - .receive_network_packet(&state, &mut view_change_proof_chain) + .receive_network_packet(&mut view_change_proof_chain) .map_or_else( || { should_sleep = true; @@ -937,8 +883,7 @@ pub(crate) fn run( |message| { handle_message( message, - sumeragi, - &mut state, + &mut sumeragi, &mut voting_block, current_view_change_index, &mut view_change_proof_chain, @@ -948,8 +893,7 @@ pub(crate) fn run( ); process_message_independent( - sumeragi, - &mut state, + &mut sumeragi, &mut voting_block, current_view_change_index, &mut view_change_proof_chain, @@ -990,14 +934,10 @@ fn expired_event(txn: &impl Transaction) -> Event { .into() } -fn vote_for_block( - sumeragi: &Sumeragi, - state: &State, - block_created: BlockCreated, -) -> Option { +fn vote_for_block(sumeragi: &Sumeragi, block_created: BlockCreated) -> Option { let block_hash = block_created.hash(); let addr = &sumeragi.peer_id.address; - let role = state.current_topology.role(&sumeragi.peer_id); + let role = sumeragi.current_topology.role(&sumeragi.peer_id); trace!(%addr, %role, block_hash=%block_hash, "Block received, voting..."); let mut block = { @@ -1006,9 +946,9 @@ fn vote_for_block( match block_created.validate_and_extract_block::( 
&sumeragi.transaction_validator, - state.wsv.clone(), - state.latest_block_hash, - state.latest_block_height, + sumeragi.wsv.clone(), + sumeragi.latest_block_hash, + sumeragi.latest_block_height, ) { Ok(block) => block, Err(err) => { @@ -1018,22 +958,22 @@ fn vote_for_block( } }; - if state + if sumeragi .current_topology .filter_signatures_by_roles(&[Role::Leader], block.retain_verified_signatures()) .is_empty() { error!( - %addr, %role, leader=%state.current_topology.leader().address, hash=%block.hash(), + %addr, %role, leader=%sumeragi.current_topology.leader().address, hash=%block.hash(), "The block is rejected as it is not signed by the leader." ); return None; } - if block.header.committed_with_topology != state.current_topology.sorted_peers { + if block.header.committed_with_topology != sumeragi.current_topology.sorted_peers { error!( - %addr, %role, block_topology=?block.header.committed_with_topology, my_topology=?state.current_topology, hash=%block.hash(), + %addr, %role, block_topology=?block.header.committed_with_topology, my_topology=?sumeragi.current_topology, hash=%block.hash(), "The block is rejected as because the topology field is incorrect." ); @@ -1048,17 +988,13 @@ fn vote_for_block( Some(VotingBlock::new(signed_block)) } -fn sumeragi_init_commit_genesis( - sumeragi: &Sumeragi, - state: &mut State, - genesis_network: GenesisNetwork, -) { +fn sumeragi_init_commit_genesis(sumeragi: &mut Sumeragi, genesis_network: GenesisNetwork) { std::thread::sleep(Duration::from_millis(250)); info!("Initializing iroha using the genesis block."); - assert_eq!(state.latest_block_height, 0); - assert_eq!(state.latest_block_hash, None); + assert_eq!(sumeragi.latest_block_height, 0); + assert_eq!(sumeragi.latest_block_hash, None); let transactions = genesis_network.transactions; // Don't start genesis round. Instead just commit the genesis block. 
@@ -1073,10 +1009,10 @@ fn sumeragi_init_commit_genesis( height: 1, previous_block_hash: None, view_change_index: 0, - committed_with_topology: state.current_topology.clone(), + committed_with_topology: sumeragi.current_topology.clone(), key_pair: sumeragi.key_pair.clone(), transaction_validator: &sumeragi.transaction_validator, - wsv: state.wsv.clone(), + wsv: sumeragi.wsv.clone(), } .build(); @@ -1084,7 +1020,7 @@ fn sumeragi_init_commit_genesis( info!(block_hash = %block.hash(), "Publishing genesis block."); info!( - role = ?state.current_topology.role(&sumeragi.peer_id), + role = ?sumeragi.current_topology.role(&sumeragi.peer_id), block_hash = %block.hash(), "Created a block to commit.", ); @@ -1093,7 +1029,7 @@ fn sumeragi_init_commit_genesis( let msg = MessagePacket::new(ProofChain::default(), BlockCreated::from(block.clone())); sumeragi.broadcast_packet(msg); // Omit signature verification during genesis round - commit_block(sumeragi, state, block.commit_unchecked()); + commit_block(sumeragi, block.commit_unchecked()); } } diff --git a/core/src/sumeragi/mod.rs b/core/src/sumeragi/mod.rs index fb6bbbdc223..742828454e6 100644 --- a/core/src/sumeragi/mod.rs +++ b/core/src/sumeragi/mod.rs @@ -28,8 +28,7 @@ pub mod message; pub mod network_topology; pub mod view_change; -use main_loop::State; -use parking_lot::{Mutex, MutexGuard}; +use parking_lot::Mutex; use self::{ message::{Message, *}, @@ -52,50 +51,23 @@ struct LastUpdateMetricsData { metric_tx_amounts_counter: u64, } -/// `Sumeragi` is the implementation of the consensus. 
-#[derive(Debug)] -pub struct Sumeragi { - internal: main_loop::Sumeragi, - config: Configuration, - metrics: Metrics, - last_update_metrics_mutex: Mutex, +/// Handle to `Sumeragi` actor +#[derive(Clone)] +pub struct SumeragiHandle { + public_wsv: Arc>, message_sender: mpsc::SyncSender, + metrics: Metrics, + last_update_metrics_mutex: Arc>, + network: IrohaNetwork, + kura: Arc, + queue: Arc, + _thread_handle: Arc, } -impl Sumeragi { - /// Construct [`Sumeragi`]. - #[allow(clippy::too_many_arguments, clippy::mutex_integer)] - pub fn new( - configuration: &Configuration, - events_sender: EventsSender, - wsv: WorldStateView, - transaction_validator: TransactionValidator, - queue: Arc, - kura: Arc, - network: IrohaNetwork, - ) -> Self { - let (message_sender, message_receiver) = mpsc::sync_channel(100); - - Self { - internal: main_loop::Sumeragi::new( - configuration, - queue, - events_sender, - wsv, - transaction_validator, - kura, - network, - message_receiver, - ), - message_sender, - config: configuration.clone(), - metrics: Metrics::default(), - last_update_metrics_mutex: Mutex::new(LastUpdateMetricsData { - block_height: 0, - metric_tx_amounts: 0.0_f64, - metric_tx_amounts_counter: 0, - }), - } +impl SumeragiHandle { + /// Pass closure inside and apply fn to [`WorldStateView`] + pub fn wsv(&self, f: impl FnOnce(&WorldStateView) -> T) -> T { + f(&self.public_wsv.lock()) } /// Update the metrics on the world state view. 
@@ -105,30 +77,43 @@ impl Sumeragi { /// /// # Panics /// - If either mutex is poisoned - #[allow( - clippy::expect_used, - clippy::unwrap_in_result, - clippy::cast_precision_loss, - clippy::float_arithmetic, - clippy::mutex_integer - )] + #[allow(clippy::cast_precision_loss)] pub fn update_metrics(&self) -> Result<()> { let online_peers_count: u64 = self - .internal .network .online_peers(std::collections::HashSet::len) .try_into() .expect("casting usize to u64"); - let wsv_guard = self.internal.wsv.lock(); + let ( + height, + domains, + genesis_timestamp, + metric_tx_amounts_counter, + metric_tx_amounts, + latest_block_view_change_index, + ) = self.wsv(|wsv| { + ( + wsv.height(), + // Not very nice to clone, but this way we don't hold lock + wsv.domains() + .iter() + .map(|domain_ref| (domain_ref.key().clone(), domain_ref.value().accounts.len())) + .collect::>(), + wsv.genesis_timestamp(), + wsv.metric_tx_amounts_counter.get(), + wsv.metric_tx_amounts.get(), + wsv.latest_block_view_change_index(), + ) + }); let mut last_guard = self.last_update_metrics_mutex.lock(); let start_index = last_guard.block_height; { let mut block_index = start_index; - while block_index < wsv_guard.height() { - let Some(block) = self.internal.kura.get_block_by_height(block_index + 1) else { + while block_index < height { + let Some(block) = self.kura.get_block_by_height(block_index + 1) else { break; }; block_index += 1; @@ -152,13 +137,11 @@ impl Sumeragi { last_guard.block_height = block_index; } - self.metrics.domains.set(wsv_guard.domains().len() as u64); + self.metrics.domains.set(domains.len() as u64); - let diff_count = - wsv_guard.metric_tx_amounts_counter.get() - last_guard.metric_tx_amounts_counter; - let diff_amount_per_count = (wsv_guard.metric_tx_amounts.get() - - last_guard.metric_tx_amounts) - / (diff_count as f64); + let diff_count = metric_tx_amounts_counter - last_guard.metric_tx_amounts_counter; + let diff_amount_per_count = + (metric_tx_amounts - 
last_guard.metric_tx_amounts) / (diff_count as f64); for _ in 0..diff_count { last_guard.metric_tx_amounts_counter += 1; last_guard.metric_tx_amounts += diff_amount_per_count; @@ -167,30 +150,28 @@ impl Sumeragi { } #[allow(clippy::cast_possible_truncation)] - if let Some(timestamp) = wsv_guard.genesis_timestamp() { + if let Some(timestamp) = genesis_timestamp { // this will overflow in 584942417years. self.metrics .uptime_since_genesis_ms .set((current_time().as_millis() - timestamp) as u64) }; - let domains = wsv_guard.domains(); + self.metrics.domains.set(domains.len() as u64); self.metrics.connected_peers.set(online_peers_count); - for domain in domains { + for (domain_id, accounts_len) in domains { self.metrics .accounts - .get_metric_with_label_values(&[domain.id().name.as_ref()]) + .get_metric_with_label_values(&[domain_id.name.as_ref()]) .wrap_err("Failed to compose domains")? - .set(domain.accounts.len() as u64); + .set(accounts_len as u64); } self.metrics .view_changes - .set(wsv_guard.latest_block_view_change_index()); + .set(latest_block_view_change_index); - self.metrics - .queue_size - .set(self.internal.queue.tx_len() as u64); + self.metrics.queue_size.set(self.queue.tx_len() as u64); Ok(()) } @@ -200,29 +181,32 @@ impl Sumeragi { &self.metrics } - /// Access the world state view object in a locking fashion. - /// If you intend to do anything substantial you should clone - /// and release the lock. This is because no blocks can be produced - /// while this lock is held. - // TODO: Return result. + /// Deposit a sumeragi network message. #[allow(clippy::expect_used)] - pub fn wsv_mutex_access(&self) -> MutexGuard { - self.internal.wsv.lock() + pub fn incoming_message(&self, msg: MessagePacket) { + if let Err(error) = self.message_sender.try_send(msg) { + self.metrics.dropped_messages.inc(); + error!(?error, "This peer is faulty. 
Incoming messages have to be dropped due to low processing speed."); + } } - /// Start the sumeragi thread for this sumeragi instance. + /// Start [`Sumeragi`] actor and return handle to it. /// /// # Panics - /// - If either mutex is poisoned. - /// - If topology was built wrong (programmer error) - /// - Sumeragi thread failed to spawn. - #[allow(clippy::expect_used)] - pub fn initialize_and_start_thread( - sumeragi: Arc, + /// May panic if something is off during initialization, which is a bug. + #[allow(clippy::too_many_arguments)] + pub fn start( + configuration: &Configuration, + events_sender: EventsSender, + wsv: WorldStateView, + transaction_validator: TransactionValidator, + queue: Arc, + kura: Arc, + network: IrohaNetwork, genesis_network: Option, block_hashes: &[HashOf], - ) -> ThreadHandler { - let wsv = sumeragi.wsv_mutex_access().clone(); + ) -> SumeragiHandle { + let (message_sender, message_receiver) = mpsc::sync_channel(100); for (block_hash, i) in block_hashes .iter() @@ -230,7 +214,7 @@ impl Sumeragi { .zip(1u64..) { let block_height: u64 = i; - let block_ref = sumeragi.internal.kura.get_block_by_height(block_height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected."); + let block_ref = kura.get_block_by_height(block_height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected."); assert_eq!( block_ref.hash(), *block_hash, @@ -243,12 +227,11 @@ impl Sumeragi { let finalized_wsv = wsv.clone(); if !block_hashes.is_empty() { - let block_ref = sumeragi.internal.kura.get_block_by_height(block_hashes.len() as u64).expect("Sumeragi could not load block that was reported as present. 
Please check that the block storage was not disconnected."); wsv.apply(&block_ref) .expect("Failed to apply block to wsv in init."); } - *sumeragi.wsv_mutex_access() = wsv.clone(); info!("Sumeragi has finished loading blocks and setting up the WSV"); @@ -258,10 +241,10 @@ impl Sumeragi { let previous_block_hash = wsv.previous_block_hash(); let current_topology = if latest_block_height == 0 { - assert!(!sumeragi.config.trusted_peers.peers.is_empty()); - Topology::new(sumeragi.config.trusted_peers.peers.clone()) + assert!(!configuration.trusted_peers.peers.is_empty()); + Topology::new(configuration.trusted_peers.peers.clone()) } else { - let block_ref = sumeragi.internal.kura.get_block_by_height(latest_block_height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected."); + let block_ref = kura.get_block_by_height(latest_block_height).expect("Sumeragi could not load block that was reported as present. Please check that the block storage was not disconnected."); let mut topology = Topology { sorted_peers: block_ref.header().committed_with_topology.clone(), }; @@ -269,11 +252,31 @@ impl Sumeragi { topology }; - let sumeragi_state_machine_data = State { - previous_block_hash, + let public_wsv = Arc::new(Mutex::new(wsv.clone())); + + #[cfg(debug_assertions)] + let debug_force_soft_fork = configuration.debug_force_soft_fork; + #[cfg(not(debug_assertions))] + let debug_force_soft_fork = false; + + let sumeragi = main_loop::Sumeragi { + key_pair: configuration.key_pair.clone(), + queue: Arc::clone(&queue), + peer_id: configuration.peer_id.clone(), + events_sender, + public_wsv: Arc::clone(&public_wsv), + commit_time: Duration::from_millis(configuration.commit_time_limit_ms), + block_time: Duration::from_millis(configuration.block_time_ms), + max_txs_in_block: configuration.max_transactions_in_block as usize, + transaction_validator, + kura: Arc::clone(&kura), + network: network.clone(), + 
message_receiver, + debug_force_soft_fork, + latest_block_view_change_index, latest_block_hash, + previous_block_hash, latest_block_height, - latest_block_view_change_index, current_topology, wsv, finalized_wsv, @@ -286,12 +289,7 @@ impl Sumeragi { let thread_handle = std::thread::Builder::new() .name("sumeragi thread".to_owned()) .spawn(move || { - main_loop::run( - genesis_network, - &sumeragi.internal, - sumeragi_state_machine_data, - shutdown_receiver, - ); + main_loop::run(genesis_network, sumeragi, shutdown_receiver); }) .expect("Sumeragi thread spawn should not fail."); @@ -299,15 +297,20 @@ impl Sumeragi { let _result = shutdown_sender.send(()); }; - ThreadHandler::new(Box::new(shutdown), thread_handle) - } - - /// Deposit a sumeragi network message. - #[allow(clippy::expect_used)] - pub fn incoming_message(&self, msg: MessagePacket) { - if let Err(error) = self.message_sender.try_send(msg) { - self.metrics.dropped_messages.inc(); - error!(?error, "This peer is faulty. Incoming messages have to be dropped due to low processing speed."); + let thread_handle = ThreadHandler::new(Box::new(shutdown), thread_handle); + SumeragiHandle { + network, + queue, + kura, + message_sender, + public_wsv, + metrics: Metrics::default(), + last_update_metrics_mutex: Arc::new(Mutex::new(LastUpdateMetricsData { + block_height: 0, + metric_tx_amounts: 0.0_f64, + metric_tx_amounts_counter: 0, + })), + _thread_handle: Arc::new(thread_handle), } } } diff --git a/telemetry/src/metrics.rs b/telemetry/src/metrics.rs index 94c4181b9c9..a1fa50b8a4e 100644 --- a/telemetry/src/metrics.rs +++ b/telemetry/src/metrics.rs @@ -57,7 +57,7 @@ impl> From<&T> for Status { } /// A strict superset of [`Status`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Metrics { /// Total number of transactions pub txs: IntCounterVec,