Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3900: fetch_size query parameter
Browse files Browse the repository at this point in the history
Signed-off-by: Daniil Polyakov <arjentix@gmail.com>
  • Loading branch information
Arjentix committed Oct 25, 2023
1 parent 1ea5f31 commit 706fc65
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 20 additions & 18 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::num::NonZeroUsize;

use eyre::WrapErr;
use futures::TryStreamExt;
use iroha_config::{
Expand Down Expand Up @@ -45,13 +43,17 @@ fn client_query_request(
body::versioned::<SignedQuery>()
.and(sorting())
.and(paginate())
.and_then(|signed_query, sorting, pagination| async move {
Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query(
signed_query,
sorting,
pagination,
))
})
.and(fetch_size())
.and_then(
|signed_query, sorting, pagination, FetchSize { fetch_size }| async move {
Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query(
signed_query,
sorting,
pagination,
fetch_size,
))
},
)
.or(cursor().and_then(|cursor| async move {
Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::cursor(cursor))
}))
Expand All @@ -73,6 +75,11 @@ fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Reject
warp::query()
}

/// Filter for warp which extracts fetch size
fn fetch_size() -> impl warp::Filter<Extract = (FetchSize,), Error = warp::Rejection> + Copy {
warp::query()
}

#[iroha_futures::telemetry_future]
async fn handle_instructions(
queue: Arc<Queue>,
Expand Down Expand Up @@ -101,7 +108,6 @@ async fn handle_instructions(
async fn handle_queries(
live_query_store: LiveQueryStoreHandle,
sumeragi: SumeragiHandle,
fetch_size: NonZeroUsize,

query_request: http::ClientQueryRequest,
) -> Result<Scale<BatchedResponse<Value>>> {
Expand All @@ -110,11 +116,12 @@ async fn handle_queries(
query: signed_query,
sorting,
pagination,
fetch_size,
}) => sumeragi.apply_wsv(|wsv| {
let valid_query = ValidQueryRequest::validate(signed_query, wsv)?;
let query_output = valid_query.execute(wsv)?;
live_query_store
.handle_query_output(query_output, fetch_size, &sorting, pagination)
.handle_query_output(query_output, &sorting, pagination, fetch_size)
.map_err(ValidationFail::from)
}),
QueryRequest::Cursor(cursor) => live_query_store
Expand Down Expand Up @@ -470,15 +477,10 @@ impl Torii {
))
.and(body::versioned()),
)
.or(endpoint4(
.or(endpoint3(
handle_queries,
warp::path(uri::QUERY)
.and(add_state!(
self.query_service,
self.sumeragi,
NonZeroUsize::try_from(self.iroha_cfg.torii.fetch_size)
.expect("u64 should always fit into usize")
))
.and(add_state!(self.query_service, self.sumeragi,))
.and(client_query_request()),
))
.or(endpoint2(
Expand Down
47 changes: 41 additions & 6 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use iroha_data_model::{
isi::Instruction,
predicate::PredicateBox,
prelude::*,
query::{Pagination, Query, Sorting},
query::{Pagination, Query, Sorting, DEFAULT_FETCH_SIZE},
transaction::TransactionPayload,
BatchedResponse, ValidationFail,
};
Expand Down Expand Up @@ -247,6 +247,15 @@ pub struct ResultSet<T> {
client_cursor: usize,
}

impl<T> ResultSet<T> {
/// Get the length of the batch returned by Iroha.
///
/// This is controlled by `fetch_size` parameter of the query.
pub fn batch_len(&self) -> usize {
self.iter.len()
}
}

impl<T: Clone> Iterator for ResultSet<T>
where
Vec<T>: QueryOutput,
Expand Down Expand Up @@ -374,6 +383,7 @@ impl QueryRequest {
query: Vec::default(),
sorting: Sorting::default(),
pagination: Pagination::default(),
fetch_size: *DEFAULT_FETCH_SIZE,
},
),
}
Expand All @@ -389,6 +399,7 @@ impl QueryRequest {
iroha_data_model::query::QueryRequest::Query(query_with_params) => builder
.params(Vec::from(query_with_params.sorting))
.params(Vec::from(query_with_params.pagination))
.params(Vec::from(FetchSize::new(query_with_params.fetch_size)))
.body(query_with_params.query),
iroha_data_model::query::QueryRequest::Cursor(cursor) => {
builder.params(Vec::from(cursor))
Expand Down Expand Up @@ -798,6 +809,7 @@ impl Client {
filter: PredicateBox,
pagination: Pagination,
sorting: Sorting,
fetch_size: NonZeroU32,
) -> Result<(DefaultRequestBuilder, QueryResponseHandler<R::Output>)>
where
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
Expand All @@ -813,6 +825,7 @@ impl Client {
query: request,
pagination,
sorting,
fetch_size,
},
),
};
Expand All @@ -831,6 +844,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: NonZeroU32,
sorting: Sorting,
filter: PredicateBox,
) -> QueryResult<<R::Output as QueryOutput>::Target>
Expand All @@ -840,7 +854,7 @@ impl Client {
{
iroha_logger::trace!(?request, %pagination, ?sorting, ?filter);
let (req, mut resp_handler) =
self.prepare_query_request::<R>(request, filter, pagination, sorting)?;
self.prepare_query_request::<R>(request, filter, pagination, sorting, fetch_size)?;

let response = req.build()?.send()?;
let value = resp_handler.handle(&response)?;
Expand All @@ -857,6 +871,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: NonZeroU32,
sorting: Sorting,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
Expand All @@ -866,6 +881,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
fetch_size,
sorting,
PredicateBox::default(),
)
Expand All @@ -879,6 +895,7 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: NonZeroU32,
filter: PredicateBox,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
Expand All @@ -888,6 +905,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
pagination,
fetch_size,
Sorting::default(),
filter,
)
Expand All @@ -910,6 +928,7 @@ impl Client {
self.request_with_filter_and_pagination_and_sorting(
request,
Pagination::default(),
*DEFAULT_FETCH_SIZE,
sorting,
filter,
)
Expand All @@ -931,7 +950,12 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_filter_and_pagination(request, Pagination::default(), filter)
self.request_with_filter_and_pagination(
request,
Pagination::default(),
*DEFAULT_FETCH_SIZE,
filter,
)
}

/// Query API entry point. Requests queries from `Iroha` peers with pagination.
Expand All @@ -945,12 +969,18 @@ impl Client {
&self,
request: R,
pagination: Pagination,
fetch_size: NonZeroU32,
) -> QueryResult<<R::Output as QueryOutput>::Target>
where
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_filter_and_pagination(request, pagination, PredicateBox::default())
self.request_with_filter_and_pagination(
request,
pagination,
fetch_size,
PredicateBox::default(),
)
}

