[feature] #3900: fetch_size query parameter
Signed-off-by: Daniil Polyakov <arjentix@gmail.com>
Arjentix committed Nov 9, 2023
Parent: 97ed585 · Commit: b22ebc4
Showing 23 changed files with 252 additions and 113 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cli/Cargo.toml
@@ -59,6 +59,7 @@
 iroha_genesis = { workspace = true }
 iroha_wasm_builder = { workspace = true }

+derive_more = { workspace = true }
 async-trait = { workspace = true }
 color-eyre = { workspace = true }
 eyre = { workspace = true }
4 changes: 3 additions & 1 deletion cli/src/torii/mod.rs
@@ -105,7 +105,9 @@ impl Error {
         QueryFailed(query_error)
         | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error
         {
-            Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST,
+            Evaluate(_) | Conversion(_) | UnknownCursor | FetchSizeTooBig => {
+                StatusCode::BAD_REQUEST
+            }
             Signature(_) => StatusCode::UNAUTHORIZED,
             Find(_) => StatusCode::NOT_FOUND,
         },
24 changes: 12 additions & 12 deletions cli/src/torii/routing.rs
@@ -5,8 +5,6 @@
 // FIXME: This can't be fixed, because one trait in `warp` is private.
 #![allow(opaque_hidden_inferred_bound)]

-use std::num::NonZeroUsize;
-
 use eyre::{eyre, WrapErr};
 use futures::TryStreamExt;
 use iroha_config::{
@@ -45,11 +43,13 @@ fn client_query_request(
     body::versioned::<SignedQuery>()
         .and(sorting())
         .and(paginate())
-        .and_then(|signed_query, sorting, pagination| async move {
+        .and(fetch_size())
+        .and_then(|signed_query, sorting, pagination, fetch_size| async move {
             Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query(
                 signed_query,
                 sorting,
                 pagination,
+                fetch_size,
             ))
         })
         .or(cursor().and_then(|cursor| async move {
@@ -73,6 +73,11 @@ fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Rejection> + Copy {
     warp::query()
 }

+/// Filter for warp which extracts fetch size
+fn fetch_size() -> impl warp::Filter<Extract = (FetchSize,), Error = warp::Rejection> + Copy {
+    warp::query()
+}
+
 #[iroha_futures::telemetry_future]
 async fn handle_instructions(
     queue: Arc<Queue>,
@@ -101,7 +106,6 @@ async fn handle_instructions(
 async fn handle_queries(
     live_query_store: LiveQueryStoreHandle,
     sumeragi: SumeragiHandle,
-    fetch_size: NonZeroUsize,

     query_request: http::ClientQueryRequest,
 ) -> Result<Scale<BatchedResponse<Value>>> {
@@ -110,11 +114,12 @@ async fn handle_queries(
             query: signed_query,
             sorting,
             pagination,
+            fetch_size,
         }) => sumeragi.apply_wsv(|wsv| {
             let valid_query = ValidQueryRequest::validate(signed_query, wsv)?;
             let query_output = valid_query.execute(wsv)?;
             live_query_store
-                .handle_query_output(query_output, fetch_size, &sorting, pagination)
+                .handle_query_output(query_output, &sorting, pagination, fetch_size)
                 .map_err(ValidationFail::from)
         }),
         QueryRequest::Cursor(cursor) => live_query_store
@@ -477,15 +482,10 @@ impl Torii {
                 ))
                 .and(body::versioned()),
         )
-        .or(endpoint4(
+        .or(endpoint3(
             handle_queries,
             warp::path(uri::QUERY)
-                .and(add_state!(
-                    self.query_service,
-                    self.sumeragi,
-                    NonZeroUsize::try_from(self.iroha_cfg.torii.fetch_size)
-                        .expect("u64 should always fit into usize")
-                ))
+                .and(add_state!(self.query_service, self.sumeragi,))
                 .and(client_query_request()),
         ))
         .or(endpoint2(
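
For context on the `fetch_size()` filter added above: `warp::query()` deserializes the request's URL query string into any type implementing serde's `Deserialize`. A minimal, runnable sketch of that mechanism; the struct shape and the `fetch_size` parameter name are assumptions for illustration (the real `FetchSize` lives in `iroha_data_model` and is not shown in this diff):

use std::num::NonZeroU32;

use serde::Deserialize;
use warp::Filter;

// Assumed shape, for illustration only.
#[derive(Debug, Deserialize)]
struct FetchSize {
    fetch_size: Option<NonZeroU32>,
}

#[tokio::main]
async fn main() {
    // GET /?fetch_size=12 -> "FetchSize { fetch_size: Some(12) }"
    // GET /               -> "FetchSize { fetch_size: None }"
    let route = warp::query::<FetchSize>().map(|fs: FetchSize| format!("{fs:?}"));
    warp::serve(route).run(([127, 0, 0, 1], 8080)).await;
}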
19 changes: 17 additions & 2 deletions client/src/client.rs
@@ -245,6 +245,15 @@ pub struct ResultSet<T> {
     client_cursor: usize,
 }

+impl<T> ResultSet<T> {
+    /// Get the length of the batch returned by Iroha.
+    ///
+    /// This is controlled by `fetch_size` parameter of the query.
+    pub fn batch_len(&self) -> usize {
+        self.iter.len()
+    }
+}
+
 impl<T: Clone> Iterator for ResultSet<T>
 where
     Vec<T>: QueryOutput,
@@ -374,6 +383,7 @@ impl QueryRequest {
                 query: Vec::default(),
                 sorting: Sorting::default(),
                 pagination: Pagination::default(),
+                fetch_size: FetchSize::default(),
             },
         ),
     }
@@ -389,6 +399,7 @@ impl QueryRequest {
             iroha_data_model::query::QueryRequest::Query(query_with_params) => builder
                 .params(query_with_params.sorting().clone().into_query_parameters())
                 .params(query_with_params.pagination().into_query_parameters())
+                .params(query_with_params.fetch_size().into_query_parameters())
                 .body(query_with_params.query().clone()),
             iroha_data_model::query::QueryRequest::Cursor(cursor) => {
                 builder.params(Vec::from(cursor))
@@ -798,6 +809,7 @@ impl Client {
         filter: PredicateBox,
         pagination: Pagination,
         sorting: Sorting,
+        fetch_size: FetchSize,
     ) -> Result<(DefaultRequestBuilder, QueryResponseHandler<R::Output>)>
     where
         <R::Output as TryFrom<Value>>::Error: Into<eyre::Error>,
@@ -809,7 +821,9 @@ impl Client {
             torii_url: self.torii_url.clone(),
             headers: self.headers.clone(),
             request: iroha_data_model::query::QueryRequest::Query(
-                iroha_data_model::query::QueryWithParameters::new(request, sorting, pagination),
+                iroha_data_model::query::QueryWithParameters::new(
+                    request, sorting, pagination, fetch_size,
+                ),
             ),
         };

@@ -827,6 +841,7 @@ impl Client {
         &self,
         request: R,
         pagination: Pagination,
+        fetch_size: FetchSize,
         sorting: Sorting,
         filter: PredicateBox,
     ) -> QueryResult<<R::Output as QueryOutput>::Target>
@@ -836,7 +851,7 @@ impl Client {
     {
         iroha_logger::trace!(?request, %pagination, ?sorting, ?filter);
         let (req, mut resp_handler) =
-            self.prepare_query_request::<R>(request, filter, pagination, sorting)?;
+            self.prepare_query_request::<R>(request, filter, pagination, sorting, fetch_size)?;

         let response = req.build()?.send()?;
         let value = resp_handler.handle(&response)?;
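
A hypothetical usage sketch of the new `batch_len` accessor, modeled on the tests later in this commit (`client` is assumed to be an already-configured `iroha_client::client::Client`; setup and error handling elided):

let result_set = client
    .build_query(asset::all_definitions())
    .with_fetch_size(FetchSize::new(Some(NonZeroU32::new(10).expect("non-zero"))))
    .execute()?;
// Iroha returns at most `fetch_size` items per batch, so the first batch
// observed by the client should not exceed the requested size.
assert!(result_set.batch_len() <= 10);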
10 changes: 9 additions & 1 deletion client/src/query_builder.rs
@@ -2,7 +2,7 @@ use std::fmt::Debug;

 use iroha_data_model::{
     predicate::PredicateBox,
-    query::{sorting::Sorting, Pagination, Query},
+    query::{sorting::Sorting, FetchSize, Pagination, Query},
     Value,
 };

@@ -14,6 +14,7 @@ pub struct QueryRequestBuilder<'a, R> {
     pagination: Pagination,
     filter: PredicateBox,
     sorting: Sorting,
+    fetch_size: FetchSize,
 }

 impl<'a, R> QueryRequestBuilder<'a, R>
@@ -29,6 +30,7 @@ where
             pagination: Pagination::default(),
             sorting: Sorting::default(),
             filter: PredicateBox::default(),
+            fetch_size: FetchSize::default(),
         }
     }

@@ -47,10 +49,16 @@ where
         self
     }

+    pub fn with_fetch_size(mut self, fetch_size: FetchSize) -> Self {
+        self.fetch_size = fetch_size;
+        self
+    }
+
     pub fn execute(self) -> QueryResult<<R::Output as QueryOutput>::Target> {
         self.client.request_with_filter_and_pagination_and_sorting(
             self.request,
             self.pagination,
+            self.fetch_size,
             self.sorting,
             self.filter,
         )
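
With `with_fetch_size` in place, the builder now exposes four independent knobs. A sketch combining them, assembled from patterns used by the tests in this commit (the predicate, metadata key, and numbers are illustrative, not taken from the diff):

let domains = client
    .build_query(client::domain::all())
    .with_filter(PredicateBox::new(value::ValuePredicate::Identifiable(
        string::StringPredicate::starts_with("wonder"),
    )))
    .with_sorting(Sorting::by_metadata_key("key".parse().expect("Valid")))
    .with_pagination(Pagination {
        limit: NonZeroU32::new(20),
        start: NonZeroU64::new(0),
    })
    .with_fetch_size(FetchSize::new(Some(NonZeroU32::new(10).expect("Valid"))))
    .execute()?
    .collect::<QueryResult<Vec<_>>>()?;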
44 changes: 34 additions & 10 deletions client/tests/integration/pagination.rs
@@ -1,23 +1,16 @@
 use std::num::{NonZeroU32, NonZeroU64};

 use eyre::Result;
-use iroha_client::client::{asset, QueryResult};
+use iroha_client::client::{asset, Client, QueryResult};
 use iroha_data_model::{asset::AssetDefinition, prelude::*, query::Pagination};
 use test_network::*;

 #[test]
-fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
+fn limits_should_work() -> Result<()> {
     let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_690).start_with_runtime();
     wait_for_genesis_committed(&vec![client.clone()], 0);

-    let register: Vec<InstructionExpr> = ('a'..='z') // This is a subtle mistake, I'm glad we can lint it now.
-        .map(|c| c.to_string())
-        .map(|name| (name + "#wonderland").parse().expect("Valid"))
-        .map(|asset_definition_id| {
-            RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
-        })
-        .collect();
-    client.submit_all_blocking(register)?;
+    register_assets(&client)?;

     let vec = &client
         .build_query(asset::all_definitions())
@@ -30,3 +23,34 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
     assert_eq!(vec.len(), 5);
     Ok(())
 }
+
+#[test]
+fn fetch_size_should_work() -> Result<()> {
+    let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_120).start_with_runtime();
+    wait_for_genesis_committed(&vec![client.clone()], 0);
+
+    register_assets(&client)?;
+
+    let iter = client
+        .build_query(asset::all_definitions())
+        .with_pagination(Pagination {
+            limit: NonZeroU32::new(20),
+            start: NonZeroU64::new(0),
+        })
+        .with_fetch_size(FetchSize::new(Some(NonZeroU32::new(12).expect("Valid"))))
+        .execute()?;
+    assert_eq!(iter.batch_len(), 12);
+    Ok(())
+}
+
+fn register_assets(client: &Client) -> Result<()> {
+    let register: Vec<InstructionExpr> = ('a'..='z')
+        .map(|c| c.to_string())
+        .map(|name| (name + "#wonderland").parse().expect("Valid"))
+        .map(|asset_definition_id| {
+            RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into()
+        })
+        .collect();
+    let _ = client.submit_all_blocking(register)?;
+    Ok(())
+}
2 changes: 1 addition & 1 deletion client/tests/integration/permissions.rs
@@ -32,7 +32,7 @@ fn genesis_transactions_are_validated() {
     // Starting peer
     let (_rt, _peer, test_client) = <PeerBuilder>::new()
         .with_genesis(genesis)
-        .with_port(11_100)
+        .with_port(11_110)
         .start_with_runtime();

     // Checking that peer contains no blocks multiple times
26 changes: 26 additions & 0 deletions client/tests/integration/queries/mod.rs
@@ -1,3 +1,29 @@
+use iroha_client::client::{self, ClientQueryError};
+use iroha_data_model::{
+    query::{error::QueryExecutionFail, FetchSize, MAX_FETCH_SIZE},
+    ValidationFail,
+};
+use test_network::*;
+
 mod account;
 mod asset;
 mod role;
+
+#[test]
+fn too_big_fetch_size_is_not_allowed() {
+    let (_rt, _peer, client) = <PeerBuilder>::new().with_port(11_130).start_with_runtime();
+    wait_for_genesis_committed(&[client.clone()], 0);
+
+    let err = client
+        .build_query(client::asset::all())
+        .with_fetch_size(FetchSize::new(Some(MAX_FETCH_SIZE.checked_add(1).unwrap())))
+        .execute()
+        .expect_err("Should fail");
+
+    assert!(matches!(
+        err,
+        ClientQueryError::Validation(ValidationFail::QueryFailed(
+            QueryExecutionFail::FetchSizeTooBig
+        ))
+    ));
+}
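
The test above pins down a server-side bound: a `fetch_size` above `MAX_FETCH_SIZE` is rejected with `QueryExecutionFail::FetchSizeTooBig`, which the `torii` change earlier maps to HTTP 400. The check itself is not part of the shown diff; a hedged sketch of what such a guard presumably looks like, assuming `MAX_FETCH_SIZE` is a `NonZeroU32` (consistent with `checked_add` in the test):

// Hypothetical guard; the real check lives in code not shown in this commit.
fn check_fetch_size(requested: Option<NonZeroU32>) -> Result<(), QueryExecutionFail> {
    match requested {
        Some(size) if size > MAX_FETCH_SIZE => Err(QueryExecutionFail::FetchSizeTooBig),
        _ => Ok(()),
    }
}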
24 changes: 10 additions & 14 deletions client/tests/integration/sorting.rs
@@ -256,14 +256,12 @@ fn correct_sorting_of_entities() {
         .expect("Valid");

     let res = test_client
-        .request_with_filter_and_pagination_and_sorting(
-            client::domain::all(),
-            Pagination::default(),
-            Sorting::by_metadata_key(sort_by_metadata_key.clone()),
-            PredicateBox::new(value::ValuePredicate::Identifiable(
-                string::StringPredicate::starts_with("neverland"),
-            )),
-        )
+        .build_query(client::domain::all())
+        .with_sorting(Sorting::by_metadata_key(sort_by_metadata_key.clone()))
+        .with_filter(PredicateBox::new(value::ValuePredicate::Identifiable(
+            string::StringPredicate::starts_with("neverland"),
+        )))
+        .execute()
         .expect("Valid")
         .collect::<QueryResult<Vec<_>>>()
         .expect("Valid");
@@ -305,12 +303,10 @@ fn correct_sorting_of_entities() {
         string::StringPredicate::starts_with("neverland_"),
     ));
     let res = test_client
-        .request_with_filter_and_pagination_and_sorting(
-            client::domain::all(),
-            Pagination::default(),
-            Sorting::by_metadata_key(sort_by_metadata_key),
-            filter,
-        )
+        .build_query(client::domain::all())
+        .with_sorting(Sorting::by_metadata_key(sort_by_metadata_key))
+        .with_filter(filter)
+        .execute()
         .expect("Valid")
         .collect::<QueryResult<Vec<_>>>()
         .expect("Valid");
3 changes: 1 addition & 2 deletions config/iroha_test_config.json
@@ -43,8 +43,7 @@
     "P2P_ADDR": "127.0.0.1:1337",
     "API_URL": "127.0.0.1:8080",
     "MAX_TRANSACTION_SIZE": 32768,
-    "MAX_CONTENT_LEN": 16384000,
-    "FETCH_SIZE": 10
+    "MAX_CONTENT_LEN": 16384000
 },
 "BLOCK_SYNC": {
     "GOSSIP_PERIOD_MS": 10000,
(Diffs for the remaining changed files were not loaded.)
