diff --git a/Cargo.lock b/Cargo.lock index 4b23a94c854..870c4ce3bd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2930,6 +2930,7 @@ dependencies = [ "thread-local-panic-hook", "tokio", "tracing", + "uuid", "vergen", "warp", ] @@ -6126,6 +6127,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom 0.2.10", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 637a4be5399..b07710bcf07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ ursa = "0.3.7" aead = "0.3.2" rand = "0.8.5" -warp = { version = "0.3.3", default-features = false } +warp = { version = "0.3.5", default-features = false } wasmtime = "0.39.1" tracing = "0.1.37" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 328ebcae528..77d51e16893 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -67,13 +67,17 @@ thiserror = { workspace = true } displaydoc = { workspace = true } tokio = { workspace = true, features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs", "signal"] } warp = { workspace = true, features = ["multipart", "websocket"] } -dashmap = "5.4.0" -serial_test = "0.8.0" once_cell = { workspace = true } owo-colors = { workspace = true, features = ["supports-colors"] } supports-color = { workspace = true } -thread-local-panic-hook = { version = "0.1.0", optional = true } tempfile = { workspace = true } +dashmap = { workspace = true } + +thread-local-panic-hook = { version = "0.1.0", optional = true } +uuid = { version = "1.4.1", features = ["v4"] } + +[dev-dependencies] +serial_test = "0.8.0" [build-dependencies] iroha_wasm_builder = { workspace = true } diff --git a/cli/src/torii/cursor.rs b/cli/src/torii/cursor.rs index aae3713314c..a3d1aca610c 100644 --- a/cli/src/torii/cursor.rs +++ b/cli/src/torii/cursor.rs @@ -1,6 +1,4 @@ -use std::num::NonZeroUsize; - -use iroha_data_model::query::ForwardCursor; +use std::num::{NonZeroU64, NonZeroUsize}; use crate::torii::{Error, Result}; @@ -13,7 +11,7 @@ impl Batch for I { Batched { iter: self.into_iter(), batch_size, - cursor: ForwardCursor::default(), + cursor: Some(0), } } } @@ -24,11 +22,11 @@ impl Batch for I { pub struct Batched { iter: I::IntoIter, batch_size: NonZeroUsize, - cursor: ForwardCursor, + cursor: Option, } impl> Batched { - pub(crate) fn next_batch(&mut self, cursor: ForwardCursor) -> Result<(I, ForwardCursor)> { + pub(crate) fn next_batch(&mut self, cursor: Option) -> Result<(I, Option)> { if cursor != self.cursor { return Err(Error::UnknownCursor); } @@ -41,7 +39,7 @@ impl> Batched { .take(self.batch_size.get()) .collect(); - self.cursor.cursor = if let Some(cursor) = self.cursor.cursor { + self.cursor = if let Some(cursor) = self.cursor { if batch_size >= self.batch_size.get() { let batch_size = self .batch_size @@ -57,23 +55,24 @@ impl> Batched { None } } else if batch_size >= self.batch_size.get() { - Some(self.batch_size.try_into().expect("usize should fit in u64")) + Some( + self.batch_size + .get() + .try_into() + .expect("usize should fit in u64"), + ) } else { None }; - Ok((batch, self.cursor)) + Ok(( + batch, + self.cursor + .map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")), + )) } pub fn is_depleted(&self) -> bool { - self.cursor.cursor.is_none() - } -} - -impl Iterator for Batched { - type Item = I::Item; - - fn next(&mut self) -> Option { - self.iter.next() + self.cursor.is_none() } } diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs index c98ba158b1d..a95ba787d2a 100644 --- a/cli/src/torii/mod.rs +++ b/cli/src/torii/mod.rs @@ -20,6 +20,7 @@ use iroha_core::{ EventsSender, }; use iroha_data_model::Value; +use parity_scale_codec::Encode; use tokio::{sync::Notify, time::sleep}; use utils::*; use warp::{ @@ -41,18 +42,19 @@ type LiveQuery = Batched>; #[derive(Default)] struct LiveQueryStore { - queries: DashMap, (LiveQuery, Instant)>, + queries: DashMap<(String, Vec), (LiveQuery, Instant)>, } impl LiveQueryStore { - fn insert(&self, request: Vec, live_query: LiveQuery) { - self.queries.insert(request, (live_query, Instant::now())); + fn insert(&self, query_id: String, request: T, live_query: LiveQuery) { + self.queries + .insert((query_id, request.encode()), (live_query, Instant::now())); } - fn remove(&self, request: &Vec) -> Option<(Vec, LiveQuery)> { + fn remove(&self, query_id: &str, request: &T) -> Option { self.queries - .remove(request) - .map(|(query_id, (query, _))| (query_id, query)) + .remove(&(query_id.to_string(), request.encode())) + .map(|(_, (output, _))| output) } // TODO: Add notifier channel to enable graceful shutdown diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 01003c30360..f0358f0b9ed 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -5,7 +5,10 @@ // FIXME: This can't be fixed, because one trait in `warp` is private. #![allow(opaque_hidden_inferred_bound)] -use std::{cmp::Ordering, num::NonZeroUsize}; +use std::{ + cmp::Ordering, + num::{NonZeroU64, NonZeroUsize}, +}; use cursor::Batch; use eyre::WrapErr; @@ -28,13 +31,13 @@ use iroha_data_model::{ }, VersionedCommittedBlock, }, + http::{BatchedResponse, VersionedBatchedResponse}, prelude::*, query::{ForwardCursor, Pagination, Sorting}, }; #[cfg(feature = "telemetry")] use iroha_telemetry::metrics::Status; use pagination::Paginate; -use parity_scale_codec::Encode; use tokio::task; use super::*; @@ -90,39 +93,48 @@ async fn handle_queries( pagination: Pagination, cursor: ForwardCursor, -) -> Result> { - let (query_id, mut live_query) = if cursor.cursor.is_some() { - let query_id = (&request, &sorting, &pagination).encode(); - query_store.remove(&query_id).ok_or(Error::UnknownCursor)? +) -> Result>> { + // TODO: Remove wsv clone + let mut wsv = sumeragi.wsv_clone(); + let valid_request = ValidQueryRequest::validate(request, &mut wsv)?; + let request_id = (&valid_request, &sorting, &pagination); + + let (query_id, curr_cursor, mut live_query) = if let Some(query_id) = cursor.query_id { + let live_query = query_store + .remove(&query_id, &request_id) + .ok_or(Error::UnknownCursor)?; + + (query_id, cursor.cursor.map(NonZeroU64::get), live_query) } else { - let mut wsv = sumeragi.wsv_clone(); - - let valid_request = ValidQueryRequest::validate(request, &mut wsv)?; let res = valid_request.execute(&wsv).map_err(ValidationFail::from)?; match res { - LazyValue::Value(result) => { + LazyValue::Value(batch) => { let cursor = ForwardCursor::default(); - let result = QueryResponse { result, cursor }; + let result = BatchedResponse { batch, cursor }; return Ok(Scale(result.into())); } LazyValue::Iter(iter) => { - let query_id = (&valid_request, &sorting, &pagination).encode(); - let query = apply_sorting_and_pagination(iter, &sorting, pagination); - (query_id, query.batched(fetch_size)) + let live_query = apply_sorting_and_pagination(iter, &sorting, pagination); + let query_id = uuid::Uuid::new_v4().to_string(); + + (query_id, Some(0), live_query.batched(fetch_size)) } } }; - let (batch, next_cursor) = live_query.next_batch(cursor)?; + let (batch, next_cursor) = live_query.next_batch(curr_cursor)?; if !live_query.is_depleted() { - query_store.insert(query_id, live_query); + query_store.insert(query_id.clone(), request_id, live_query); } - let query_response = QueryResponse { - result: Value::Vec(batch), - cursor: next_cursor, + let query_response = BatchedResponse { + batch: Value::Vec(batch), + cursor: ForwardCursor { + query_id: Some(query_id), + cursor: next_cursor, + }, }; Ok(Scale(query_response.into())) @@ -192,16 +204,16 @@ async fn handle_pending_transactions( ) -> Result>> { // TODO: Don't clone wsv here let wsv = sumeragi.wsv_clone(); - Ok(Scale( - queue - .all_transactions(&wsv) - .map(Into::into) - .paginate(pagination) - .collect::>(), - // TODO: - // NOTE: batching is done after collecting the result of pagination - //.batched(fetch_size) - )) + + let query_response = queue + .all_transactions(&wsv) + .map(Into::into) + .paginate(pagination) + .collect::>(); + // TODO: + //.batched(fetch_size) + + Ok(Scale(query_response)) } #[iroha_futures::telemetry_future] @@ -495,7 +507,7 @@ impl Torii { endpoint3( handle_pending_transactions, warp::path(uri::PENDING_TRANSACTIONS) - .and(add_state!(self.queue, self.sumeragi)) + .and(add_state!(self.queue, self.sumeragi,)) .and(paginate()), ) .or(endpoint2( diff --git a/cli/src/torii/utils.rs b/cli/src/torii/utils.rs index 77d31319c06..7d590ff4b48 100644 --- a/cli/src/torii/utils.rs +++ b/cli/src/torii/utils.rs @@ -66,4 +66,4 @@ impl Reply for WarpResult { } } -iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 7); +iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 6, 7); diff --git a/client/src/client.rs b/client/src/client.rs index 71ab833c919..f917635fb07 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -22,6 +22,7 @@ use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConf use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{ block::VersionedCommittedBlock, + http::VersionedBatchedResponse, isi::Instruction, predicate::PredicateBox, prelude::*, @@ -47,7 +48,7 @@ const APPLICATION_JSON: &str = "application/json"; /// Phantom struct that handles responses of Query API. /// Depending on input query struct, transforms a response into appropriate output. -#[derive(Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone)] pub struct QueryResponseHandler { query_request: QueryRequest, _output_type: PhantomData, @@ -104,10 +105,10 @@ where // Separate-compilation friendly response handling fn _handle_query_response_base( resp: &Response>, - ) -> QueryResult { + ) -> QueryResult> { match resp.status() { StatusCode::OK => { - let res = VersionedQueryResponse::decode_all_versioned(resp.body()); + let res = VersionedBatchedResponse::decode_all_versioned(resp.body()); res.wrap_err( "Failed to decode response from Iroha. \ You are likely using a version of the client library \ @@ -143,13 +144,13 @@ where } let response = _handle_query_response_base(resp) - .map(|VersionedQueryResponse::V1(response)| response)?; + .map(|VersionedBatchedResponse::V1(response)| response)?; - let value = R::try_from(response.result) + let value = R::try_from(response.batch) .map_err(Into::into) .wrap_err("Unexpected type")?; - self.query_request.server_cursor = response.cursor; + self.query_request.query_cursor = response.cursor; Ok(value) } } @@ -242,7 +243,7 @@ pub trait QueryOutput: Into + TryFrom { } /// Iterable query output -#[derive(Debug, Clone, serde::Serialize)] +#[derive(Debug, Clone)] pub struct ResultSet { query_handler: QueryResponseHandler>, @@ -259,7 +260,7 @@ where fn next(&mut self) -> Option { if self.client_cursor >= self.iter.len() { - self.query_handler.query_request.server_cursor.get()?; + self.query_handler.query_request.query_cursor.cursor?; let request = match self.query_handler.query_request.clone().assemble().build() { Err(err) => return Some(Err(ClientQueryError::Other(err))), @@ -322,6 +323,7 @@ impl_query_result! { iroha_data_model::block::BlockHeader, iroha_data_model::query::MetadataValue, iroha_data_model::query::TransactionQueryOutput, + iroha_data_model::permission::PermissionTokenSchema, iroha_data_model::trigger::Trigger, } @@ -360,19 +362,21 @@ pub struct QueryRequest { request: Vec, sorting: Sorting, pagination: Pagination, - server_cursor: ForwardCursor, + query_cursor: ForwardCursor, } impl QueryRequest { #[cfg(test)] fn dummy() -> Self { + let torii_url = iroha_config::torii::uri::DEFAULT_API_ADDR; + Self { - torii_url: uri::QUERY.parse().unwrap(), + torii_url: format!("http://{torii_url}").parse().unwrap(), headers: HashMap::new(), request: Vec::new(), sorting: Sorting::default(), pagination: Pagination::default(), - server_cursor: ForwardCursor::default(), + query_cursor: ForwardCursor::default(), } } fn assemble(self) -> DefaultRequestBuilder { @@ -383,7 +387,7 @@ impl QueryRequest { .headers(self.headers) .params(Vec::from(self.sorting)) .params(Vec::from(self.pagination)) - .params(Vec::from(self.server_cursor)) + .params(Vec::from(self.query_cursor)) .body(self.request) } } @@ -805,7 +809,7 @@ impl Client { request, sorting, pagination, - server_cursor: ForwardCursor::default(), + query_cursor: ForwardCursor::default(), }; Ok(( diff --git a/client/tests/integration/upgrade.rs b/client/tests/integration/upgrade.rs index 3d667503ef0..03a61bc9027 100644 --- a/client/tests/integration/upgrade.rs +++ b/client/tests/integration/upgrade.rs @@ -3,7 +3,7 @@ use std::path::Path; use eyre::Result; -use iroha_client::client::{Client, QueryResult}; +use iroha_client::client::Client; use iroha_crypto::KeyPair; use iroha_data_model::{prelude::*, query::permission::FindPermissionTokenSchema}; use iroha_logger::info; diff --git a/client_cli/src/main.rs b/client_cli/src/main.rs index 73266683ab5..c0f71d42142 100644 --- a/client_cli/src/main.rs +++ b/client_cli/src/main.rs @@ -26,7 +26,7 @@ use color_eyre::{ }; use dialoguer::Confirm; use erased_serde::Serialize; -use iroha_client::client::Client; +use iroha_client::client::{Client, QueryResult}; use iroha_config::{client::Configuration as ClientConfiguration, path::Path as ConfigPath}; use iroha_crypto::prelude::*; use iroha_data_model::prelude::*; @@ -381,7 +381,7 @@ mod domain { .request(client::domain::all()) .wrap_err("Failed to get all domains"), }?; - context.print_data(&vec)?; + context.print_data(&vec.collect::>>()?)?; Ok(()) } } @@ -390,7 +390,7 @@ mod domain { mod account { use std::fmt::Debug; - use iroha_client::client; + use iroha_client::client::{self}; use super::*; @@ -515,7 +515,7 @@ mod account { .request(client::account::all()) .wrap_err("Failed to get all accounts"), }?; - context.print_data(&vec)?; + context.print_data(&vec.collect::>>()?)?; Ok(()) } } @@ -579,7 +579,7 @@ mod account { let permissions = client .request(find_all_permissions) .wrap_err("Failed to get all account permissions")?; - context.print_data(&permissions)?; + context.print_data(&permissions.collect::>>()?)?; Ok(()) } } @@ -803,7 +803,7 @@ mod asset { .request(client::asset::all()) .wrap_err("Failed to get all assets"), }?; - context.print_data(&vec)?; + context.print_data(&vec.collect::>>()?)?; Ok(()) } } diff --git a/config/Cargo.toml b/config/Cargo.toml index 93dccade084..89a01d1bc68 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -27,7 +27,7 @@ displaydoc = { workspace = true } derive_more = { workspace = true } cfg-if = { workspace = true } path-absolutize = { workspace = true } -once_cell = "1.16.0" +once_cell = { workspace = true } [dev-dependencies] proptest = { workspace = true } diff --git a/config/iroha_test_config.json b/config/iroha_test_config.json index c3e37e741cd..3ac14745d08 100644 --- a/config/iroha_test_config.json +++ b/config/iroha_test_config.json @@ -44,7 +44,9 @@ "API_URL": "127.0.0.1:8080", "TELEMETRY_URL": "127.0.0.1:8180", "MAX_TRANSACTION_SIZE": 32768, - "MAX_CONTENT_LEN": 16384000 + "MAX_CONTENT_LEN": 16384000, + "FETCH_SIZE": 10, + "QUERY_IDLE_TIME_MS": 30000 }, "BLOCK_SYNC": { "GOSSIP_PERIOD_MS": 10000, diff --git a/core/Cargo.toml b/core/Cargo.toml index 88bc244849d..94b21f93d77 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,15 +66,17 @@ wasmtime = { workspace = true } parking_lot = { workspace = true, features = ["deadlock_detection"] } derive_more = { workspace = true } itertools = { workspace = true } + sealed = "0.5.0" [dev-dependencies] criterion = { workspace = true } hex = { workspace = true } -byte-unit = "4.0.18" once_cell = { workspace = true } tempfile = { workspace = true } +byte-unit = "4.0.18" + [[bench]] name = "validation" harness = false diff --git a/core/src/smartcontracts/isi/account.rs b/core/src/smartcontracts/isi/account.rs index 6550e6b37f7..c3f06d03a62 100644 --- a/core/src/smartcontracts/isi/account.rs +++ b/core/src/smartcontracts/isi/account.rs @@ -262,7 +262,7 @@ pub mod isi { wsv.account_mut(&account_id)?; if !wsv - .permission_token_definitions() + .permission_token_schema() .token_ids .contains(&permission_id) { diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs index 452f77bef98..1bdb45062c7 100644 --- a/core/src/smartcontracts/isi/query.rs +++ b/core/src/smartcontracts/isi/query.rs @@ -51,6 +51,7 @@ impl_lazy! { iroha_data_model::block::BlockHeader, iroha_data_model::query::MetadataValue, iroha_data_model::query::TransactionQueryOutput, + iroha_data_model::permission::PermissionTokenSchema, iroha_data_model::trigger::Trigger, } @@ -423,7 +424,6 @@ mod tests { .count() as u64, num_blocks ); - assert!(txs.windows(2).all(|wnd| wnd[0] >= wnd[1])); Ok(()) } diff --git a/core/src/smartcontracts/isi/tx.rs b/core/src/smartcontracts/isi/tx.rs index 894c965333e..dbc85541346 100644 --- a/core/src/smartcontracts/isi/tx.rs +++ b/core/src/smartcontracts/isi/tx.rs @@ -34,7 +34,10 @@ impl Iterator for BlockTransactionIter { let block = self.0.as_v1(); if self.1 < block.transactions.len() { - return Some(BlockTransactionRef(Arc::clone(&self.0), self.1)); + let res = Some(BlockTransactionRef(Arc::clone(&self.0), self.1)); + + self.1 += 1; + return res; } None diff --git a/core/src/smartcontracts/isi/world.rs b/core/src/smartcontracts/isi/world.rs index 34cbe4ea3cc..d7237ce14d0 100644 --- a/core/src/smartcontracts/isi/world.rs +++ b/core/src/smartcontracts/isi/world.rs @@ -123,7 +123,7 @@ pub mod isi { for permission in &role.permissions { if !wsv - .permission_token_definitions() + .permission_token_schema() .token_ids .contains(&permission.definition_id) { @@ -182,7 +182,7 @@ pub mod isi { token_id: PermissionTokenId, wsv: &mut WorldStateView, ) -> Result<(), RepetitionError> { - let permission_token_ids = &mut wsv.world_mut().permission_token_definitions.token_ids; + let permission_token_ids = &mut wsv.world_mut().permission_token_schema.token_ids; // Keep permission tokens sorted if let Err(pos) = permission_token_ids.binary_search(&token_id) { @@ -204,7 +204,7 @@ pub mod isi { remove_token_from_roles(token_id, wsv)?; remove_token_from_accounts(token_id, wsv)?; - let permission_token_ids = &mut wsv.world_mut().permission_token_definitions.token_ids; + let permission_token_ids = &mut wsv.world_mut().permission_token_schema.token_ids; if let Ok(pos) = permission_token_ids.binary_search(token_id) { permission_token_ids.remove(pos); @@ -270,7 +270,7 @@ pub mod isi { accounts_with_token.insert( account_id.clone(), wsv.account_inherent_permission_tokens(account_id) - .filter(|token| token.definition_id == *target_definition_id) + .filter(|token| token.definition_id == *token_id) .cloned() .collect::>(), ); @@ -373,18 +373,16 @@ pub mod isi { } } - let old_token_schema = wsv.permission_token_definitions().clone(); + let old_token_schema = wsv.permission_token_schema().clone(); for token_id in &old_token_schema.token_ids { if !new_token_schema.token_ids.contains(token_id) { unregister_permission_token_definition(token_id, wsv)?; } - wsv.world_mut().permission_token_definitions.schema = - new_token_schema.schema.clone(); + wsv.world_mut().permission_token_schema.schema = new_token_schema.schema.clone(); } for token_id in &new_token_schema.token_ids { - wsv.world_mut().permission_token_definitions.schema = - new_token_schema.schema.clone(); + wsv.world_mut().permission_token_schema.schema = new_token_schema.schema.clone(); if !old_token_schema.token_ids.contains(token_id) { register_permission_token_definition(token_id.clone(), wsv)?; @@ -407,7 +405,7 @@ pub mod query { use iroha_data_model::{ parameter::Parameter, peer::Peer, - permission::PermissionTokenDefinition, + permission::PermissionTokenSchema, prelude::*, query::{ error::{FindError, QueryExecutionFail as Error}, @@ -470,14 +468,9 @@ pub mod query { } impl ValidQuery for FindPermissionTokenSchema { - #[metrics("find_all_permission_token_ids")] - fn execute<'wsv>( - &self, - wsv: &'wsv WorldStateView, - ) -> Result + 'wsv>, Error> { - Ok(Box::new( - wsv.permission_token_definitions().cloned(), - )) + #[metrics("find_permission_token_schema")] + fn execute(&self, wsv: &WorldStateView) -> Result { + Ok(wsv.permission_token_schema().clone()) } } @@ -497,13 +490,13 @@ pub mod query { let authority = wsv .evaluate(&self.account_id) .map_err(|e| Error::Evaluate(e.to_string()))?; - let permission_token = wsv + let given_permission_token = wsv .evaluate(&self.permission_token) .map_err(|e| Error::Evaluate(e.to_string()))?; Ok(wsv .account_permission_tokens(&authority)? - .any(|permission_token| *permission_token == self.permission_token)) + .any(|permission_token| *permission_token == given_permission_token)) } } } diff --git a/core/src/wsv.rs b/core/src/wsv.rs index d5b8cfa57ee..2bf33133f02 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -66,7 +66,7 @@ pub struct World { /// Permission tokens of an account. pub(crate) account_permission_tokens: crate::PermissionTokensMap, /// Registered permission token ids. - pub(crate) permission_token_definitions: PermissionTokenSchema, + pub(crate) permission_token_schema: PermissionTokenSchema, /// Triggers pub(crate) triggers: TriggerSet, /// Runtime Validator @@ -631,8 +631,8 @@ impl WorldStateView { /// Get all permission token definitions #[inline] - pub fn permission_token_definitions(&self) -> &crate::PermissionTokenSchema { - &self.world.permission_token_definitions + pub fn permission_token_schema(&self) -> &crate::PermissionTokenSchema { + &self.world.permission_token_schema } /// Construct [`WorldStateView`] with specific [`Configuration`]. diff --git a/data_model/cbindgen.toml b/data_model/cbindgen.toml deleted file mode 100644 index 5d6b21a6ac3..00000000000 --- a/data_model/cbindgen.toml +++ /dev/null @@ -1,3 +0,0 @@ -[parse.expand] -crates = ["iroha_data_model"] -features = ["ffi_export"] diff --git a/data_model/src/lib.rs b/data_model/src/lib.rs index 9dd3ae8db34..0ae2f810e7e 100644 --- a/data_model/src/lib.rs +++ b/data_model/src/lib.rs @@ -1817,6 +1817,50 @@ pub fn current_time() -> core::time::Duration { .expect("Failed to get the current system time") } +#[cfg(feature = "http")] +pub mod http { + //! Structures related to HTTP communication + + use iroha_data_model_derive::model; + use iroha_schema::IntoSchema; + use iroha_version::declare_versioned_with_scale; + + pub use self::model::*; + use crate::prelude::QueryOutput; + + declare_versioned_with_scale!(VersionedBatchedResponse 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); + + #[model] + pub mod model { + use getset::Getters; + use iroha_version::version_with_scale; + use parity_scale_codec::{Decode, Encode}; + use serde::{Deserialize, Serialize}; + + use super::*; + + /// Batched response of a query sent to torii + #[derive(Debug, Clone, Getters, Decode, Encode, Deserialize, Serialize, IntoSchema)] + #[version_with_scale(n = 1, versioned = "VersionedBatchedResponse")] + #[getset(get = "pub")] + #[must_use] + pub struct BatchedResponse { + /// Current batch + pub batch: T, + /// Index of the next element in the result set. Client will use this value + /// in the next request to continue fetching results of the original query + pub cursor: crate::query::cursor::ForwardCursor, + } + } + + impl From> for QueryOutput { + #[inline] + fn from(source: BatchedResponse) -> Self { + source.batch + } + } +} + mod ffi { //! Definitions and implementations of FFI related functionalities diff --git a/data_model/src/query/cursor.rs b/data_model/src/query/cursor.rs index 331adb73701..1c3f4a24542 100644 --- a/data_model/src/query/cursor.rs +++ b/data_model/src/query/cursor.rs @@ -2,13 +2,15 @@ use core::num::{NonZeroU64, NonZeroUsize}; +use getset::Getters; use iroha_data_model_derive::model; use iroha_schema::IntoSchema; -use parity_scale_codec::{Decode, Encode}; -use serde::{Deserialize, Serialize}; +use parity_scale_codec::{Decode, Encode, Input}; +use serde::Serialize; pub use self::model::*; +const QUERY_ID: &str = "query_id"; const CURSOR: &str = "cursor"; #[model] @@ -16,38 +18,76 @@ pub mod model { use super::*; /// Forward-only (a.k.a non-scrollable) cursor - #[derive( - Debug, - Clone, - Copy, - PartialEq, - Eq, - Default, - Decode, - Encode, - Deserialize, - Serialize, - IntoSchema, - )] + #[derive(Debug, Clone, PartialEq, Eq, Default, Getters, Encode, Serialize, IntoSchema)] + #[getset(get = "pub")] pub struct ForwardCursor { + /// Unique ID of query. When provided in a query the query will look up if there + /// is was already a query with a matching ID and resume returning result batches + pub query_id: Option, + /// Pointer to the next element in the result set pub cursor: Option, } } -impl ForwardCursor { - /// Get cursor position - pub fn get(self) -> Option { - self.cursor +mod candidate { + use serde::{de::Error as _, Deserialize}; + + use super::*; + + #[derive(Decode, Deserialize)] + struct ForwardCursorCandidate { + query_id: Option, + cursor: Option, + } + + impl<'de> Deserialize<'de> for ForwardCursor { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let candidate = ForwardCursorCandidate::deserialize(deserializer)?; + + if let Some(query_id) = candidate.query_id { + Ok(ForwardCursor { + query_id: Some(query_id), + cursor: candidate.cursor, + }) + } else if candidate.cursor.is_some() { + Err(D::Error::custom("Cursor missing query id")) + } else { + Ok(ForwardCursor::default()) + } + } + } + + impl Decode for ForwardCursor { + fn decode(input: &mut I) -> Result { + let candidate = ForwardCursorCandidate::decode(input)?; + + if let Some(query_id) = candidate.query_id { + Ok(ForwardCursor { + query_id: Some(query_id), + cursor: candidate.cursor, + }) + } else if candidate.cursor.is_some() { + Err("Cursor missing query id".into()) + } else { + Ok(ForwardCursor::default()) + } + } } } -impl From for Vec<(&'static str, NonZeroU64)> { +impl From for Vec<(&'static str, String)> { fn from(cursor: ForwardCursor) -> Self { - if let Some(cursor) = cursor.cursor { - return vec![(CURSOR, cursor)]; + match (cursor.query_id, cursor.cursor) { + (Some(query_id), Some(cursor)) => { + vec![(QUERY_ID, query_id), (CURSOR, cursor.to_string())] + } + (Some(query_id), None) => vec![(QUERY_ID, query_id)], + (None, Some(_)) => unreachable!(), + (None, None) => Vec::new(), } - - Vec::new() } } diff --git a/data_model/src/query/mod.rs b/data_model/src/query/mod.rs index 336620f3aad..c0ae91ace65 100644 --- a/data_model/src/query/mod.rs +++ b/data_model/src/query/mod.rs @@ -1297,10 +1297,10 @@ pub mod http { use crate::{account::AccountId, predicate::PredicateBox}; // TODO: Could we make a variant of `Value` that holds only query results? - type QueryResult = Value; + /// Type representing Result of executing a query + pub type QueryOutput = Value; declare_versioned_with_scale!(VersionedSignedQuery 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); - declare_versioned_with_scale!(VersionedQueryResponse 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); #[model] pub mod model { @@ -1322,9 +1322,6 @@ pub mod http { Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema, )] pub(crate) struct QueryPayload { - /// Timestamp of the query creation. - #[codec(compact)] - pub timestamp_ms: u128, /// Account id of the user who will sign this query. pub authority: AccountId, /// Query definition. @@ -1342,19 +1339,6 @@ pub mod http { /// Payload pub payload: QueryPayload, } - - /// [`SignedQuery`] response - #[derive(Debug, Clone, Getters, Decode, Encode, Deserialize, Serialize, IntoSchema)] - #[version_with_scale(n = 1, versioned = "VersionedQueryResponse")] - #[getset(get = "pub")] - pub struct QueryResponse { - /// The result of the query execution. - #[getset(skip)] - pub result: QueryResult, - /// Index of the next element in the result set. Client will use this value - /// in the next request to continue fetching results of the original query - pub cursor: cursor::ForwardCursor, - } } mod candidate { @@ -1431,11 +1415,8 @@ pub mod http { impl QueryBuilder { /// Construct a new request with the `query`. pub fn new(query: impl Into, authority: AccountId) -> Self { - let timestamp_ms = crate::current_time().as_millis(); - Self { payload: QueryPayload { - timestamp_ms, query: query.into(), authority, filter: PredicateBox::default(), @@ -1468,19 +1449,10 @@ pub mod http { } } - impl From for Value { - #[inline] - fn from(source: QueryResponse) -> Self { - source.result - } - } - pub mod prelude { //! The prelude re-exports most commonly used traits, structs and macros from this crate. - pub use super::{ - QueryBuilder, QueryResponse, SignedQuery, VersionedQueryResponse, VersionedSignedQuery, - }; + pub use super::{QueryBuilder, SignedQuery, VersionedSignedQuery}; } } diff --git a/data_model/src/visit.rs b/data_model/src/visit.rs index c56bd363575..571c3aa192b 100644 --- a/data_model/src/visit.rs +++ b/data_model/src/visit.rs @@ -78,7 +78,7 @@ pub trait Visit: ExpressionEvaluator { visit_find_all_domains(&FindAllDomains), visit_find_all_parammeters(&FindAllParameters), visit_find_all_peers(&FindAllPeers), - visit_find_all_permission_token_definitions(&FindPermissionTokenSchema), + visit_find_permission_token_schema(&FindPermissionTokenSchema), visit_find_all_role_ids(&FindAllRoleIds), visit_find_all_roles(&FindAllRoles), visit_find_all_transactions(&FindAllTransactions), @@ -213,7 +213,7 @@ pub fn visit_query(visitor: &mut V, authority: &AccountId, qu visit_find_all_domains(FindAllDomains), visit_find_all_parammeters(FindAllParameters), visit_find_all_peers(FindAllPeers), - visit_find_all_permission_token_definitions(FindPermissionTokenSchema), + visit_find_permission_token_schema(FindPermissionTokenSchema), visit_find_all_role_ids(FindAllRoleIds), visit_find_all_roles(FindAllRoles), visit_find_all_transactions(FindAllTransactions), @@ -721,7 +721,7 @@ leaf_visitors! { visit_find_all_domains(&FindAllDomains), visit_find_all_parammeters(&FindAllParameters), visit_find_all_peers(&FindAllPeers), - visit_find_all_permission_token_definitions(&FindPermissionTokenSchema), + visit_find_permission_token_schema(&FindPermissionTokenSchema), visit_find_all_role_ids(&FindAllRoleIds), visit_find_all_roles(&FindAllRoles), visit_find_all_transactions(&FindAllTransactions), diff --git a/docs/source/references/schema.json b/docs/source/references/schema.json index e94899ac8d4..1649149fe94 100644 --- a/docs/source/references/schema.json +++ b/docs/source/references/schema.json @@ -618,6 +618,30 @@ } ] }, + "BatchedResponse": { + "Struct": [ + { + "name": "batch", + "type": "Value" + }, + { + "name": "cursor", + "type": "ForwardCursor" + } + ] + }, + "BatchedResponse>": { + "Struct": [ + { + "name": "batch", + "type": "Vec" + }, + { + "name": "cursor", + "type": "ForwardCursor" + } + ] + }, "BinaryOpIncompatibleNumericValueTypesError": { "Struct": [ { @@ -708,9 +732,6 @@ } ] }, - "Compact": { - "Int": "Compact" - }, "Conditional": { "Struct": [ { @@ -2072,6 +2093,10 @@ }, "ForwardCursor": { "Struct": [ + { + "name": "query_id", + "type": "Option" + }, { "name": "cursor", "type": "Option>" @@ -3016,6 +3041,9 @@ "Option": { "Option": "PipelineStatusKind" }, + "Option": { + "Option": "String" + }, "Option": { "Option": "TimeInterval" }, @@ -3526,10 +3554,6 @@ }, "QueryPayload": { "Struct": [ - { - "name": "timestamp_ms", - "type": "Compact" - }, { "name": "authority", "type": "AccountId" @@ -3544,18 +3568,6 @@ } ] }, - "QueryResponse": { - "Struct": [ - { - "name": "result", - "type": "Value" - }, - { - "name": "cursor", - "type": "ForwardCursor" - } - ] - }, "RaiseTo": { "Struct": [ { @@ -4615,9 +4627,30 @@ "Vec>": { "Vec": "Vec" }, + "Vec": { + "Vec": "VersionedSignedTransaction" + }, "Vec": { "Vec": "u8" }, + "VersionedBatchedResponse": { + "Enum": [ + { + "tag": "V1", + "discriminant": 1, + "type": "BatchedResponse" + } + ] + }, + "VersionedBatchedResponse>": { + "Enum": [ + { + "tag": "V1", + "discriminant": 1, + "type": "BatchedResponse>" + } + ] + }, "VersionedBlockMessage": { "Enum": [ { @@ -4663,15 +4696,6 @@ } ] }, - "VersionedQueryResponse": { - "Enum": [ - { - "tag": "V1", - "discriminant": 1, - "type": "QueryResponse" - } - ] - }, "VersionedSignedQuery": { "Enum": [ { diff --git a/schema/gen/src/lib.rs b/schema/gen/src/lib.rs index 79014f1223b..36fe752e0c9 100644 --- a/schema/gen/src/lib.rs +++ b/schema/gen/src/lib.rs @@ -4,7 +4,9 @@ #![allow(clippy::arithmetic_side_effects)] use iroha_crypto::MerkleTree; -use iroha_data_model::{block::stream::prelude::*, query::error::QueryExecutionFail}; +use iroha_data_model::{ + block::stream::prelude::*, http::VersionedBatchedResponse, query::error::QueryExecutionFail, +}; use iroha_genesis::RawGenesisBlock; use iroha_schema::prelude::*; @@ -47,7 +49,8 @@ pub fn build_schemas() -> MetaMap { VersionedBlockSubscriptionRequest, VersionedEventMessage, VersionedEventSubscriptionRequest, - VersionedQueryResponse, + VersionedBatchedResponse, + VersionedBatchedResponse>, VersionedSignedQuery, // Never referenced, but present in type signature. Like `PhantomData` @@ -98,6 +101,8 @@ types!( BTreeSet, BTreeSet, BTreeSet>, + BatchedResponse, + BatchedResponse>, BlockHeader, BlockMessage, BlockRejectionReason, @@ -219,6 +224,7 @@ types!( FindTriggersByDomainId, FixNum, Fixed, + ForwardCursor, GrantBox, Greater, Hash, @@ -258,8 +264,8 @@ types!( NewParameterBox, NewRole, NonTrivial, + NonZeroU64, Not, - ValidationFail, NumericValue, Option, Option, @@ -268,11 +274,10 @@ types!( Option>, Option, Option, - Option, Option, Option, + Option, Option, - Option, Or, OriginFilter, OriginFilter, @@ -281,7 +286,6 @@ types!( OriginFilter, OriginFilter, OriginFilter, - QueryResponse, Pair, Parameter, ParameterId, @@ -356,6 +360,7 @@ types!( TriggerNumberOfExecutionsChanged, UnregisterBox, UpgradableBox, + ValidationFail, Validator, ValidatorEvent, Value, @@ -366,14 +371,16 @@ types!( Vec, Vec, Vec, + Vec, Vec, + VersionedBatchedResponse, + VersionedBatchedResponse>, VersionedBlockMessage, VersionedBlockSubscriptionRequest, VersionedCommittedBlock, VersionedCommittedBlockWrapper, VersionedEventMessage, VersionedEventSubscriptionRequest, - VersionedQueryResponse, VersionedSignedQuery, VersionedSignedTransaction, WasmExecutionFail, @@ -391,7 +398,6 @@ types!( u32, u64, u8, - NonZeroU64, ); #[cfg(test)] @@ -415,6 +421,7 @@ mod tests { BlockHeader, CommittedBlock, VersionedCommittedBlock, }, domain::NewDomain, + http::{BatchedResponse, VersionedBatchedResponse}, ipfs::IpfsPath, predicate::{ ip_addr::{Ipv4Predicate, Ipv6Predicate}, @@ -424,7 +431,10 @@ mod tests { GenericPredicateBox, NonTrivial, PredicateBox, }, prelude::*, - query::error::{FindError, QueryExecutionFail}, + query::{ + error::{FindError, QueryExecutionFail}, + ForwardCursor, + }, transaction::{error::TransactionLimitError, SignedTransaction, TransactionLimits}, validator::Validator, VersionedCommittedBlockWrapper, @@ -456,22 +466,7 @@ mod tests { } fn generate_test_map() -> BTreeMap { - let mut map = generate_map! {insert_into_test_map}; - - if map - .insert( - core::any::TypeId::of::>(), - as iroha_schema::TypeId>::id(), - ) - .is_some() - { - panic!( - "{}: Duplicate type id. Make sure that type ids are unique", - as iroha_schema::TypeId>::id(), - ); - } - - map + generate_map! {insert_into_test_map} } // For `PhantomData` wrapped types schemas aren't expanded recursively. diff --git a/tools/parity_scale_decoder/src/main.rs b/tools/parity_scale_decoder/src/main.rs index cf9f57679ae..266166aed14 100644 --- a/tools/parity_scale_decoder/src/main.rs +++ b/tools/parity_scale_decoder/src/main.rs @@ -31,6 +31,7 @@ use iroha_data_model::{ BlockHeader, CommittedBlock, VersionedCommittedBlock, }, domain::NewDomain, + http::{BatchedResponse, VersionedBatchedResponse}, ipfs::IpfsPath, predicate::{ ip_addr::{Ipv4Predicate, Ipv6Predicate}, @@ -40,7 +41,10 @@ use iroha_data_model::{ GenericPredicateBox, NonTrivial, PredicateBox, }, prelude::*, - query::error::{FindError, QueryExecutionFail}, + query::{ + error::{FindError, QueryExecutionFail}, + ForwardCursor, + }, transaction::{error::TransactionLimitError, SignedTransaction, TransactionLimits}, validator::Validator, VersionedCommittedBlockWrapper,