/// Query API entry point. Requests queries from `Iroha` peers with sorting.
Expand All @@ -966,7 +996,12 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination_and_sorting(request, Pagination::default(), sorting)
self.request_with_pagination_and_sorting(
request,
Pagination::default(),
*DEFAULT_FETCH_SIZE,
sorting,
)
}

/// Query API entry point. Requests queries from `Iroha` peers.
Expand All @@ -981,7 +1016,7 @@ impl Client {
R::Output: QueryOutput,
<R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
{
self.request_with_pagination(request, Pagination::default())
self.request_with_pagination(request, Pagination::default(), *DEFAULT_FETCH_SIZE)
}

/// Connect (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events.
Expand Down
51 changes: 40 additions & 11 deletions client/tests/integration/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use std::num::{NonZeroU32, NonZeroU64};

use eyre::Result;
use iroha_client::client::{asset, QueryResult};
use iroha_data_model::{asset::AssetDefinition, prelude::*, query::Pagination};
use iroha_client::client::{asset, Client, QueryResult};
use iroha_data_model::{
asset::AssetDefinition,
prelude::*,
query::{Pagination, DEFAULT_FETCH_SIZE},
};
use test_network::*;

#[test]
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
fn limits_should_work() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_690).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

let register: Vec<InstructionExpr> = ('a'..='z') // This is a subtle mistake, I'm glad we can lint it now.
.map(|c| c.to_string())
.map(|name| (name + "#wonderland").parse().expect("Valid"))
.map(|asset_definition_id| {
RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
})
.collect();
client.submit_all_blocking(register)?;
register_assets(&client)?;

