Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3468: implement server-side cursor
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jul 13, 2023
1 parent 64b2f04 commit e082216
Show file tree
Hide file tree
Showing 60 changed files with 1,213 additions and 614 deletions.
254 changes: 254 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ iroha_genesis = { version = "=2.0.0-pre-rc.16", path = "../genesis" }
iroha_wasm_builder = { version = "=2.0.0-pre-rc.16", path = "../wasm_builder" }


dashmap = "5.4.0"
async-trait = "0.1.60"
color-eyre = "0.6.2"
eyre = "0.6.8"
tracing = "0.1.37"
moka = { version = "0.11", features = ["future"] }
futures = { version = "0.3.25", default-features = false, features = ["std", "async-await"] }
parity-scale-codec = { version = "3.2.1", default-features = false, features = ["derive"] }
serde = { version = "1.0.151", features = ["derive"] }
Expand Down
52 changes: 49 additions & 3 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use std::{
convert::Infallible,
fmt::{Debug, Write as _},
net::ToSocketAddrs,
num::NonZeroU64,
sync::Arc,
time::{Duration, Instant},
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
Expand All @@ -17,8 +20,9 @@ use iroha_core::{
sumeragi::SumeragiHandle,
EventsSender,
};
use iroha_data_model::Value;
use thiserror::Error;
use tokio::sync::Notify;
use tokio::{sync::Notify, time::sleep};
use utils::*;
use warp::{
http::StatusCode,
Expand All @@ -32,13 +36,50 @@ pub(crate) mod utils;
mod pagination;
pub mod routing;

type LiveQuery = Box<dyn Iterator<Item = Value> + Send + Sync>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<Vec<u8>, (LiveQuery, NonZeroU64, Instant)>,
}

impl LiveQueryStore {
fn insert(&self, request: Vec<u8>, (live_query, cursor): (LiveQuery, NonZeroU64)) {
self.queries
.insert(request, (live_query, cursor, Instant::now()));
}

fn remove(&self, request: &Vec<u8>) -> Option<(LiveQuery, NonZeroU64)> {
self.queries
.remove(request)
.map(|(_, (live_query, cursor, _))| (live_query, cursor))
}

// TODO: Add notifier channel to enable graceful shutdown
fn expired_query_cleanup(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
// Time query can remain in the store if unaccessed
let query_idle_time = Duration::from_millis(10_000);

loop {
sleep(query_idle_time).await;

self.queries.retain(|_, (_, _, last_access_time)| {
last_access_time.elapsed() <= query_idle_time
});
}
})
}
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
iroha_cfg: super::Configuration,
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
kura: Arc<Kura>,
}

Expand All @@ -64,10 +105,13 @@ pub enum Error {
/// Error while getting Prometheus metrics
#[error("Failed to produce Prometheus metrics")]
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
#[error("Failed to find cursor")]
UnknownCursor,
}

