Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3468: add server-side cursor
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jul 6, 2023
1 parent 279fe39 commit 04cda30
Show file tree
Hide file tree
Showing 23 changed files with 344 additions and 275 deletions.
9 changes: 9 additions & 0 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! queries and messages.

use std::{
collections::HashMap,
convert::Infallible,
fmt::{Debug, Write as _},
net::ToSocketAddrs,
Expand All @@ -14,9 +15,11 @@ use iroha_core::{
kura::Kura,
prelude::*,
queue::{self, Queue},
smartcontracts::query::LazyValue,
sumeragi::SumeragiHandle,
EventsSender,
};
use iroha_data_model::{prelude::QueryPayload, query::NonScrollableCursor};
use thiserror::Error;
use tokio::sync::Notify;
use utils::*;
Expand All @@ -32,13 +35,19 @@ pub(crate) mod utils;
mod pagination;
pub mod routing;

#[derive(Debug, Clone)]
struct LiveQuery {
result: Box<dyn Iterator<Item = LazyValue>>,
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
iroha_cfg: super::Configuration,
queue: Arc<Queue>,
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
live_queries: HashMap<(QueryPayload, NonScrollableCursor), LiveQuery>,
kura: Arc<Kura>,
}

Expand Down
2 changes: 1 addition & 1 deletion cli/src/torii/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use iroha_data_model::prelude::*;
use iroha_data_model::{prelude::*, query::Pagination};

/// Describes a collection to which pagination can be applied.
/// Implemented for the [`Iterator`] implementors.
Expand Down
22 changes: 16 additions & 6 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use iroha_data_model::{
VersionedCommittedBlock,
},
prelude::*,
query::{NonScrollableCursor, Pagination, Sorting},
};
use iroha_logger::prelude::*;
#[cfg(feature = "telemetry")]
Expand All @@ -43,6 +44,12 @@ pub fn sorting() -> impl warp::Filter<Extract = (Sorting,), Error = warp::Reject
warp::query()
}

/// Filter for warp which extracts cursor
pub fn cursor(
) -> impl warp::Filter<Extract = (NonScrollableCursor,), Error = warp::Rejection> + Copy {
warp::query()
}

#[iroha_futures::telemetry_future]
pub(crate) async fn handle_instructions(
queue: Arc<Queue>,
Expand Down Expand Up @@ -70,10 +77,11 @@ pub(crate) async fn handle_instructions(
#[iroha_futures::telemetry_future]
pub(crate) async fn handle_queries(
sumeragi: SumeragiHandle,
pagination: Pagination,
sorting: Sorting,
pagination: Pagination,
cursor: NonScrollableCursor,
request: VersionedSignedQuery,
) -> Result<Scale<VersionedQueryResult>> {
) -> Result<Scale<VersionedQueryResponse>> {
let mut wsv = sumeragi.wsv_clone();

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
Expand All @@ -86,8 +94,9 @@ pub(crate) async fn handle_queries(
}
};

let paginated_result = QueryResult {
let paginated_result = QueryResponse {
result,
cursor: None,
pagination,
sorting,
};
Expand Down Expand Up @@ -483,12 +492,13 @@ impl Torii {
))
.and(body::versioned()),
))
.or(endpoint4(
.or(endpoint5(
handle_queries,
warp::path(uri::QUERY)
.and(add_state!(self.sumeragi))
.and(paginate())
.and(add_state!(self.sumeragi, self.live_queries))
.and(sorting())
.and(paginate())
.and(cursor())
.and(body::versioned()),
))
.or(endpoint2(
Expand Down
Loading

0 comments on commit 04cda30

Please sign in to comment.