let vec = client
.request_with_pagination(
Expand All @@ -26,8 +23,40 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() ->
limit: NonZeroU32::new(5),
start: NonZeroU64::new(5),
},
*DEFAULT_FETCH_SIZE,
)?
.collect::<QueryResult<Vec<_>>>()?;
assert_eq!(vec.len(), 5);
Ok(())
}

#[test]
fn fetch_size_should_work() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_120).start_with_runtime();
wait_for_genesis_committed(&vec![client.clone()], 0);

register_assets(&client)?;

let iter = client.request_with_pagination(
asset::all_definitions(),
Pagination {
limit: NonZeroU32::new(20),
start: NonZeroU64::new(0),
},
NonZeroU32::new(12).expect("Valid"),
)?;
assert_eq!(iter.batch_len(), 12);
Ok(())
}

fn register_assets(client: &Client) -> Result<()> {
let register: Vec<InstructionExpr> = ('a'..='z')
.map(|c| c.to_string())
.map(|name| (name + "#wonderland").parse().expect("Valid"))
.map(|asset_definition_id| {
RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
})
.collect();
let _ = client.submit_all_blocking(register)?;
Ok(())
}
2 changes: 1 addition & 1 deletion client/tests/integration/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn genesis_transactions_are_validated() {
// Starting peer
let (_rt, _peer, test_client) = <PeerBuilder>::new()
.with_genesis(genesis)
.with_port(11_100)
.with_port(11_110)
.start_with_runtime();

// Checking that peer contains no blocks multiple times
Expand Down
10 changes: 5 additions & 5 deletions client/tests/integration/sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use iroha_data_model::{
account::Account,
predicate::{string, value, PredicateBox},
prelude::*,
query::{Pagination, Sorting},
query::{Pagination, Sorting, DEFAULT_FETCH_SIZE},
};
use test_network::*;

Expand Down Expand Up @@ -64,6 +64,7 @@ fn correct_pagination_assets_after_creating_new_one() {
limit: NonZeroU32::new(5),
start: None,
},
*DEFAULT_FETCH_SIZE,
sorting.clone(),
)
.expect("Valid")
Expand Down Expand Up @@ -107,6 +108,7 @@ fn correct_pagination_assets_after_creating_new_one() {
limit: NonZeroU32::new(13),
start: NonZeroU64::new(8),
},
*DEFAULT_FETCH_SIZE,
sorting,
)
.expect("Valid")
Expand Down Expand Up @@ -260,9 +262,8 @@ fn correct_sorting_of_entities() {
.expect("Valid");

let res = test_client
.request_with_filter_and_pagination_and_sorting(
.request_with_filter_and_sorting(
client::domain::all(),
Pagination::default(),
Sorting::by_metadata_key(sort_by_metadata_key.clone()),
PredicateBox::new(value::ValuePredicate::Identifiable(
string::StringPredicate::starts_with("neverland"),
Expand Down Expand Up @@ -309,9 +310,8 @@ fn correct_sorting_of_entities() {
string::StringPredicate::starts_with("neverland_"),
));
let res = test_client
.request_with_filter_and_pagination_and_sorting(
.request_with_filter_and_sorting(
client::domain::all(),
Pagination::default(),
Sorting::by_metadata_key(sort_by_metadata_key),
filter,
)
Expand Down
Loading

0 comments on commit 706fc65

Please sign in to comment.