Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3468: implement server-side cursor
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jul 12, 2023
1 parent 729f2e7 commit 34e8e75
Show file tree
Hide file tree
Showing 28 changed files with 489 additions and 323 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ iroha_genesis = { version = "=2.0.0-pre-rc.16", path = "../genesis" }
iroha_wasm_builder = { version = "=2.0.0-pre-rc.16", path = "../wasm_builder" }


dashmap = "5.4.0"
async-trait = "0.1.60"
color-eyre = "0.6.2"
eyre = "0.6.8"
Expand Down
16 changes: 14 additions & 2 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use std::{
convert::Infallible,
fmt::{Debug, Write as _},
net::ToSocketAddrs,
num::NonZeroU64,
sync::Arc,
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
Expand All @@ -17,6 +19,7 @@ use iroha_core::{
sumeragi::SumeragiHandle,
EventsSender,
};
use iroha_data_model::Value;
use thiserror::Error;
use tokio::sync::Notify;
use utils::*;
Expand All @@ -32,13 +35,17 @@ pub(crate) mod utils;
mod pagination;
pub mod routing;

type LiveQuery = Box<dyn Iterator<Item = Value> + Send + Sync>;
type LiveQueryStore = DashMap<Vec<u8>, (LiveQuery, NonZeroU64)>;

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
iroha_cfg: super::Configuration,
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
kura: Arc<Kura>,
}

Expand All @@ -64,10 +71,13 @@ pub enum Error {
/// Error while getting Prometheus metrics
#[error("Failed to produce Prometheus metrics")]
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
#[error("Failed to find cursor")]
UnknownCursor,
}

/// Status code for query error response.
pub(crate) fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
Expand Down Expand Up @@ -110,7 +120,9 @@ impl Error {
use Error::*;
match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
2 changes: 1 addition & 1 deletion cli/src/torii/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use iroha_data_model::prelude::*;
use iroha_data_model::query::Pagination;

/// Describes a collection to which pagination can be applied.
/// Implemented for the [`Iterator`] implementors.
Expand Down
119 changes: 94 additions & 25 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::cmp::Ordering;
use std::{cmp::Ordering, num::NonZeroUsize};

use eyre::WrapErr;
use futures::TryStreamExt;
Expand All @@ -28,23 +28,30 @@ use iroha_data_model::{
VersionedCommittedBlock,
},
prelude::*,
query::{ForwardCursor, Pagination, Sorting},
};
use iroha_logger::prelude::*;
#[cfg(feature = "telemetry")]
use iroha_telemetry::metrics::Status;
use pagination::{paginate, Paginate};
use parity_scale_codec::Encode;
use tokio::task;

use super::*;
use crate::stream::{Sink, Stream};

