Skip to content

Commit

Permalink
[refactor]: Add SumeragiHandle
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara committed May 10, 2023
1 parent cbc3bfe commit 68f0b2d
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 386 deletions.
41 changes: 19 additions & 22 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use iroha_core::{
prelude::{World, WorldStateView},
queue::Queue,
smartcontracts::isi::Registrable as _,
sumeragi::Sumeragi,
sumeragi::SumeragiHandle,
tx::{PeerId, TransactionValidator},
IrohaNetwork,
};
Expand Down Expand Up @@ -91,7 +91,7 @@ pub struct Iroha {
/// Queue of transactions
pub queue: Arc<Queue>,
/// Sumeragi consensus
pub sumeragi: Arc<Sumeragi>,
pub sumeragi: SumeragiHandle,
/// Kura — block storage
pub kura: Arc<Kura>,
/// Torii web server
Expand All @@ -115,7 +115,7 @@ impl Drop for Iroha {
}

struct NetworkRelay {
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
block_sync: BlockSynchronizerHandle,
gossiper: TransactionGossiperHandle,
network: IrohaNetwork,
Expand Down Expand Up @@ -271,25 +271,22 @@ impl Iroha {

let kura_thread_handler = Kura::start(Arc::clone(&kura));

let sumeragi = Arc::new(
// TODO: No function needs 10 parameters. It should accept one struct.
Sumeragi::new(
&config.sumeragi,
events_sender.clone(),
wsv,
transaction_validator,
Arc::clone(&queue),
Arc::clone(&kura),
network.clone(),
),
// TODO: No function needs 10 parameters. It should accept one struct.
let sumeragi = SumeragiHandle::start(
&config.sumeragi,
events_sender.clone(),
wsv,
transaction_validator,
Arc::clone(&queue),
Arc::clone(&kura),
network.clone(),
genesis,
&block_hashes,
);

let sumeragi_thread_handler =
Sumeragi::initialize_and_start_thread(Arc::clone(&sumeragi), genesis, &block_hashes);

let block_sync = BlockSynchronizer::from_configuration(
&config.block_sync,
Arc::clone(&sumeragi),
sumeragi.clone(),
Arc::clone(&kura),
PeerId::new(&config.torii.p2p_addr, &config.public_key),
network.clone(),
Expand All @@ -300,14 +297,14 @@ impl Iroha {
&config.sumeragi,
network.clone(),
Arc::clone(&queue),
Arc::clone(&sumeragi),
sumeragi.clone(),
)
.start();

let freeze_status = Arc::new(AtomicBool::new(false));

NetworkRelay {
sumeragi: Arc::clone(&sumeragi),
sumeragi: sumeragi.clone(),
block_sync,
gossiper,
network: network.clone(),
Expand All @@ -322,7 +319,7 @@ impl Iroha {
Arc::clone(&queue),
events_sender,
Arc::clone(&notify_shutdown),
Arc::clone(&sumeragi),
sumeragi.clone(),
Arc::clone(&kura),
);

Expand All @@ -336,7 +333,7 @@ impl Iroha {
sumeragi,
kura,
torii,
thread_handlers: vec![sumeragi_thread_handler, kura_thread_handler],
thread_handlers: vec![kura_thread_handler],
#[cfg(debug_assertions)]
freeze_status,
})
Expand Down
4 changes: 2 additions & 2 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use iroha_core::{
kura::Kura,
prelude::*,
queue::{self, Queue},
sumeragi::Sumeragi,
sumeragi::SumeragiHandle,
EventsSender,
};
use thiserror::Error;
Expand All @@ -38,7 +38,7 @@ pub struct Torii {
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
}

Expand Down
53 changes: 24 additions & 29 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use iroha_config::{
torii::uri,
GetConfiguration, PostConfiguration,
};
use iroha_core::smartcontracts::isi::query::ValidQueryRequest;
use iroha_core::{smartcontracts::isi::query::ValidQueryRequest, sumeragi::SumeragiHandle};
use iroha_crypto::SignatureOf;
use iroha_data_model::{
block::{
Expand Down Expand Up @@ -103,17 +103,15 @@ impl TryFrom<SignedQuery> for VerifiedQuery {
pub(crate) async fn handle_instructions(
iroha_cfg: Configuration,
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
transaction: VersionedSignedTransaction,
) -> Result<Empty> {
let transaction: SignedTransaction = transaction.into_v1();
let transaction =
AcceptedTransaction::accept::<false>(transaction, &iroha_cfg.sumeragi.transaction_limits)
.map_err(Error::AcceptTransaction)?
.into();
#[allow(clippy::map_err_ignore)]
queue
.push(transaction, &sumeragi.wsv_mutex_access())
let transaction = AcceptedTransaction::accept::<false>(transaction, &iroha_cfg.sumeragi.transaction_limits)
.map_err(Error::AcceptTransaction)?
.into();
sumeragi
.wsv(|wsv| queue.push(transaction, wsv))
.map_err(|queue::Failure { tx, err }| {
iroha_logger::warn!(
tx_hash=%tx.hash(), ?err,
Expand All @@ -128,7 +126,7 @@ pub(crate) async fn handle_instructions(

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_queries(
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
pagination: Pagination,
sorting: Sorting,
request: VersionedSignedQuery,
Expand All @@ -137,7 +135,7 @@ pub(crate) async fn handle_queries(
let request: VerifiedQuery = request.try_into()?;

let (result, filter) = {
let wsv = sumeragi.wsv_mutex_access().clone();
let wsv = sumeragi.wsv(Clone::clone);
let (valid_request, filter) = request.validate(&wsv)?;
let original_result = valid_request.execute(&wsv)?;
(filter.filter(original_result), filter)
Expand Down Expand Up @@ -226,12 +224,12 @@ async fn handle_schema() -> Json {
#[iroha_futures::telemetry_future]
async fn handle_pending_transactions(
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
pagination: Pagination,
) -> Result<Scale<VersionedPendingTransactions>> {
Ok(Scale(
queue
.all_transactions(&sumeragi.wsv_mutex_access())
sumeragi
.wsv(|wsv| queue.all_transactions(wsv))
.into_iter()
.map(VersionedAcceptedTransaction::into_v1)
.map(SignedTransaction::from)
Expand Down Expand Up @@ -412,21 +410,20 @@ mod subscription {

#[iroha_futures::telemetry_future]
#[cfg(feature = "telemetry")]
async fn handle_version(sumeragi: Arc<Sumeragi>) -> Json {
async fn handle_version(sumeragi: SumeragiHandle) -> Json {
use iroha_version::Version;

#[allow(clippy::expect_used)]
let string = sumeragi
.wsv_mutex_access()
.latest_block_ref()
.wsv(WorldStateView::latest_block_ref)
.expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.")
.version()
.to_string();
reply::json(&string)
}

#[cfg(feature = "telemetry")]
fn handle_metrics(sumeragi: &Sumeragi) -> Result<String> {
fn handle_metrics(sumeragi: &SumeragiHandle) -> Result<String> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling sumeragi::update_metrics.");
}
Expand All @@ -438,7 +435,7 @@ fn handle_metrics(sumeragi: &Sumeragi) -> Result<String> {

#[cfg(feature = "telemetry")]
#[allow(clippy::unnecessary_wraps)]
fn handle_status(sumeragi: &Sumeragi) -> Result<warp::reply::Json, Infallible> {
fn handle_status(sumeragi: &SumeragiHandle) -> Result<warp::reply::Json, Infallible> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`.");
}
Expand All @@ -448,7 +445,7 @@ fn handle_status(sumeragi: &Sumeragi) -> Result<warp::reply::Json, Infallible> {

#[cfg(feature = "telemetry")]
#[allow(clippy::unused_async)]
async fn handle_status_precise(sumeragi: Arc<Sumeragi>, segment: String) -> Result<Json> {
async fn handle_status_precise(sumeragi: SumeragiHandle, segment: String) -> Result<Json> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`.");
}
Expand Down Expand Up @@ -477,7 +474,7 @@ impl Torii {
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
) -> Self {
Self {
Expand All @@ -500,25 +497,23 @@ impl Torii {
let get_router_status_precise = endpoint2(
handle_status_precise,
status_path
.and(add_state!(self.sumeragi))
.and(add_state!(self.sumeragi.clone()))
.and(warp::path::param()),
);
let get_router_status_bare =
status_path
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async move {
.and(add_state!(self.sumeragi.clone()))
.and_then(|sumeragi| async move {
Ok::<_, Infallible>(WarpResult(handle_status(&sumeragi)))
});
let get_router_metrics = warp::path(uri::METRICS)
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async move {
.and_then(|sumeragi| async move {
Ok::<_, Infallible>(WarpResult(handle_metrics(&sumeragi)))
});
let get_api_version = warp::path(uri::API_VERSION)
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async {
Ok::<_, Infallible>(handle_version(sumeragi).await)
});
.and(add_state!(self.sumeragi.clone()))
.and_then(|sumeragi| async { Ok::<_, Infallible>(handle_version(sumeragi).await) });

warp::get()
.and(get_router_status_precise.or(get_router_status_bare))
Expand Down
3 changes: 1 addition & 2 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ impl Config {
.as_ref()
.expect("Must be some")
.sumeragi
.wsv_mutex_access()
.clone();
.wsv(Clone::clone);
let mut blocks = blocks_wsv
.all_blocks_by_value()
.into_iter()
Expand Down
18 changes: 8 additions & 10 deletions core/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use iroha_version::prelude::*;
use parity_scale_codec::{Decode, Encode};
use tokio::sync::mpsc;

use crate::{kura::Kura, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage};
use crate::{kura::Kura, sumeragi::SumeragiHandle, IrohaNetwork, NetworkMessage};

/// [`BlockSynchronizer`] actor handle.
#[derive(Clone)]
Expand All @@ -37,9 +37,8 @@ impl BlockSynchronizerHandle {
}

/// Structure responsible for block synchronization between peers.
#[derive(Debug)]
pub struct BlockSynchronizer {
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
peer_id: PeerId,
gossip_period: Duration,
Expand Down Expand Up @@ -91,10 +90,9 @@ impl BlockSynchronizer {

/// Sends request for latest blocks to a chosen peer
async fn request_latest_blocks_from_peer(&mut self, peer_id: PeerId) {
let (latest_hash, previous_hash) = {
let wsv = self.sumeragi.wsv_mutex_access();
(wsv.latest_block_hash(), wsv.previous_block_hash())
};
let (latest_hash, previous_hash) = self
.sumeragi
.wsv(|wsv| (wsv.latest_block_hash(), wsv.previous_block_hash()));
message::Message::GetBlocksAfter(message::GetBlocksAfter::new(
latest_hash,
previous_hash,
Expand All @@ -107,7 +105,7 @@ impl BlockSynchronizer {
/// Create [`Self`] from [`Configuration`]
pub fn from_configuration(
config: &Configuration,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
peer_id: PeerId,
network: IrohaNetwork,
Expand All @@ -126,7 +124,7 @@ impl BlockSynchronizer {
pub mod message {
//! Module containing messages for [`BlockSynchronizer`](super::BlockSynchronizer).
use super::*;
use crate::sumeragi::view_change::ProofChain;
use crate::{sumeragi::view_change::ProofChain, wsv::WorldStateView};

declare_versioned_with_scale!(VersionedMessage 1..2, Debug, Clone, iroha_macro::FromVariant);

Expand Down Expand Up @@ -220,7 +218,7 @@ pub mod message {
return;
}
let local_latest_block_hash =
block_sync.sumeragi.wsv_mutex_access().latest_block_hash();
block_sync.sumeragi.wsv(WorldStateView::latest_block_hash);
if *latest_hash == local_latest_block_hash
|| *previous_hash == local_latest_block_hash
{
Expand Down
18 changes: 8 additions & 10 deletions core/src/gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iroha_p2p::Broadcast;
use parity_scale_codec::{Decode, Encode};
use tokio::sync::mpsc;

use crate::{queue::Queue, sumeragi::Sumeragi, IrohaNetwork, NetworkMessage};
use crate::{queue::Queue, sumeragi::SumeragiHandle, IrohaNetwork, NetworkMessage};

/// [`Gossiper`] actor handle.
#[derive(Clone)]
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct TransactionGossiper {
/// [`iroha_p2p::Network`] actor handle
network: IrohaNetwork,
/// Sumearagi
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
/// Limits that all transactions need to obey, in terms of size
/// of WASM blob and number of instructions.
transaction_limits: TransactionLimits,
Expand All @@ -62,7 +62,7 @@ impl TransactionGossiper {
configuartion: &Configuration,
network: IrohaNetwork,
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
) -> Self {
Self {
queue,
Expand Down Expand Up @@ -93,9 +93,10 @@ impl TransactionGossiper {
}

fn gossip_transactions(&self) {
let txs = self
.queue
.n_random_transactions(self.gossip_batch_size, &self.sumeragi.wsv_mutex_access());
let txs = self.sumeragi.wsv(|wsv| {
self.queue
.n_random_transactions(self.gossip_batch_size, wsv)
});

if txs.is_empty() {
iroha_logger::debug!("Nothing to gossip");
Expand All @@ -112,10 +113,7 @@ impl TransactionGossiper {
iroha_logger::trace!(size = txs.len(), "Received new transaction gossip");
for tx in txs {
match AcceptedTransaction::accept::<false>(tx.into_v1(), &self.transaction_limits) {
Ok(tx) => match self
.queue
.push(tx.into(), &self.sumeragi.wsv_mutex_access())
{
Ok(tx) => match self.sumeragi.wsv(|wsv| self.queue.push(tx.into(), wsv)) {
Ok(_) => {}
Err(crate::queue::Failure {
tx,
Expand Down
Loading

0 comments on commit 68f0b2d

Please sign in to comment.