Skip to content

Commit

Permalink
[feature] #3929: Lazy queries inside wasm
Browse files Browse the repository at this point in the history
Signed-off-by: Daniil Polyakov <arjentix@gmail.com>
  • Loading branch information
Arjentix committed Oct 18, 2023
1 parent f9f5ede commit 24c3220
Show file tree
Hide file tree
Showing 70 changed files with 1,395 additions and 658 deletions.
116 changes: 66 additions & 50 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ tempfile = { workspace = true }
dashmap = { workspace = true }

thread-local-panic-hook = { version = "0.1.0", optional = true }
uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
serial_test = "2.0.0"
Expand Down
29 changes: 23 additions & 6 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroha_core::{
handler::ThreadHandler,
kura::Kura,
prelude::{World, WorldStateView},
query::store::LiveQueryStore,
queue::Queue,
smartcontracts::isi::Registrable as _,
snapshot::{try_read_snapshot, SnapshotMaker, SnapshotMakerHandle},
Expand Down Expand Up @@ -92,12 +93,13 @@ pub struct Iroha {
pub kura: Arc<Kura>,
/// Torii web server
pub torii: Option<Torii>,
/// Snapshot service,
/// Snapshot service
pub snapshot_maker: SnapshotMakerHandle,
/// Thread handlers
thread_handlers: Vec<ThreadHandler>,
/// A boolean value indicating whether or not the peers will recieve data from the network. Used in
/// sumeragi testing.

/// A boolean value indicating whether or not the peers will receive data from the network.
/// Used in sumeragi testing.
#[cfg(debug_assertions)]
pub freeze_status: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -241,13 +243,25 @@ impl Iroha {
std::path::Path::new(&config.kura.block_store_path),
config.kura.debug_output_new_blocks,
)?;
let live_query_store_handle =
LiveQueryStore::from_configuration(config.live_query_store).start();

let notify_shutdown = Arc::new(Notify::new());
let block_count = kura.init()?;
let wsv = try_read_snapshot(&config.snapshot.dir_path, &kura, block_count).map_or_else(
let wsv = try_read_snapshot(
&config.snapshot.dir_path,
&kura,
live_query_store_handle.clone(),
block_count,
)
.map_or_else(
|error| {
iroha_logger::warn!(%error, "Failed to load wsv from snapshot, creating empty wsv");
WorldStateView::from_configuration(config.wsv, world, Arc::clone(&kura))
WorldStateView::from_configuration(
config.wsv,
world,
Arc::clone(&kura),
live_query_store_handle.clone(),
)
},
|wsv| {
iroha_logger::info!(
Expand Down Expand Up @@ -298,6 +312,8 @@ impl Iroha {
#[cfg(debug_assertions)]
let freeze_status = Arc::new(AtomicBool::new(false));

let notify_shutdown = Arc::new(Notify::new());

NetworkRelay {
sumeragi: sumeragi.clone(),
block_sync,
Expand All @@ -318,6 +334,7 @@ impl Iroha {
events_sender,
Arc::clone(&notify_shutdown),
sumeragi.clone(),
live_query_store_handle,
Arc::clone(&kura),
);

Expand Down
126 changes: 37 additions & 89 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@ use std::{
fmt::{Debug, Write as _},
net::ToSocketAddrs,
sync::Arc,
time::{Duration, Instant},
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
prelude::*,
query::store::LiveQueryStoreHandle,
queue::{self, Queue},
sumeragi::SumeragiHandle,
EventsSender,
};
use iroha_data_model::Value;
use parity_scale_codec::Encode;
use tokio::{sync::Notify, time::sleep};
use tokio::sync::Notify;
use utils::*;
use warp::{
http::StatusCode,
Expand All @@ -30,71 +27,25 @@ use warp::{
Filter as _, Reply,
};

use self::cursor::Batched;

#[macro_use]
pub(crate) mod utils;
mod cursor;
mod pagination;
mod routing;

type LiveQuery = Batched<Vec<Value>>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<(String, Vec<u8>), (LiveQuery, Instant)>,
}

impl LiveQueryStore {
fn insert<T: Encode>(&self, query_id: String, request: T, live_query: LiveQuery) {
self.queries
.insert((query_id, request.encode()), (live_query, Instant::now()));
}

fn remove<T: Encode>(&self, query_id: &str, request: &T) -> Option<LiveQuery> {
self.queries
.remove(&(query_id.to_string(), request.encode()))
.map(|(_, (output, _))| output)
}

fn expired_query_cleanup(
self: Arc<Self>,
idle_time: Duration,
notify_shutdown: Arc<Notify>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
loop {
tokio::select! {
_ = sleep(idle_time) => {
self.queries
.retain(|_, (_, last_access_time)| last_access_time.elapsed() <= idle_time);
},
_ = notify_shutdown.notified() => {
iroha_logger::info!("Query cleanup service is being shut down.");
break;
}
else => break,
}
}
})
}
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
iroha_cfg: super::Configuration,
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
query_service: LiveQueryStoreHandle,
kura: Arc<Kura>,
}

/// Torii errors.
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum Error {
/// Failed to execute or validate query
/// Failed to process query
Query(#[from] iroha_data_model::ValidationFail),
/// Failed to accept transaction
AcceptTransaction(#[from] iroha_core::tx::AcceptTransactionFail),
Expand All @@ -107,43 +58,14 @@ pub enum Error {
#[cfg(feature = "telemetry")]
/// Error while getting Prometheus metrics
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
UnknownCursor,
}

/// Status code for query error response.
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
};

match validation_error {
NotPermitted(_) => StatusCode::FORBIDDEN,
QueryFailed(query_error)
| InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error {
Evaluate(_) | Conversion(_) => StatusCode::BAD_REQUEST,
Signature(_) => StatusCode::UNAUTHORIZED,
Find(_) => StatusCode::NOT_FOUND,
},
TooComplex => StatusCode::UNPROCESSABLE_ENTITY,
InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
InstructionFailed(error) => {
iroha_logger::error!(
?error,
"Query validation failed with unexpected error. This means a bug inside Runtime Executor",
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}

impl Reply for Error {
fn into_response(self) -> Response {
use Error::*;
match self {
Query(err) => {
reply::with_status(utils::Scale(&err), query_status_code(&err)).into_response()
Self::Query(err) => {
reply::with_status(utils::Scale(&err), Self::query_status_code(&err))
.into_response()
}
_ => reply::with_status(Self::to_string(&self), self.status_code()).into_response(),
}
Expand All @@ -153,11 +75,10 @@ impl Reply for Error {
impl Error {
fn status_code(&self) -> StatusCode {
use Error::*;

match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Query(e) => Self::query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -169,6 +90,33 @@ impl Error {
}
}

fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
};

match validation_error {
NotPermitted(_) => StatusCode::FORBIDDEN,
QueryFailed(query_error)
| InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error
{
Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST,
Signature(_) => StatusCode::UNAUTHORIZED,
Find(_) => StatusCode::NOT_FOUND,
},
TooComplex => StatusCode::UNPROCESSABLE_ENTITY,
InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
InstructionFailed(error) => {
iroha_logger::error!(
?error,
"Query validation failed with unexpected error. This means a bug inside Runtime Executor",
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
}

fn to_string(err: &dyn std::error::Error) -> String {
let mut s = "Error:\n".to_owned();
let mut idx = 0_i32;
Expand Down
Loading

0 comments on commit 24c3220

Please sign in to comment.