/// Filter for warp which extracts sorting
pub fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Rejection> + Copy {
fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Rejection> + Copy {
warp::query()
}

/// Filter for warp which extracts cursor
fn cursor() -> impl warp::Filter<Extract = (ForwardCursor,), Error = warp::Rejection> + Copy {
warp::query()
}

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_instructions(
async fn handle_instructions(
queue: Arc<Queue>,
sumeragi: SumeragiHandle,
transaction: VersionedSignedTransaction,
Expand All @@ -68,29 +75,84 @@ pub(crate) async fn handle_instructions(
}

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_queries(
async fn handle_queries(
sumeragi: SumeragiHandle,
pagination: Pagination,
sorting: Sorting,
query_store: Arc<LiveQueryStore>,
fetch_size: NonZeroU64,

request: VersionedSignedQuery,
) -> Result<Scale<VersionedQueryResult>> {
let mut wsv = sumeragi.wsv_clone();
sorting: Sorting,
pagination: Pagination,

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let result = valid_request.execute(&wsv).map_err(ValidationFail::from)?;
cursor: ForwardCursor,
) -> Result<Scale<VersionedQueryResponse>> {
let encoded_request = (&request, &sorting, &pagination).encode();

let result = match result {
LazyValue::Value(value) => value,
LazyValue::Iter(iter) => {
Value::Vec(apply_sorting_and_pagination(iter, &sorting, pagination))
let mut live_query: (_, NonZeroU64) = if let Some(cursor) = cursor {
if let Some((_, live_query)) = query_store.remove(&encoded_request) {
if cursor != live_query.1 {
return Err(Error::UnknownCursor);
}

live_query
} else {
return Err(Error::UnknownCursor);
}
} else {
let mut wsv = sumeragi.wsv_clone();

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let res = valid_request.execute(&wsv).map_err(ValidationFail::from)?;

match res {
LazyValue::Iter(iter) => (
Box::new(apply_sorting_and_pagination(iter, &sorting, pagination).into_iter()),
NonZeroU64::new(0).expect("Valid"),
),
LazyValue::Value(result) => {
return Ok(Scale(
QueryResponse {
result,
cursor: None,
pagination,
sorting,
}
.into(),
));
}
}
};

let paginated_result = QueryResult {
result,
let result = live_query
.0
.by_ref()
.take(
fetch_size
.get()
.try_into()
.expect("u64 larger than usize::MAX"),
)
.collect::<Vec<_>>();

let cursor = if result.len() as u64 >= fetch_size.get() {
query_store.insert(encoded_request, live_query);

cursor.map(|cursor| {
cursor
.checked_add(fetch_size.get())
.expect("Cursor size too big")
})
} else {
None
};

let paginated_result = QueryResponse {
result: Value::Vec(result),
cursor,
pagination,
sorting,
};

Ok(Scale(paginated_result.into()))
}

Expand Down Expand Up @@ -297,7 +359,10 @@ mod subscription {
/// There should be a [`warp::filters::ws::Message::close()`]
/// message to end subscription
#[iroha_futures::telemetry_future]
pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> {
pub(crate) async fn handle_subscription(
events: EventsSender,
stream: WebSocket,
) -> eyre::Result<()> {
let mut consumer = event::Consumer::new(stream).await?;

match subscribe_forever(events, &mut consumer).await {
Expand Down Expand Up @@ -409,6 +474,7 @@ impl Torii {
queue,
notify_shutdown,
sumeragi,
query_store: Arc::default(),
kura,
}
}
Expand Down Expand Up @@ -448,9 +514,7 @@ impl Torii {
}

/// Helper function to create router. This router can tested without starting up an HTTP server
pub(crate) fn create_api_router(
&self,
) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
fn create_api_router(&self) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
let health_route = warp::get()
.and(warp::path(uri::HEALTH))
.and_then(|| async { Ok::<_, Infallible>(handle_health()) });
Expand Down Expand Up @@ -483,13 +547,18 @@ impl Torii {
))
.and(body::versioned()),
))
.or(endpoint4(
.or(endpoint7(
handle_queries,
warp::path(uri::QUERY)
.and(add_state!(self.sumeragi))
.and(paginate())
.and(add_state!(
self.sumeragi,
self.query_store,
self.iroha_cfg.torii.fetch_size,
))
.and(body::versioned())
.and(sorting())
.and(body::versioned()),
.and(paginate())
.and(cursor()),
))
.or(endpoint2(
handle_post_configuration,
Expand Down Expand Up @@ -614,7 +683,7 @@ impl Torii {
/// # Errors
/// Can fail due to listening to network or if http server fails
#[iroha_futures::telemetry_future]
pub async fn start(self) -> eyre::Result<()> {
pub(crate) async fn start(self) -> eyre::Result<()> {
let mut handles = vec![];

let torii = Arc::new(self);
Expand Down
2 changes: 1 addition & 1 deletion cli/src/torii/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ impl<O: Reply, E: Reply> Reply for WarpResult<O, E> {
}
}

iroha_cli_derive::generate_endpoints!(2, 3, 4, 5);
iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 7);
Loading

0 comments on commit 34e8e75

Please sign in to comment.