Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] #3929: Lazy queries inside wasm #3958

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ tempfile = { workspace = true }
dashmap = { workspace = true }

thread-local-panic-hook = { version = "0.1.0", optional = true }
uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
serial_test = "2.0.0"
Expand Down
29 changes: 23 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroha_core::{
handler::ThreadHandler,
kura::Kura,
prelude::{World, WorldStateView},
query::store::LiveQueryStore,
queue::Queue,
smartcontracts::isi::Registrable as _,
snapshot::{try_read_snapshot, SnapshotMaker, SnapshotMakerHandle},
Expand Down Expand Up @@ -92,12 +93,13 @@ pub struct Iroha {
pub kura: Arc<Kura>,
/// Torii web server
pub torii: Option<Torii>,
/// Snapshot service,
/// Snapshot service
pub snapshot_maker: SnapshotMakerHandle,
/// Thread handlers
thread_handlers: Vec<ThreadHandler>,
/// A boolean value indicating whether or not the peers will recieve data from the network. Used in
/// sumeragi testing.

/// A boolean value indicating whether or not the peers will receive data from the network.
/// Used in sumeragi testing.
#[cfg(debug_assertions)]
pub freeze_status: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -241,13 +243,25 @@ impl Iroha {
std::path::Path::new(&config.kura.block_store_path),
config.kura.debug_output_new_blocks,
)?;
let live_query_store_handle =
LiveQueryStore::from_configuration(config.live_query_store).start();
mversic marked this conversation as resolved.
Show resolved Hide resolved

let notify_shutdown = Arc::new(Notify::new());
let block_count = kura.init()?;
let wsv = try_read_snapshot(&config.snapshot.dir_path, &kura, block_count).map_or_else(
let wsv = try_read_snapshot(
&config.snapshot.dir_path,
&kura,
live_query_store_handle.clone(),
block_count,
)
.map_or_else(
|error| {
iroha_logger::warn!(%error, "Failed to load wsv from snapshot, creating empty wsv");
WorldStateView::from_configuration(config.wsv, world, Arc::clone(&kura))
WorldStateView::from_configuration(
*config.wsv,
world,
Arc::clone(&kura),
live_query_store_handle.clone(),
)
},
|wsv| {
iroha_logger::info!(
Expand Down Expand Up @@ -298,6 +312,8 @@ impl Iroha {
#[cfg(debug_assertions)]
let freeze_status = Arc::new(AtomicBool::new(false));

let notify_shutdown = Arc::new(Notify::new());

NetworkRelay {
sumeragi: sumeragi.clone(),
block_sync,
Expand All @@ -318,6 +334,7 @@ impl Iroha {
events_sender,
Arc::clone(&notify_shutdown),
sumeragi.clone(),
live_query_store_handle,
Arc::clone(&kura),
);

Expand Down
12 changes: 6 additions & 6 deletions cli/src/samples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ pub fn get_config_proxy(peers: UniqueVec<PeerId>, key_pair: Option<KeyPair>) ->
ConfigurationProxy {
public_key: Some(public_key.clone()),
private_key: Some(private_key.clone()),
sumeragi: Some(iroha_config::sumeragi::ConfigurationProxy {
sumeragi: Some(Box::new(iroha_config::sumeragi::ConfigurationProxy {
max_transactions_in_block: Some(2),
trusted_peers: Some(TrustedPeers { peers }),
..iroha_config::sumeragi::ConfigurationProxy::default()
}),
torii: Some(iroha_config::torii::ConfigurationProxy {
})),
torii: Some(Box::new(iroha_config::torii::ConfigurationProxy {
p2p_addr: Some(DEFAULT_TORII_P2P_ADDR.clone()),
api_url: Some(DEFAULT_API_ADDR.clone()),
..iroha_config::torii::ConfigurationProxy::default()
}),
})),
block_sync: Some(iroha_config::block_sync::ConfigurationProxy {
block_batch_size: Some(1),
gossip_period_ms: Some(500),
Expand All @@ -78,10 +78,10 @@ pub fn get_config_proxy(peers: UniqueVec<PeerId>, key_pair: Option<KeyPair>) ->
queue: Some(iroha_config::queue::ConfigurationProxy {
..iroha_config::queue::ConfigurationProxy::default()
}),
genesis: Some(iroha_config::genesis::ConfigurationProxy {
genesis: Some(Box::new(iroha_config::genesis::ConfigurationProxy {
account_private_key: Some(Some(private_key)),
account_public_key: Some(public_key),
}),
})),
..ConfigurationProxy::default()
}
}
Expand Down
126 changes: 37 additions & 89 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@ use std::{
fmt::{Debug, Write as _},
net::ToSocketAddrs,
sync::Arc,
time::{Duration, Instant},
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
prelude::*,
query::store::LiveQueryStoreHandle,
queue::{self, Queue},
sumeragi::SumeragiHandle,
EventsSender,
};
use iroha_data_model::Value;
use parity_scale_codec::Encode;
use tokio::{sync::Notify, time::sleep};
use tokio::sync::Notify;
use utils::*;
use warp::{
http::StatusCode,
Expand All @@ -30,71 +27,25 @@ use warp::{
Filter as _, Reply,
};

use self::cursor::Batched;

#[macro_use]
pub(crate) mod utils;
mod cursor;
mod pagination;
mod routing;

type LiveQuery = Batched<Vec<Value>>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<(String, Vec<u8>), (LiveQuery, Instant)>,
}

impl LiveQueryStore {
fn insert<T: Encode>(&self, query_id: String, request: T, live_query: LiveQuery) {
self.queries
.insert((query_id, request.encode()), (live_query, Instant::now()));
}

fn remove<T: Encode>(&self, query_id: &str, request: &T) -> Option<LiveQuery> {
self.queries
.remove(&(query_id.to_string(), request.encode()))
.map(|(_, (output, _))| output)
}

fn expired_query_cleanup(
self: Arc<Self>,
idle_time: Duration,
notify_shutdown: Arc<Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = sleep(idle_time) => {
self.queries
.retain(|_, (_, last_access_time)| last_access_time.elapsed() <= idle_time);
},
_ = notify_shutdown.notified() => {
iroha_logger::info!("Query cleanup service is being shut down.");
break;
}
else => break,
}
}
})
}
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
iroha_cfg: super::Configuration,
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
query_service: LiveQueryStoreHandle,
kura: Arc<Kura>,
}

/// Torii errors.
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum Error {
/// Failed to execute or validate query
/// Failed to process query
Query(#[from] iroha_data_model::ValidationFail),
/// Failed to accept transaction
AcceptTransaction(#[from] iroha_core::tx::AcceptTransactionFail),
Expand All @@ -107,43 +58,14 @@ pub enum Error {
#[cfg(feature = "telemetry")]
/// Error while getting Prometheus metrics
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
UnknownCursor,
}

/// Status code for query error response.
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
};

match validation_error {
NotPermitted(_) => StatusCode::FORBIDDEN,
QueryFailed(query_error)
| InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error {
Evaluate(_) | Conversion(_) => StatusCode::BAD_REQUEST,
Signature(_) => StatusCode::UNAUTHORIZED,
Find(_) => StatusCode::NOT_FOUND,
},
TooComplex => StatusCode::UNPROCESSABLE_ENTITY,
InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
InstructionFailed(error) => {
iroha_logger::error!(
?error,
"Query validation failed with unexpected error. This means a bug inside Runtime Executor",
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}

impl Reply for Error {
fn into_response(self) -> Response {
use Error::*;
match self {
Query(err) => {
reply::with_status(utils::Scale(&err), query_status_code(&err)).into_response()
Self::Query(err) => {
reply::with_status(utils::Scale(&err), Self::query_status_code(&err))
.into_response()
}
_ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(),
}
Expand All @@ -153,11 +75,10 @@ impl Reply for Error {
impl Error {
fn status_code(&self) -> StatusCode {
use Error::*;

match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Query(e) => Self::query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -169,6 +90,33 @@ impl Error {
}
}

fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
};

match validation_error {
NotPermitted(_) => StatusCode::FORBIDDEN,
QueryFailed(query_error)
| InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error
{
Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST,
Signature(_) => StatusCode::UNAUTHORIZED,
Find(_) => StatusCode::NOT_FOUND,
},
TooComplex => StatusCode::UNPROCESSABLE_ENTITY,
InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
InstructionFailed(error) => {
iroha_logger::error!(
?error,
"Query validation failed with unexpected error. This means a bug inside Runtime Executor",
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}

fn to_string(err: &dyn std::error::Error) -> String {
let mut s = "Error:\n".to_owned();
let mut idx = 0_i32;
Expand Down
Loading
Loading