Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3468: return iterator as query execution …
Browse files Browse the repository at this point in the history
…result

Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jul 13, 2023
1 parent cdc08a7 commit 64b2f04
Show file tree
Hide file tree
Showing 28 changed files with 818 additions and 597 deletions.
47 changes: 18 additions & 29 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::{cmp::Ordering, num::TryFromIntError};
use std::cmp::Ordering;

use eyre::WrapErr;
use futures::TryStreamExt;
Expand All @@ -15,7 +15,10 @@ use iroha_config::{
torii::uri,
GetConfiguration, PostConfiguration,
};
use iroha_core::{smartcontracts::isi::query::ValidQueryRequest, sumeragi::SumeragiHandle};
use iroha_core::{
smartcontracts::{isi::query::ValidQueryRequest, query::LazyValue},
sumeragi::SumeragiHandle,
};
use iroha_data_model::{
block::{
stream::{
Expand All @@ -25,7 +28,6 @@ use iroha_data_model::{
VersionedCommittedBlock,
},
prelude::*,
query::error::QueryExecutionFail,
};
use iroha_logger::prelude::*;
#[cfg(feature = "telemetry")]
Expand Down Expand Up @@ -71,43 +73,34 @@ pub(crate) async fn handle_queries(
pagination: Pagination,
sorting: Sorting,
request: VersionedSignedQuery,
) -> Result<Scale<VersionedPaginatedQueryResult>> {
let result = {
let mut wsv = sumeragi.wsv_clone();
let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
valid_request.execute(&wsv).map_err(ValidationFail::from)?
};
) -> Result<Scale<VersionedQueryResult>> {
let mut wsv = sumeragi.wsv_clone();

let (total, result) = if let Value::Vec(vec_of_val) = result {
let len = vec_of_val.len();
let vec_of_val = apply_sorting_and_pagination(vec_of_val.into_iter(), &sorting, pagination);
let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let result = valid_request.execute(&wsv).map_err(ValidationFail::from)?;

(len, Value::Vec(vec_of_val))
} else {
(1, result)
let result = match result {
LazyValue::Value(value) => value,
LazyValue::Iter(iter) => {
Value::Vec(apply_sorting_and_pagination(iter, &sorting, pagination))
}
};

let total = total
.try_into()
.map_err(|e: TryFromIntError| QueryExecutionFail::Conversion(e.to_string()))
.map_err(ValidationFail::from)?;
let result = QueryResult(result);
let paginated_result = PaginatedQueryResult {
let paginated_result = QueryResult {
result,
pagination,
sorting,
total,
};
Ok(Scale(paginated_result.into()))
}

fn apply_sorting_and_pagination(
vec_of_val: impl Iterator<Item = Value>,
iter: impl Iterator<Item = Value>,
sorting: &Sorting,
pagination: Pagination,
) -> Vec<Value> {
if let Some(key) = &sorting.sort_by_metadata_key {
let mut pairs: Vec<(Option<Value>, Value)> = vec_of_val
let mut pairs: Vec<(Option<Value>, Value)> = iter
.map(|value| {
let key = match &value {
Value::Identifiable(IdentifiableBox::Asset(asset)) => match asset.value() {
Expand Down Expand Up @@ -137,7 +130,7 @@ fn apply_sorting_and_pagination(
.paginate(pagination)
.collect()
} else {
vec_of_val.paginate(pagination).collect()
iter.paginate(pagination).collect()
}
}

Expand Down Expand Up @@ -167,7 +160,6 @@ async fn handle_pending_transactions(
Ok(Scale(
queue
.all_transactions(&wsv)
.into_iter()
.map(Into::into)
.paginate(pagination)
.collect::<Vec<_>>(),
Expand Down Expand Up @@ -348,7 +340,6 @@ mod subscription {
async fn handle_version(sumeragi: SumeragiHandle) -> Json {
use iroha_version::Version;

#[allow(clippy::expect_used)]
let string = sumeragi
.apply_wsv(WorldStateView::latest_block_ref)
.expect("Genesis not applied. Nothing we can do. Solve the issue and rerun.")
Expand Down Expand Up @@ -422,7 +413,6 @@ impl Torii {
}
}

#[allow(opaque_hidden_inferred_bound)]
#[cfg(feature = "telemetry")]
/// Helper function to create router. This router can tested without starting up an HTTP server
fn create_telemetry_router(
Expand Down Expand Up @@ -458,7 +448,6 @@ impl Torii {
}

/// Helper function to create router. This router can tested without starting up an HTTP server
#[allow(opaque_hidden_inferred_bound)]
pub(crate) fn create_api_router(
&self,
) -> impl warp::Filter<Extract = impl warp::Reply> + Clone + Send {
Expand Down
44 changes: 20 additions & 24 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ where
// Separate-compilation friendly response handling
fn _handle_query_response_base(
resp: &Response<Vec<u8>>,
) -> QueryHandlerResult<VersionedPaginatedQueryResult> {
) -> QueryHandlerResult<VersionedQueryResult> {
match resp.status() {
StatusCode::OK => {
let res = VersionedPaginatedQueryResult::decode_all_versioned(resp.body());
let res = VersionedQueryResult::decode_all_versioned(resp.body());
res.wrap_err(
"Failed to decode response from Iroha. \
You are likely using a version of the client library \
Expand Down Expand Up @@ -143,7 +143,7 @@ where
}
}

_handle_query_response_base(&resp).and_then(|VersionedPaginatedQueryResult::V1(result)| {
_handle_query_response_base(&resp).and_then(|VersionedQueryResult::V1(result)| {
ClientQueryRequest::try_from(result).map_err(Into::into)
})
}
Expand Down Expand Up @@ -238,7 +238,7 @@ impl From<ResponseReport> for eyre::Report {
}
}

/// More convenient version of [`iroha_data_model::prelude::PaginatedQueryResult`].
/// More convenient version of [`iroha_data_model::prelude::QueryResult`].
/// The only difference is that this struct has `output` field extracted from the result
/// accordingly to the source query.
#[derive(Clone, Debug)]
Expand All @@ -249,12 +249,10 @@ where
{
/// Query output
pub output: R::Output,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
/// See [`iroha_data_model::prelude::QueryResult`]
pub pagination: Pagination,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
/// See [`iroha_data_model::prelude::QueryResult`]
pub sorting: Sorting,
/// See [`iroha_data_model::prelude::PaginatedQueryResult`]
pub total: u64,
}

impl<R> ClientQueryRequest<R>
Expand All @@ -268,30 +266,28 @@ where
}
}

impl<R> TryFrom<PaginatedQueryResult> for ClientQueryRequest<R>
impl<R> TryFrom<QueryResult> for ClientQueryRequest<R>
where
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
type Error = eyre::Report;

fn try_from(
PaginatedQueryResult {
QueryResult {
result,
pagination,
sorting,
total,
}: PaginatedQueryResult,
}: QueryResult,
) -> Result<Self> {
let output = R::Output::try_from(result.into())
let output = R::Output::try_from(result)
.map_err(Into::into)
.wrap_err("Unexpected type")?;

Ok(Self {
output,
pagination,
sorting,
total,
})
}
}
Expand Down Expand Up @@ -729,9 +725,9 @@ impl Client {
pub fn prepare_query_request<R, B>(
&self,
request: R,
filter: PredicateBox,
pagination: Pagination,
sorting: Sorting,
filter: PredicateBox,
) -> Result<(B, QueryResponseHandler<R>)>
where
R: Query + Debug,
Expand Down Expand Up @@ -760,7 +756,7 @@ impl Client {
///
/// # Errors
/// Fails if sending request fails
pub fn request_with_pagination_and_filter_and_sorting<R>(
pub fn request_with_filter_and_pagination_and_sorting<R>(
&self,
request: R,
pagination: Pagination,
Expand All @@ -773,7 +769,7 @@ impl Client {
{
iroha_logger::trace!(?request, %pagination, ?sorting, ?filter);
let (req, resp_handler) = self.prepare_query_request::<R, DefaultRequestBuilder>(
request, pagination, sorting, filter,
request, filter, pagination, sorting,
)?;
let response = req.build()?.send()?;
resp_handler.handle(response)
Expand All @@ -793,7 +789,7 @@ impl Client {
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination_and_filter_and_sorting(
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
sorting,
Expand All @@ -805,7 +801,7 @@ impl Client {
///
/// # Errors
/// Fails if sending request fails
pub fn request_with_pagination_and_filter<R>(
pub fn request_with_filter_and_pagination<R>(
&self,
request: R,
pagination: Pagination,
Expand All @@ -815,7 +811,7 @@ impl Client {
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>, // Seems redundant
{
self.request_with_pagination_and_filter_and_sorting(
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
Sorting::default(),
Expand All @@ -827,7 +823,7 @@ impl Client {
///
/// # Errors
/// Fails if sending request fails
pub fn request_with_sorting_and_filter<R>(
pub fn request_with_filter_and_sorting<R>(
&self,
request: R,
sorting: Sorting,
Expand All @@ -837,7 +833,7 @@ impl Client {
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>, // Seems redundant
{
self.request_with_pagination_and_filter_and_sorting(
self.request_with_filter_and_pagination_and_sorting(
request,
Pagination::default(),
sorting,
Expand All @@ -861,7 +857,7 @@ impl Client {
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination_and_filter(request, Pagination::default(), filter)
self.request_with_filter_and_pagination(request, Pagination::default(), filter)
}

/// Query API entry point. Requests queries from `Iroha` peers with pagination.
Expand All @@ -880,7 +876,7 @@ impl Client {
R: Query + Debug,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination_and_filter(request, pagination, PredicateBox::default())
self.request_with_filter_and_pagination(request, pagination, PredicateBox::default())
}

/// Query API entry point. Requests queries from `Iroha` peers with sorting.
Expand Down
10 changes: 5 additions & 5 deletions client/tests/integration/sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn correct_sorting_of_entities() {
.expect("Valid");

let res = test_client
.request_with_sorting_and_filter(
.request_with_filter_and_sorting(
client::asset::all_definitions(),
Sorting::by_metadata_key(sort_by_metadata_key.clone()),
PredicateBox::new(value::ValuePredicate::Identifiable(
Expand Down Expand Up @@ -208,7 +208,7 @@ fn correct_sorting_of_entities() {
.expect("Valid");

let res = test_client
.request_with_sorting_and_filter(
.request_with_filter_and_sorting(
client::account::all(),
Sorting::by_metadata_key(sort_by_metadata_key.clone()),
PredicateBox::new(value::ValuePredicate::Identifiable(
Expand Down Expand Up @@ -258,7 +258,7 @@ fn correct_sorting_of_entities() {
.expect("Valid");

let res = test_client
.request_with_pagination_and_filter_and_sorting(
.request_with_filter_and_pagination_and_sorting(
client::domain::all(),
Pagination::default(),
Sorting::by_metadata_key(sort_by_metadata_key.clone()),
Expand Down Expand Up @@ -310,7 +310,7 @@ fn correct_sorting_of_entities() {
string::StringPredicate::starts_with("neverland_"),
));
let res = test_client
.request_with_pagination_and_filter_and_sorting(
.request_with_filter_and_pagination_and_sorting(
client::domain::all(),
Pagination::default(),
Sorting::by_metadata_key(sort_by_metadata_key),
Expand Down Expand Up @@ -370,7 +370,7 @@ fn sort_only_elements_which_have_sorting_key() -> Result<()> {
.wrap_err("Failed to register accounts")?;

let res = test_client
.request_with_sorting_and_filter(
.request_with_filter_and_sorting(
client::account::all(),
Sorting::by_metadata_key(sort_by_metadata_key),
PredicateBox::new(value::ValuePredicate::Identifiable(
Expand Down
10 changes: 6 additions & 4 deletions client/tests/integration/triggers/time_trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ fn change_asset_metadata_after_1_sec() -> Result<()> {
usize::try_from(PERIOD_MS / DEFAULT_CONSENSUS_ESTIMATION_MS + 1)?,
)?;

let value = test_client.request(FindAssetDefinitionKeyValueByIdAndKey {
id: asset_definition_id.into(),
key: key.into(),
})?;
let value = test_client
.request(FindAssetDefinitionKeyValueByIdAndKey {
id: asset_definition_id.into(),
key: key.into(),
})?
.into();
assert!(matches!(value, Value::Numeric(NumericValue::U32(3_u32))));

Ok(())
Expand Down
Loading

0 comments on commit 64b2f04

Please sign in to comment.