Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] #3090: Propagate on-chain parameters #3457

Merged
merged 2 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use iroha_core::{
prelude::{World, WorldStateView},
queue::Queue,
smartcontracts::isi::Registrable as _,
sumeragi::Sumeragi,
tx::{PeerId, TransactionValidator},
sumeragi::{SumeragiHandle, SumeragiStartArgs},
tx::PeerId,
IrohaNetwork,
};
use iroha_data_model::prelude::*;
Expand Down Expand Up @@ -91,7 +91,7 @@ pub struct Iroha {
/// Queue of transactions
pub queue: Arc<Queue>,
/// Sumeragi consensus
pub sumeragi: Arc<Sumeragi>,
pub sumeragi: SumeragiHandle,
/// Kura — block storage
pub kura: Arc<Kura>,
/// Torii web server
Expand All @@ -115,7 +115,7 @@ impl Drop for Iroha {
}

struct NetworkRelay {
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
block_sync: BlockSynchronizerHandle,
gossiper: TransactionGossiperHandle,
network: IrohaNetwork,
Expand Down Expand Up @@ -245,7 +245,7 @@ impl Iroha {
)?;
let wsv = WorldStateView::from_configuration(config.wsv, world, Arc::clone(&kura));

let transaction_validator = TransactionValidator::new(config.sumeragi.transaction_limits);
let transaction_validator = wsv.transaction_validator();

// Validate every transaction in genesis block
if let Some(ref genesis) = genesis {
Expand All @@ -271,25 +271,20 @@ impl Iroha {

let kura_thread_handler = Kura::start(Arc::clone(&kura));

let sumeragi = Arc::new(
// TODO: No function needs 10 parameters. It should accept one struct.
Sumeragi::new(
&config.sumeragi,
events_sender.clone(),
wsv,
transaction_validator,
Arc::clone(&queue),
Arc::clone(&kura),
network.clone(),
),
);

let sumeragi_thread_handler =
Sumeragi::initialize_and_start_thread(Arc::clone(&sumeragi), genesis, &block_hashes);
let sumeragi = SumeragiHandle::start(SumeragiStartArgs {
configuration: &config.sumeragi,
events_sender: events_sender.clone(),
wsv,
queue: Arc::clone(&queue),
kura: Arc::clone(&kura),
network: network.clone(),
genesis_network: genesis,
block_hashes: &block_hashes,
});

let block_sync = BlockSynchronizer::from_configuration(
&config.block_sync,
Arc::clone(&sumeragi),
sumeragi.clone(),
Arc::clone(&kura),
PeerId::new(&config.torii.p2p_addr, &config.public_key),
network.clone(),
Expand All @@ -300,14 +295,14 @@ impl Iroha {
&config.sumeragi,
network.clone(),
Arc::clone(&queue),
Arc::clone(&sumeragi),
sumeragi.clone(),
)
.start();

let freeze_status = Arc::new(AtomicBool::new(false));

NetworkRelay {
sumeragi: Arc::clone(&sumeragi),
sumeragi: sumeragi.clone(),
block_sync,
gossiper,
network: network.clone(),
Expand All @@ -322,7 +317,7 @@ impl Iroha {
Arc::clone(&queue),
events_sender,
Arc::clone(&notify_shutdown),
Arc::clone(&sumeragi),
sumeragi.clone(),
Arc::clone(&kura),
);

Expand All @@ -336,7 +331,7 @@ impl Iroha {
sumeragi,
kura,
torii,
thread_handlers: vec![sumeragi_thread_handler, kura_thread_handler],
thread_handlers: vec![kura_thread_handler],
#[cfg(debug_assertions)]
freeze_status,
})
Expand Down
2 changes: 1 addition & 1 deletion cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async fn main() -> Result<(), color_eyre::Report> {
.as_ref(),
)?,
Some(&config.genesis),
&config.sumeragi.transaction_limits,
&config.wsv.transaction_limits,
)
.wrap_err("Failed to initialize genesis.")?
} else {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use iroha_core::{
kura::Kura,
prelude::*,
queue::{self, Queue},
sumeragi::Sumeragi,
sumeragi::SumeragiHandle,
EventsSender,
};
use thiserror::Error;
Expand All @@ -38,7 +38,7 @@ pub struct Torii {
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
}

Expand Down
59 changes: 27 additions & 32 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use iroha_config::{
torii::uri,
GetConfiguration, PostConfiguration,
};
use iroha_core::smartcontracts::isi::query::ValidQueryRequest;
use iroha_core::{smartcontracts::isi::query::ValidQueryRequest, sumeragi::SumeragiHandle};
use iroha_crypto::SignatureOf;
use iroha_data_model::{
block::{
Expand Down Expand Up @@ -101,19 +101,17 @@ impl TryFrom<SignedQuery> for VerifiedQuery {

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_instructions(
iroha_cfg: Configuration,
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
transaction: VersionedSignedTransaction,
) -> Result<Empty> {
let transaction: SignedTransaction = transaction.into_v1();
let transaction =
AcceptedTransaction::accept::<false>(transaction, &iroha_cfg.sumeragi.transaction_limits)
.map_err(Error::AcceptTransaction)?
.into();
#[allow(clippy::map_err_ignore)]
queue
.push(transaction, &sumeragi.wsv_mutex_access())
let transaction_limits = sumeragi.wsv(|wsv| wsv.config.borrow().transaction_limits);
let transaction = AcceptedTransaction::accept::<false>(transaction, &transaction_limits)
appetrosyan marked this conversation as resolved.
Show resolved Hide resolved
.map_err(Error::AcceptTransaction)?
.into();
sumeragi
.wsv(|wsv| queue.push(transaction, wsv))
.map_err(|queue::Failure { tx, err }| {
iroha_logger::warn!(
tx_hash=%tx.hash(), ?err,
Expand All @@ -128,7 +126,7 @@ pub(crate) async fn handle_instructions(

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_queries(
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
pagination: Pagination,
sorting: Sorting,
request: VersionedSignedQuery,
Expand All @@ -137,7 +135,7 @@ pub(crate) async fn handle_queries(
let request: VerifiedQuery = request.try_into()?;

let (result, filter) = {
let wsv = sumeragi.wsv_mutex_access().clone();
let wsv = sumeragi.wsv(Clone::clone);
appetrosyan marked this conversation as resolved.
Show resolved Hide resolved
let (valid_request, filter) = request.validate(&wsv)?;
let original_result = valid_request.execute(&wsv)?;
(filter.filter(original_result), filter)
Expand Down Expand Up @@ -226,12 +224,12 @@ async fn handle_schema() -> Json {
#[iroha_futures::telemetry_future]
async fn handle_pending_transactions(
queue: Arc<Queue>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
pagination: Pagination,
) -> Result<Scale<VersionedPendingTransactions>> {
Ok(Scale(
queue
.all_transactions(&sumeragi.wsv_mutex_access())
sumeragi
.wsv(|wsv| queue.all_transactions(wsv))
.into_iter()
.map(VersionedAcceptedTransaction::into_v1)
.map(SignedTransaction::from)
Expand Down Expand Up @@ -412,21 +410,20 @@ mod subscription {

#[iroha_futures::telemetry_future]
#[cfg(feature = "telemetry")]
async fn handle_version(sumeragi: Arc<Sumeragi>) -> Json {
async fn handle_version(sumeragi: SumeragiHandle) -> Json {
use iroha_version::Version;

#[allow(clippy::expect_used)]
let string = sumeragi
.wsv_mutex_access()
.latest_block_ref()
.wsv(WorldStateView::latest_block_ref)
.expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.")
.version()
.to_string();
reply::json(&string)
}

#[cfg(feature = "telemetry")]
fn handle_metrics(sumeragi: &Sumeragi) -> Result<String> {
fn handle_metrics(sumeragi: &SumeragiHandle) -> Result<String> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling sumeragi::update_metrics.");
}
Expand All @@ -438,7 +435,7 @@ fn handle_metrics(sumeragi: &Sumeragi) -> Result<String> {

#[cfg(feature = "telemetry")]
#[allow(clippy::unnecessary_wraps)]
fn handle_status(sumeragi: &Sumeragi) -> Result<warp::reply::Json, Infallible> {
fn handle_status(sumeragi: &SumeragiHandle) -> Result<warp::reply::Json, Infallible> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`.");
}
Expand All @@ -448,7 +445,7 @@ fn handle_status(sumeragi: &Sumeragi) -> Result<warp::reply::Json, Infallible> {

#[cfg(feature = "telemetry")]
#[allow(clippy::unused_async)]
async fn handle_status_precise(sumeragi: Arc<Sumeragi>, segment: String) -> Result<Json> {
async fn handle_status_precise(sumeragi: SumeragiHandle, segment: String) -> Result<Json> {
if let Err(error) = sumeragi.update_metrics() {
iroha_logger::error!(%error, "Error while calling `sumeragi::update_metrics`.");
}
Expand Down Expand Up @@ -477,7 +474,7 @@ impl Torii {
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: Arc<Sumeragi>,
sumeragi: SumeragiHandle,
kura: Arc<Kura>,
) -> Self {
Self {
Expand All @@ -500,25 +497,23 @@ impl Torii {
let get_router_status_precise = endpoint2(
handle_status_precise,
status_path
.and(add_state!(self.sumeragi))
.and(add_state!(self.sumeragi.clone()))
Arjentix marked this conversation as resolved.
Show resolved Hide resolved
.and(warp::path::param()),
);
let get_router_status_bare =
status_path
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async move {
.and(add_state!(self.sumeragi.clone()))
.and_then(|sumeragi| async move {
Ok::<_, Infallible>(WarpResult(handle_status(&sumeragi)))
});
let get_router_metrics = warp::path(uri::METRICS)
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async move {
.and_then(|sumeragi| async move {
Ok::<_, Infallible>(WarpResult(handle_metrics(&sumeragi)))
});
let get_api_version = warp::path(uri::API_VERSION)
.and(add_state!(self.sumeragi))
.and_then(|sumeragi: Arc<_>| async {
Ok::<_, Infallible>(handle_version(sumeragi).await)
});
.and(add_state!(self.sumeragi.clone()))
.and_then(|sumeragi| async { Ok::<_, Infallible>(handle_version(sumeragi).await) });

warp::get()
.and(get_router_status_precise.or(get_router_status_bare))
Expand Down Expand Up @@ -555,10 +550,10 @@ impl Torii {
.and_then(|| async { Ok::<_, Infallible>(handle_schema().await) }));

let post_router = warp::post()
.and(endpoint4(
.and(endpoint3(
handle_instructions,
warp::path(uri::TRANSACTION)
.and(add_state!(self.iroha_cfg, self.queue, self.sumeragi))
.and(add_state!(self.queue, self.sumeragi))
.and(warp::body::content_length_limit(
self.iroha_cfg.torii.max_content_len.into(),
))
Expand Down
4 changes: 2 additions & 2 deletions client/benches/torii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn query_requests(criterion: &mut Criterion) {
)
.build(),
Some(&configuration.genesis),
&configuration.sumeragi.transaction_limits,
&configuration.wsv.transaction_limits,
)
.expect("genesis creation failed");

Expand Down Expand Up @@ -140,7 +140,7 @@ fn instruction_submits(criterion: &mut Criterion) {
)
.build(),
Some(&configuration.genesis),
&configuration.sumeragi.transaction_limits,
&configuration.wsv.transaction_limits,
)
.expect("failed to create genesis");
let builder = PeerBuilder::new()
Expand Down
19 changes: 13 additions & 6 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{

use eyre::{Result, WrapErr};
use iroha_client::client::Client;
use iroha_data_model::prelude::*;
use iroha_data_model::{
parameter::{default::MAX_TRANSACTIONS_IN_BLOCK, ParametersBuilder},
prelude::*,
};
use serde::Deserialize;
use test_network::*;

Expand Down Expand Up @@ -56,11 +59,16 @@ impl Config {
#[allow(clippy::expect_used, clippy::unwrap_in_result)]
pub fn measure(self) -> Result<Tps> {
// READY
let (_rt, network, _genesis_client) =
<Network>::start_test_with_runtime(self.peers, self.max_txs_per_block, None);
let (_rt, network, client) = <Network>::start_test_with_runtime(self.peers, None);
let clients = network.clients();
wait_for_genesis_committed(&clients, 0);

client.submit_blocking(
ParametersBuilder::new()
.add_parameter(MAX_TRANSACTIONS_IN_BLOCK, self.max_txs_per_block)?
.into_set_parameters(),
)?;

let unit_names = (UnitName::MIN..).take(self.peers as usize);
let units = clients
.into_iter()
Expand Down Expand Up @@ -108,15 +116,14 @@ impl Config {
handle.join().expect("Transaction submitter panicked");
}

let blocks_out_of_measure = 1 + MeasurerUnit::PREPARATION_BLOCKS_NUMBER * self.peers;
let blocks_out_of_measure = 2 + MeasurerUnit::PREPARATION_BLOCKS_NUMBER * self.peers;
let blocks_wsv = network
.genesis
.iroha
.as_ref()
.expect("Must be some")
.sumeragi
.wsv_mutex_access()
.clone();
.wsv(Clone::clone);
let mut blocks = blocks_wsv
.all_blocks_by_value()
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion client/examples/million_accounts_genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn main_genesis() {
true,
generate_genesis(1_000_000_u32),
Some(&configuration.genesis),
&configuration.sumeragi.transaction_limits,
&configuration.wsv.transaction_limits,
)
.expect("genesis creation failed");

Expand Down
Loading