/// Status code for query error response.
pub(crate) fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
Expand Down Expand Up @@ -110,7 +154,9 @@ impl Error {
use Error::*;
match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
4 changes: 3 additions & 1 deletion cli/src/torii/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use iroha_data_model::prelude::*;
use iroha_data_model::query::Pagination;

/// Describes a collection to which pagination can be applied.
/// Implemented for the [`Iterator`] implementors.
Expand Down Expand Up @@ -54,6 +54,8 @@ pub fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Re

#[cfg(test)]
mod tests {
use iroha_data_model::query::pagination::Pagination;

use super::*;

#[test]
Expand Down
118 changes: 94 additions & 24 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,30 @@ use iroha_data_model::{
VersionedCommittedBlock,
},
prelude::*,
query::{ForwardCursor, Pagination, Sorting},
};
use iroha_logger::prelude::*;
#[cfg(feature = "telemetry")]
use iroha_telemetry::metrics::Status;
use pagination::{paginate, Paginate};
use parity_scale_codec::Encode;
use tokio::task;

use super::*;
use crate::stream::{Sink, Stream};

/// Filter for warp which extracts sorting
pub fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Rejection> + Copy {
fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Rejection> + Copy {
warp::query()
}

/// Filter for warp which extracts cursor
fn cursor() -> impl warp::Filter<Extract = (ForwardCursor,), Error = warp::Rejection> + Copy {
warp::query()
}

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_instructions(
async fn handle_instructions(
queue: Arc<Queue>,
sumeragi: SumeragiHandle,
transaction: VersionedSignedTransaction,
Expand All @@ -68,29 +75,84 @@ pub(crate) async fn handle_instructions(
}

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_queries(
async fn handle_queries(
sumeragi: SumeragiHandle,
pagination: Pagination,
sorting: Sorting,
query_store: Arc<LiveQueryStore>,
fetch_size: NonZeroU64,

request: VersionedSignedQuery,
) -> Result<Scale<VersionedQueryResult>> {
let mut wsv = sumeragi.wsv_clone();
sorting: Sorting,
pagination: Pagination,

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let result = valid_request.execute(&wsv).map_err(ValidationFail::from)?;
cursor: ForwardCursor,
) -> Result<Scale<VersionedQueryResponse>> {
let encoded_request = (&request, &sorting, &pagination).encode();

let result = match result {
LazyValue::Value(value) => value,
LazyValue::Iter(iter) => {
Value::Vec(apply_sorting_and_pagination(iter, &sorting, pagination))
let mut live_query: (_, NonZeroU64) = if let Some(cursor) = cursor {
if let Some((live_query, prev_cursor)) = query_store.remove(&encoded_request) {
if cursor != prev_cursor {
return Err(Error::UnknownCursor);
}

(live_query, cursor)
} else {
return Err(Error::UnknownCursor);
}
} else {
let mut wsv = sumeragi.wsv_clone();

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let res = valid_request.execute(&wsv).map_err(ValidationFail::from)?;

match res {
LazyValue::Iter(iter) => (
Box::new(apply_sorting_and_pagination(iter, &sorting, pagination).into_iter()),
NonZeroU64::new(0).expect("Valid"),
),
LazyValue::Value(result) => {
return Ok(Scale(
QueryResponse {
result,
cursor: None,
pagination,
sorting,
}
.into(),
));
}
}
};

let paginated_result = QueryResult {
result,
let result = live_query
.0
.by_ref()
.take(
fetch_size
.get()
.try_into()
.expect("u64 larger than usize::MAX"),
)
.collect::<Vec<_>>();

let cursor = if result.len() as u64 >= fetch_size.get() {
query_store.insert(encoded_request, live_query);

cursor.map(|cursor| {
cursor
.checked_add(fetch_size.get())
.expect("Cursor size too big")
})
} else {
None
};

let paginated_result = QueryResponse {
result: Value::Vec(result),
cursor,
pagination,
sorting,
};

Ok(Scale(paginated_result.into()))
}

Expand Down Expand Up @@ -297,7 +359,10 @@ mod subscription {
/// There should be a [`warp::filters::ws::Message::close()`]
/// message to end subscription
#[iroha_futures::telemetry_future]
pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> {
pub(crate) async fn handle_subscription(
events: EventsSender,
stream: WebSocket,
) -> eyre::Result<()> {
let mut consumer = event::Consumer::new(stream).await?;

match subscribe_forever(events, &mut consumer).await {
Expand Down Expand Up @@ -409,6 +474,7 @@ impl Torii {
queue,
notify_shutdown,
sumeragi,
query_store: Arc::default(),
kura,
}
}
Expand Down Expand Up @@ -448,9 +514,7 @@ impl Torii {
}

/// Helper function to create router. This router can tested without starting up an HTTP server
pub(crate) fn create_api_router(
&self,
) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
fn create_api_router(&self) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
let health_route = warp::get()
.and(warp::path(uri::HEALTH))
.and_then(|| async { Ok::<_, Infallible>(handle_health()) });
Expand Down Expand Up @@ -483,13 +547,18 @@ impl Torii {
))
.and(body::versioned()),
))
.or(endpoint4(
.or(endpoint7(
handle_queries,
warp::path(uri::QUERY)
.and(add_state!(self.sumeragi))
.and(paginate())
.and(add_state!(
self.sumeragi,
self.query_store,
self.iroha_cfg.torii.fetch_size,
))
.and(body::versioned())
.and(sorting())
.and(body::versioned()),
.and(paginate())
.and(cursor()),
))
.or(endpoint2(
handle_post_configuration,
Expand Down Expand Up @@ -614,13 +683,14 @@ impl Torii {
/// # Errors
/// Can fail due to listening to network or if http server fails
#[iroha_futures::telemetry_future]
pub async fn start(self) -> eyre::Result<()> {
pub(crate) async fn start(self) -> eyre::Result<()> {
let mut handles = vec![];

let torii = Arc::new(self);
#[cfg(feature = "telemetry")]
handles.extend(Arc::clone(&torii).start_telemetry()?);
handles.extend(Arc::clone(&torii).start_api()?);
handles.push(Arc::clone(&torii.query_store).expired_query_cleanup());

handles
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion cli/src/torii/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ impl<O: Reply, E: Reply> Reply for WarpResult<O, E> {
}
}

iroha_cli_derive::generate_endpoints!(2, 3, 4, 5);
iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 7);
21 changes: 13 additions & 8 deletions client/benches/torii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,19 @@ fn query_requests(criterion: &mut Criterion) {
let mut failures_count = 0;
let _dropable = group.throughput(Throughput::Bytes(request.encode().len() as u64));
let _dropable2 = group.bench_function("query", |b| {
b.iter(|| match iroha_client.request(request.clone()) {
Ok(assets) => {
assert!(!assets.is_empty());
success_count += 1;
}
Err(e) => {
eprintln!("Query failed: {e}");
failures_count += 1;
b.iter(|| {
match iroha_client
.request(request.clone())
.and_then(|iter| iter.collect::<Result<Vec<_>, _>>())
{
Ok(assets) => {
assert!(!assets.is_empty());
success_count += 1;
}
Err(e) => {
eprintln!("Query failed: {e}");
failures_count += 1;
}
}
});
});
Expand Down
Loading

0 comments on commit e082216

Please sign in to comment.