diff --git a/Cargo.lock b/Cargo.lock index aab41d8f06a..c380ea9c76b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2724,6 +2724,7 @@ dependencies = [ "async-trait", "color-eyre", "dashmap", + "derive_more", "displaydoc", "eyre", "futures", @@ -2966,6 +2967,7 @@ dependencies = [ "iroha_primitives", "iroha_schema", "iroha_version", + "once_cell", "parity-scale-codec", "serde", "serde_json", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 89d1f08a41b..e0abd7c480b 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -59,6 +59,7 @@ iroha_genesis = { workspace = true } iroha_wasm_builder = { workspace = true } +derive_more = { workspace = true } async-trait = { workspace = true } color-eyre = { workspace = true } eyre = { workspace = true } diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs index db8009e8a03..9594362ab5d 100644 --- a/cli/src/torii/mod.rs +++ b/cli/src/torii/mod.rs @@ -105,7 +105,9 @@ impl Error { QueryFailed(query_error) | InstructionFailed(InstructionExecutionError::Query(query_error)) => match query_error { - Evaluate(_) | Conversion(_) | UnknownCursor => StatusCode::BAD_REQUEST, + Evaluate(_) | Conversion(_) | UnknownCursor | FetchSizeTooBig => { + StatusCode::BAD_REQUEST + } Signature(_) => StatusCode::UNAUTHORIZED, Find(_) => StatusCode::NOT_FOUND, }, diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 5181f4fe57c..48330bdffb9 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -5,8 +5,6 @@ // FIXME: This can't be fixed, because one trait in `warp` is private. #![allow(opaque_hidden_inferred_bound)] -use std::num::NonZeroUsize; - use eyre::{eyre, WrapErr}; use futures::TryStreamExt; use iroha_config::{ @@ -45,11 +43,13 @@ fn client_query_request( body::versioned::() .and(sorting()) .and(paginate()) - .and_then(|signed_query, sorting, pagination| async move { + .and(fetch_size()) + .and_then(|signed_query, sorting, pagination, fetch_size| async move { Result::<_, std::convert::Infallible>::Ok(http::ClientQueryRequest::query( signed_query, sorting, pagination, + fetch_size, )) }) .or(cursor().and_then(|cursor| async move { @@ -73,6 +73,11 @@ fn paginate() -> impl warp::Filter impl warp::Filter + Copy { + warp::query() +} + #[iroha_futures::telemetry_future] async fn handle_instructions( queue: Arc, @@ -101,7 +106,6 @@ async fn handle_instructions( async fn handle_queries( live_query_store: LiveQueryStoreHandle, sumeragi: SumeragiHandle, - fetch_size: NonZeroUsize, query_request: http::ClientQueryRequest, ) -> Result>> { @@ -110,11 +114,12 @@ async fn handle_queries( query: signed_query, sorting, pagination, + fetch_size, }) => sumeragi.apply_wsv(|wsv| { let valid_query = ValidQueryRequest::validate(signed_query, wsv)?; let query_output = valid_query.execute(wsv)?; live_query_store - .handle_query_output(query_output, fetch_size, &sorting, pagination) + .handle_query_output(query_output, &sorting, pagination, fetch_size) .map_err(ValidationFail::from) }), QueryRequest::Cursor(cursor) => live_query_store @@ -477,15 +482,10 @@ impl Torii { )) .and(body::versioned()), ) - .or(endpoint4( + .or(endpoint3( handle_queries, warp::path(uri::QUERY) - .and(add_state!( - self.query_service, - self.sumeragi, - NonZeroUsize::try_from(self.iroha_cfg.torii.fetch_size) - .expect("u64 should always fit into usize") - )) + .and(add_state!(self.query_service, self.sumeragi,)) .and(client_query_request()), )) .or(endpoint2( diff --git a/client/src/client.rs b/client/src/client.rs index 6e3b28f1196..d81c39aff56 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -245,6 +245,15 @@ pub struct ResultSet { client_cursor: usize, } +impl ResultSet { + /// Get the length of the batch returned by Iroha. + /// + /// This is controlled by `fetch_size` parameter of the query. + pub fn batch_len(&self) -> usize { + self.iter.len() + } +} + impl Iterator for ResultSet where Vec: QueryOutput, @@ -374,6 +383,7 @@ impl QueryRequest { query: Vec::default(), sorting: Sorting::default(), pagination: Pagination::default(), + fetch_size: FetchSize::default(), }, ), } @@ -389,6 +399,7 @@ impl QueryRequest { iroha_data_model::query::QueryRequest::Query(query_with_params) => builder .params(query_with_params.sorting().clone().into_query_parameters()) .params(query_with_params.pagination().into_query_parameters()) + .params(query_with_params.fetch_size().into_query_parameters()) .body(query_with_params.query().clone()), iroha_data_model::query::QueryRequest::Cursor(cursor) => { builder.params(Vec::from(cursor)) @@ -798,6 +809,7 @@ impl Client { filter: PredicateBox, pagination: Pagination, sorting: Sorting, + fetch_size: FetchSize, ) -> Result<(DefaultRequestBuilder, QueryResponseHandler)> where >::Error: Into, @@ -809,7 +821,9 @@ impl Client { torii_url: self.torii_url.clone(), headers: self.headers.clone(), request: iroha_data_model::query::QueryRequest::Query( - iroha_data_model::query::QueryWithParameters::new(request, sorting, pagination), + iroha_data_model::query::QueryWithParameters::new( + request, sorting, pagination, fetch_size, + ), ), }; @@ -827,6 +841,7 @@ impl Client { &self, request: R, pagination: Pagination, + fetch_size: FetchSize, sorting: Sorting, filter: PredicateBox, ) -> QueryResult<::Target> @@ -836,7 +851,7 @@ impl Client { { iroha_logger::trace!(?request, %pagination, ?sorting, ?filter); let (req, mut resp_handler) = - self.prepare_query_request::(request, filter, pagination, sorting)?; + self.prepare_query_request::(request, filter, pagination, sorting, fetch_size)?; let response = req.build()?.send()?; let value = resp_handler.handle(&response)?; diff --git a/client/src/query_builder.rs b/client/src/query_builder.rs index 7897a6574d7..71fc1878d7f 100644 --- a/client/src/query_builder.rs +++ b/client/src/query_builder.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use iroha_data_model::{ predicate::PredicateBox, - query::{sorting::Sorting, Pagination, Query}, + query::{sorting::Sorting, FetchSize, Pagination, Query}, Value, }; @@ -14,6 +14,7 @@ pub struct QueryRequestBuilder<'a, R> { pagination: Pagination, filter: PredicateBox, sorting: Sorting, + fetch_size: FetchSize, } impl<'a, R> QueryRequestBuilder<'a, R> @@ -29,6 +30,7 @@ where pagination: Pagination::default(), sorting: Sorting::default(), filter: PredicateBox::default(), + fetch_size: FetchSize::default(), } } @@ -47,10 +49,16 @@ where self } + pub fn with_fetch_size(mut self, fetch_size: FetchSize) -> Self { + self.fetch_size = fetch_size; + self + } + pub fn execute(self) -> QueryResult<::Target> { self.client.request_with_filter_and_pagination_and_sorting( self.request, self.pagination, + self.fetch_size, self.sorting, self.filter, ) diff --git a/client/tests/integration/pagination.rs b/client/tests/integration/pagination.rs index 60f5173af36..7a7f4141096 100644 --- a/client/tests/integration/pagination.rs +++ b/client/tests/integration/pagination.rs @@ -1,23 +1,16 @@ use std::num::{NonZeroU32, NonZeroU64}; use eyre::Result; -use iroha_client::client::{asset, QueryResult}; +use iroha_client::client::{asset, Client, QueryResult}; use iroha_data_model::{asset::AssetDefinition, prelude::*, query::Pagination}; use test_network::*; #[test] -fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> { +fn limits_should_work() -> Result<()> { let (_rt, _peer, client) = ::new().with_port(10_690).start_with_runtime(); wait_for_genesis_committed(&vec![client.clone()], 0); - let register: Vec = ('a'..='z') // This is a subtle mistake, I'm glad we can lint it now. - .map(|c| c.to_string()) - .map(|name| (name + "#wonderland").parse().expect("Valid")) - .map(|asset_definition_id| { - RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into() - }) - .collect(); - client.submit_all_blocking(register)?; + register_assets(&client)?; let vec = &client .build_query(asset::all_definitions()) @@ -30,3 +23,34 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> assert_eq!(vec.len(), 5); Ok(()) } + +#[test] +fn fetch_size_should_work() -> Result<()> { + let (_rt, _peer, client) = ::new().with_port(11_120).start_with_runtime(); + wait_for_genesis_committed(&vec![client.clone()], 0); + + register_assets(&client)?; + + let iter = client + .build_query(asset::all_definitions()) + .with_pagination(Pagination { + limit: NonZeroU32::new(20), + start: NonZeroU64::new(0), + }) + .with_fetch_size(FetchSize::new(Some(NonZeroU32::new(12).expect("Valid")))) + .execute()?; + assert_eq!(iter.batch_len(), 12); + Ok(()) +} + +fn register_assets(client: &Client) -> Result<()> { + let register: Vec = ('a'..='z') + .map(|c| c.to_string()) + .map(|name| (name + "#wonderland").parse().expect("Valid")) + .map(|asset_definition_id| { + RegisterExpr::new(AssetDefinition::quantity(asset_definition_id)).into() + }) + .collect(); + let _ = client.submit_all_blocking(register)?; + Ok(()) +} diff --git a/client/tests/integration/permissions.rs b/client/tests/integration/permissions.rs index 28b963730b2..7fa9e93cd03 100644 --- a/client/tests/integration/permissions.rs +++ b/client/tests/integration/permissions.rs @@ -32,7 +32,7 @@ fn genesis_transactions_are_validated() { // Starting peer let (_rt, _peer, test_client) = ::new() .with_genesis(genesis) - .with_port(11_100) + .with_port(11_110) .start_with_runtime(); // Checking that peer contains no blocks multiple times diff --git a/client/tests/integration/queries/mod.rs b/client/tests/integration/queries/mod.rs index a3805003c80..19306ab3f31 100644 --- a/client/tests/integration/queries/mod.rs +++ b/client/tests/integration/queries/mod.rs @@ -1,3 +1,29 @@ +use iroha_client::client::{self, ClientQueryError}; +use iroha_data_model::{ + query::{error::QueryExecutionFail, FetchSize, MAX_FETCH_SIZE}, + ValidationFail, +}; +use test_network::*; + mod account; mod asset; mod role; + +#[test] +fn too_big_fetch_size_is_not_allowed() { + let (_rt, _peer, client) = ::new().with_port(11_130).start_with_runtime(); + wait_for_genesis_committed(&[client.clone()], 0); + + let err = client + .build_query(client::asset::all()) + .with_fetch_size(FetchSize::new(Some(MAX_FETCH_SIZE.checked_add(1).unwrap()))) + .execute() + .expect_err("Should fail"); + + assert!(matches!( + err, + ClientQueryError::Validation(ValidationFail::QueryFailed( + QueryExecutionFail::FetchSizeTooBig + )) + )); +} diff --git a/client/tests/integration/sorting.rs b/client/tests/integration/sorting.rs index 1182b3d7519..a4452c3e241 100644 --- a/client/tests/integration/sorting.rs +++ b/client/tests/integration/sorting.rs @@ -256,14 +256,12 @@ fn correct_sorting_of_entities() { .expect("Valid"); let res = test_client - .request_with_filter_and_pagination_and_sorting( - client::domain::all(), - Pagination::default(), - Sorting::by_metadata_key(sort_by_metadata_key.clone()), - PredicateBox::new(value::ValuePredicate::Identifiable( - string::StringPredicate::starts_with("neverland"), - )), - ) + .build_query(client::domain::all()) + .with_sorting(Sorting::by_metadata_key(sort_by_metadata_key.clone())) + .with_filter(PredicateBox::new(value::ValuePredicate::Identifiable( + string::StringPredicate::starts_with("neverland"), + ))) + .execute() .expect("Valid") .collect::>>() .expect("Valid"); @@ -305,12 +303,10 @@ fn correct_sorting_of_entities() { string::StringPredicate::starts_with("neverland_"), )); let res = test_client - .request_with_filter_and_pagination_and_sorting( - client::domain::all(), - Pagination::default(), - Sorting::by_metadata_key(sort_by_metadata_key), - filter, - ) + .build_query(client::domain::all()) + .with_sorting(Sorting::by_metadata_key(sort_by_metadata_key)) + .with_filter(filter) + .execute() .expect("Valid") .collect::>>() .expect("Valid"); diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index ad415a54587..80f61607c38 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -43,8 +43,7 @@ "P2P_ADDR": "127.0.0.1:1337", "API_URL": "127.0.0.1:8080", "MAX_TRANSACTION_SIZE": 32768, - "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10 + "MAX_CONTENT_LEN": 16384000 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, diff --git a/config/src/torii.rs b/config/src/torii.rs index 25263581645..1c2b801e981 100644 --- a/config/src/torii.rs +++ b/config/src/torii.rs @@ -1,5 +1,4 @@ //! `Torii` configuration as well as the default values for the URLs used for the main endpoints: `p2p`, `telemetry`, but not `api`. -use std::num::NonZeroU64; use iroha_config_base::derive::{Documented, Proxy}; use iroha_primitives::addr::{socket_addr, SocketAddr}; @@ -11,9 +10,6 @@ pub const DEFAULT_TORII_P2P_ADDR: SocketAddr = socket_addr!(127.0.0.1:1337); pub const DEFAULT_TORII_MAX_TRANSACTION_SIZE: u32 = 2_u32.pow(15); /// Default upper bound on `content-length` specified in the HTTP request header pub const DEFAULT_TORII_MAX_CONTENT_LENGTH: u32 = 2_u32.pow(12) * 4000; -/// Default max size of a single batch of results from a query -pub static DEFAULT_TORII_FETCH_SIZE: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| NonZeroU64::new(10).unwrap()); /// Structure that defines the configuration parameters of `Torii` which is the routing module. /// For example the `p2p_addr`, which is used for consensus and block-synchronisation purposes, @@ -32,8 +28,6 @@ pub struct Configuration { pub max_transaction_size: u32, /// Maximum number of bytes in raw message. Used to prevent from DOS attacks. pub max_content_len: u32, - /// How many query results are returned in one batch - pub fetch_size: NonZeroU64, } impl Default for ConfigurationProxy { @@ -43,7 +37,6 @@ impl Default for ConfigurationProxy { api_url: None, max_transaction_size: Some(DEFAULT_TORII_MAX_TRANSACTION_SIZE), max_content_len: Some(DEFAULT_TORII_MAX_CONTENT_LENGTH), - fetch_size: Some(*DEFAULT_TORII_FETCH_SIZE), } } } @@ -96,10 +89,9 @@ pub mod tests { api_url in prop::option::of(Just(uri::DEFAULT_API_ADDR)), max_transaction_size in prop::option::of(Just(DEFAULT_TORII_MAX_TRANSACTION_SIZE)), max_content_len in prop::option::of(Just(DEFAULT_TORII_MAX_CONTENT_LENGTH)), - fetch_size in prop::option::of(Just(*DEFAULT_TORII_FETCH_SIZE)), ) -> ConfigurationProxy { - ConfigurationProxy { p2p_addr, api_url, max_transaction_size, max_content_len, fetch_size } + ConfigurationProxy { p2p_addr, api_url, max_transaction_size, max_content_len } } } } diff --git a/configs/peer/config.json b/configs/peer/config.json index 02211ed3072..51cc9c5a45a 100644 --- a/configs/peer/config.json +++ b/configs/peer/config.json @@ -24,8 +24,7 @@ "P2P_ADDR": null, "API_URL": null, "MAX_TRANSACTION_SIZE": 32768, - "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10 + "MAX_CONTENT_LEN": 16384000 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, diff --git a/configs/peer/executor.wasm b/configs/peer/executor.wasm index a246bb87f48..0ef1007e61c 100644 Binary files a/configs/peer/executor.wasm and b/configs/peer/executor.wasm differ diff --git a/core/src/query/cursor.rs b/core/src/query/cursor.rs index 0757a759081..b1ef3393c8f 100644 --- a/core/src/query/cursor.rs +++ b/core/src/query/cursor.rs @@ -1,6 +1,6 @@ //! Module with cursor-based pagination functional like [`Batched`]. -use std::num::{NonZeroU64, NonZeroUsize}; +use std::num::{NonZeroU32, NonZeroU64}; use derive_more::Display; use parity_scale_codec::{Decode, Encode}; @@ -9,11 +9,11 @@ use serde::{Deserialize, Serialize}; /// Trait for iterators that can be batched. pub trait Batch: IntoIterator + Sized { /// Pack iterator into batches of the given size. - fn batched(self, fetch_size: NonZeroUsize) -> Batched; + fn batched(self, fetch_size: NonZeroU32) -> Batched; } impl Batch for I { - fn batched(self, batch_size: NonZeroUsize) -> Batched { + fn batched(self, batch_size: NonZeroU32) -> Batched { Batched { iter: self.into_iter(), batch_size, @@ -27,7 +27,7 @@ impl Batch for I { #[derive(Debug)] pub struct Batched { iter: I::IntoIter, - batch_size: NonZeroUsize, + batch_size: NonZeroU32, cursor: Option, } @@ -52,7 +52,12 @@ impl> Batched { .iter .by_ref() .inspect(|_| batch_size += 1) - .take(self.batch_size.get()) + .take( + self.batch_size + .get() + .try_into() + .expect("`u32` should always fit into `usize`"), + ) .collect(); self.cursor = if let Some(cursor) = self.cursor { diff --git a/core/src/query/store.rs b/core/src/query/store.rs index 7bb52d2313b..8e0122b5093 100644 --- a/core/src/query/store.rs +++ b/core/src/query/store.rs @@ -1,9 +1,9 @@ -//! This module contains [`QueryService`] actor. +//! This module contains [`LiveQueryStore`] actor. use std::{ cmp::Ordering, collections::HashMap, - num::{NonZeroU64, NonZeroUsize}, + num::NonZeroU64, time::{Duration, Instant}, }; @@ -12,6 +12,7 @@ use iroha_data_model::{ asset::AssetValue, query::{ cursor::ForwardCursor, error::QueryExecutionFail, pagination::Pagination, sorting::Sorting, + FetchSize, DEFAULT_FETCH_SIZE, MAX_FETCH_SIZE, }, BatchedResponse, BatchedResponseV1, HasMetadata, IdentifiableBox, ValidationFail, Value, }; @@ -31,9 +32,12 @@ pub enum Error { /// Unknown cursor error. #[error(transparent)] UnknownCursor(#[from] UnknownCursor), - /// Connection with QueryService is closed. - #[error("Connection with QueryService is closed")] + /// Connection with LiveQueryStore is closed. + #[error("Connection with LiveQueryStore is closed")] ConnectionClosed, + /// Fetch size is too big. + #[error("Fetch size is too big")] + FetchSizeTooBig, } #[allow(clippy::fallible_impl_from)] @@ -46,11 +50,14 @@ impl From for ValidationFail { Error::ConnectionClosed => { panic!("Connection to `LiveQueryStore` was unexpectedly closed, this is a bug") } + Error::FetchSizeTooBig => { + ValidationFail::QueryFailed(QueryExecutionFail::FetchSizeTooBig) + } } } } -/// Result type for [`QueryService`] methods. +/// Result type for [`LiveQueryStore`] methods. pub type Result = std::result::Result; type LiveQuery = Batched>; @@ -65,7 +72,7 @@ pub struct LiveQueryStore { } impl LiveQueryStore { - /// Construct [`QueryService`] from configuration. + /// Construct [`LiveQueryStore`] from configuration. pub fn from_configuration(cfg: Configuration) -> Self { Self { queries: HashMap::default(), @@ -73,7 +80,7 @@ impl LiveQueryStore { } } - /// Construct [`QueryService`] for tests. + /// Construct [`LiveQueryStore`] for tests. /// Default configuration will be used. /// /// Not marked as `#[cfg(test)]` because it is used in benches as well. @@ -87,7 +94,7 @@ impl LiveQueryStore { ) } - /// Start [`QueryService`]. Requires a [`tokio::runtime::Runtime`] being run + /// Start [`LiveQueryStore`]. Requires a [`tokio::runtime::Runtime`] being run /// as it will create new [`tokio::task`] and detach it. /// /// Returns a handle to interact with the service. @@ -158,14 +165,14 @@ impl LiveQueryStoreHandle { /// /// # Errors /// - /// - Returns [`Error::ConnectionClosed`] if [`QueryService`] is dropped, + /// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped, /// - Otherwise throws up query output handling errors. pub fn handle_query_output( &self, query_output: LazyValue<'_>, - fetch_size: NonZeroUsize, sorting: &Sorting, pagination: Pagination, + fetch_size: FetchSize, ) -> Result> { match query_output { LazyValue::Value(batch) => { @@ -174,6 +181,11 @@ impl LiveQueryStoreHandle { Ok(result.into()) } LazyValue::Iter(iter) => { + let fetch_size = fetch_size.fetch_size.unwrap_or(DEFAULT_FETCH_SIZE); + if fetch_size > MAX_FETCH_SIZE { + return Err(Error::FetchSizeTooBig); + } + let live_query = Self::apply_sorting_and_pagination(iter, sorting, pagination); let query_id = uuid::Uuid::new_v4().to_string(); @@ -188,7 +200,7 @@ impl LiveQueryStoreHandle { /// /// # Errors /// - /// - Returns [`Error::ConnectionClosed`] if [`QueryService`] is dropped, + /// - Returns [`Error::ConnectionClosed`] if [`LiveQueryStore`] is dropped, /// - Otherwise throws up query output handling errors. pub fn handle_query_cursor(&self, cursor: ForwardCursor) -> Result> { let query_id = cursor.query_id.ok_or(UnknownCursor)?; diff --git a/core/src/smartcontracts/wasm.rs b/core/src/smartcontracts/wasm.rs index 8a38a178b6d..3b5cd8d48f1 100644 --- a/core/src/smartcontracts/wasm.rs +++ b/core/src/smartcontracts/wasm.rs @@ -2,8 +2,6 @@ //! `WebAssembly` VM Smartcontracts can be written in Rust, compiled //! to wasm format and submitted in a transaction -use std::num::NonZeroUsize; - use error::*; use import_traits::{ ExecuteOperations as _, GetExecutorPayloads as _, SetPermissionTokenSchema as _, @@ -783,17 +781,14 @@ impl Runtime { query, sorting, pagination, + fetch_size, }) => { wsv.executor() .validate_query(wsv, state.authority(), query.clone())?; let output = query.execute(wsv)?; - wsv.query_handle().handle_query_output( - output, - NonZeroUsize::new(30_000).expect("30 000 is not zero"), - &sorting, - pagination, - ) + wsv.query_handle() + .handle_query_output(output, &sorting, pagination, fetch_size) } QueryRequest::Cursor(cursor) => wsv.query_handle().handle_query_cursor(cursor), } @@ -1001,15 +996,12 @@ where query, sorting, pagination, + fetch_size, }) => { let output = query.execute(wsv)?; - wsv.query_handle().handle_query_output( - output, - NonZeroUsize::new(30_000).expect("30 000 is not zero"), - &sorting, - pagination, - ) + wsv.query_handle() + .handle_query_output(output, &sorting, pagination, fetch_size) } QueryRequest::Cursor(cursor) => wsv.query_handle().handle_query_cursor(cursor), } @@ -1252,15 +1244,12 @@ impl<'wrld> import_traits::ExecuteOperations { let output = query.execute(wsv)?; - wsv.query_handle().handle_query_output( - output, - NonZeroUsize::new(30_000).expect("30 000 is not zero"), - &sorting, - pagination, - ) + wsv.query_handle() + .handle_query_output(output, &sorting, pagination, fetch_size) } QueryRequest::Cursor(cursor) => wsv.query_handle().handle_query_cursor(cursor), } @@ -1773,6 +1762,7 @@ mod tests { QueryBox::from(FindAccountById::new(authority.clone())), Sorting::default(), Pagination::default(), + FetchSize::default(), )); let wat = format!( diff --git a/data_model/Cargo.toml b/data_model/Cargo.toml index 83551e7125d..c04c3dba783 100644 --- a/data_model/Cargo.toml +++ b/data_model/Cargo.toml @@ -21,7 +21,7 @@ default = ["std"] # Enable static linkage of the rust standard library. # Disabled for WASM interoperability, to reduce the binary size. # Please refer to https://docs.rust-embedded.org/book/intro/no-std.html -std = ["iroha_macro/std", "iroha_version/std", "iroha_crypto/std", "iroha_primitives/std", "thiserror", "displaydoc/std", "strum/std"] +std = ["iroha_macro/std", "iroha_version/std", "iroha_crypto/std", "iroha_primitives/std", "thiserror", "displaydoc/std", "strum/std", "once_cell"] # Enable API for HTTP requests. Should be activated for HTTP clients http = ["std", "warp", "iroha_version/http"] # Replace structures and methods with FFI equivalents to facilitate dynamic linkage (mainly used in smartcontracts) @@ -52,6 +52,7 @@ displaydoc = { workspace = true } getset = { workspace = true } strum = { workspace = true, features = ["derive"] } base64 = { workspace = true, features = ["alloc"] } +once_cell = { workspace = true, optional = true } [dev-dependencies] iroha_client = { workspace = true } diff --git a/data_model/src/query/mod.rs b/data_model/src/query/mod.rs index f9e8d266511..6d826de4956 100644 --- a/data_model/src/query/mod.rs +++ b/data_model/src/query/mod.rs @@ -7,9 +7,10 @@ use alloc::{ boxed::Box, format, string::{String, ToString as _}, + vec, vec::Vec, }; -use core::cmp::Ordering; +use core::{cmp::Ordering, num::NonZeroU32}; pub use cursor::ForwardCursor; use derive_more::{Constructor, Display}; @@ -40,6 +41,42 @@ pub mod cursor; pub mod pagination; pub mod sorting; +const FETCH_SIZE: &str = "fetch_size"; + +/// Default value for `fetch_size` parameter in queries. +// SAFETY: `10` is greater than `0` +#[allow(unsafe_code)] +pub const DEFAULT_FETCH_SIZE: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(10) }; + +/// Max value for `fetch_size` parameter in queries. +// SAFETY: `10_000` is greater than `0` +#[allow(unsafe_code)] +pub const MAX_FETCH_SIZE: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(10_000) }; + +/// Structure for query fetch size parameter encoding/decoding +#[derive( + Debug, Default, Clone, Copy, PartialEq, Eq, Constructor, Decode, Encode, Deserialize, Serialize, +)] +pub struct FetchSize { + /// Inner value of a fetch size. + /// + /// If not specified then [`DEFAULT_FETCH_SIZE`] is used. + pub fetch_size: Option, +} + +impl FetchSize { + /// Converts self to iterator of tuples to be used in queries. + /// + /// The length of the output iterator is not constant and has either 0 or 1 value. + pub fn into_query_parameters( + self, + ) -> impl IntoIterator + Clone { + self.fetch_size + .map(|fetch_size| (FETCH_SIZE, fetch_size)) + .into_iter() + } +} + macro_rules! queries { ($($($meta:meta)* $item:item)+) => { pub use self::model::*; @@ -183,6 +220,7 @@ pub mod model { pub query: Q, pub sorting: Sorting, pub pagination: Pagination, + pub fetch_size: FetchSize, } } @@ -1323,9 +1361,14 @@ pub mod http { impl ClientQueryRequest { /// Construct a new request containing query. - pub fn query(query: SignedQuery, sorting: Sorting, pagination: Pagination) -> Self { + pub fn query( + query: SignedQuery, + sorting: Sorting, + pagination: Pagination, + fetch_size: FetchSize, + ) -> Self { Self(QueryRequest::Query(QueryWithParameters::new( - query, sorting, pagination, + query, sorting, pagination, fetch_size, ))) } @@ -1509,6 +1552,8 @@ pub mod error { ), /// Unknown query cursor UnknownCursor, + /// fetch_size could not be greater than {MAX_FETCH_SIZE:?} + FetchSizeTooBig, } /// Type assertion error @@ -1568,6 +1613,6 @@ pub mod prelude { pub use super::{ account::prelude::*, asset::prelude::*, block::prelude::*, domain::prelude::*, peer::prelude::*, permission::prelude::*, role::prelude::*, transaction::*, - trigger::prelude::*, QueryBox, TransactionQueryOutput, + trigger::prelude::*, FetchSize, QueryBox, TransactionQueryOutput, }; } diff --git a/data_model/src/smart_contract.rs b/data_model/src/smart_contract.rs index ceb9114f5ac..0800fbc27df 100644 --- a/data_model/src/smart_contract.rs +++ b/data_model/src/smart_contract.rs @@ -3,9 +3,12 @@ use parity_scale_codec::{Decode, Encode}; pub use self::model::*; -use crate::query::{ - cursor::ForwardCursor, sorting::Sorting, Pagination, QueryBox, QueryRequest, - QueryWithParameters, +use crate::{ + prelude::FetchSize, + query::{ + cursor::ForwardCursor, sorting::Sorting, Pagination, QueryBox, QueryRequest, + QueryWithParameters, + }, }; pub mod payloads { @@ -61,9 +64,14 @@ pub mod model { impl SmartContractQueryRequest { /// Construct a new request containing query. - pub fn query(query: QueryBox, sorting: Sorting, pagination: Pagination) -> Self { + pub fn query( + query: QueryBox, + sorting: Sorting, + pagination: Pagination, + fetch_size: FetchSize, + ) -> Self { Self(QueryRequest::Query(QueryWithParameters::new( - query, sorting, pagination, + query, sorting, pagination, fetch_size, ))) } diff --git a/docs/source/references/config.md b/docs/source/references/config.md index a3f4468ca4b..4a8288df05c 100644 --- a/docs/source/references/config.md +++ b/docs/source/references/config.md @@ -55,8 +55,7 @@ The following is the default configuration used by Iroha. "P2P_ADDR": null, "API_URL": null, "MAX_TRANSACTION_SIZE": 32768, - "MAX_CONTENT_LEN": 16384000, - "FETCH_SIZE": 10 + "MAX_CONTENT_LEN": 16384000 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, @@ -698,7 +697,6 @@ Has type `Option>`[^1]. Can be configured via env ```json { "API_URL": null, - "FETCH_SIZE": 10, "MAX_CONTENT_LEN": 16384000, "MAX_TRANSACTION_SIZE": 32768, "P2P_ADDR": null @@ -715,16 +713,6 @@ Has type `Option`[^1]. Can be configured via environment variable `T null ``` -### `torii.fetch_size` - -How many query results are returned in one batch - -Has type `Option`[^1]. Can be configured via environment variable `TORII_FETCH_SIZE` - -```json -10 -``` - ### `torii.max_content_len` Maximum number of bytes in raw message. Used to prevent from DOS attacks. diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index 428a32759d8..756d22082be 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -3628,6 +3628,10 @@ { "tag": "UnknownCursor", "discriminant": 4 + }, + { + "tag": "FetchSizeTooBig", + "discriminant": 5 } ] }, diff --git a/smart_contract/src/lib.rs b/smart_contract/src/lib.rs index 7893364a6ea..03d84f2ef33 100644 --- a/smart_contract/src/lib.rs +++ b/smart_contract/src/lib.rs @@ -80,6 +80,7 @@ pub struct QueryRequest { query: Q, sorting: Sorting, pagination: Pagination, + fetch_size: FetchSize, } impl From> for SmartContractQueryRequest { @@ -88,6 +89,7 @@ impl From> for SmartContractQueryRequest { query_request.query.into(), query_request.sorting, query_request.pagination, + query_request.fetch_size, ) } } @@ -108,6 +110,9 @@ pub trait ExecuteQueryOnHost: Sized { /// Apply pagination to a query fn paginate(self, pagination: Pagination) -> Self::QueryRequest; + /// Set fetch size for a query. Default is [`DEFAULT_FETCH_SIZE`] + fn fetch_size(self, fetch_size: FetchSize) -> Self::QueryRequest; + /// Execute query on the host /// /// # Errors @@ -130,6 +135,7 @@ where query: self, sorting, pagination: Pagination::default(), + fetch_size: FetchSize::default(), } } @@ -138,6 +144,16 @@ where query: self, sorting: Sorting::default(), pagination, + fetch_size: FetchSize::default(), + } + } + + fn fetch_size(self, fetch_size: FetchSize) -> Self::QueryRequest { + QueryRequest { + query: self, + sorting: Sorting::default(), + pagination: Pagination::default(), + fetch_size, } } @@ -146,6 +162,7 @@ where query: self, sorting: Sorting::default(), pagination: Pagination::default(), + fetch_size: FetchSize::default(), } .execute() } @@ -169,6 +186,11 @@ where self } + fn fetch_size(mut self, fetch_size: FetchSize) -> Self::QueryRequest { + self.fetch_size = fetch_size; + self + } + #[allow(irrefutable_let_patterns)] fn execute(self) -> Result, ValidationFail> { #[cfg(not(test))]