From cb685493ddac73a433877a092d67cf9914c577e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Ver=C5=A1i=C4=87?= Date: Tue, 11 Jul 2023 12:35:06 +0200 Subject: [PATCH] [feature] #3468: implement server-side cursor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Marin Veršić --- Cargo.lock | 254 ++++++++++++++ cli/Cargo.toml | 2 + cli/src/torii/mod.rs | 52 ++- cli/src/torii/pagination.rs | 2 +- cli/src/torii/routing.rs | 118 +++++-- cli/src/torii/utils.rs | 2 +- client/src/client.rs | 347 +++++++++++--------- client/tests/integration/events/pipeline.rs | 9 +- client/tests/integration/pagination.rs | 2 +- client/tests/integration/tx_history.rs | 2 +- config/Cargo.toml | 1 + config/src/torii.rs | 11 +- core/src/queue.rs | 5 +- core/src/smartcontracts/isi/query.rs | 9 +- core/src/smartcontracts/isi/tx.rs | 8 +- core/src/smartcontracts/mod.rs | 2 +- core/src/tx.rs | 1 + core/src/wsv.rs | 2 +- core/test_network/src/lib.rs | 110 +++---- crypto/src/merkle.rs | 2 +- data_model/derive/src/model.rs | 5 +- data_model/src/expression.rs | 2 +- data_model/src/isi.rs | 8 +- data_model/src/lib.rs | 14 +- data_model/src/name.rs | 3 +- data_model/src/numeric.rs | 2 +- data_model/src/predicate.rs | 30 +- data_model/src/query/cursor.rs | 6 + data_model/src/{query.rs => query/mod.rs} | 138 +++++--- data_model/src/{ => query}/pagination.rs | 0 data_model/src/{ => query}/sorting.rs | 2 +- data_model/src/transaction.rs | 27 +- data_model/src/visit.rs | 10 +- lints.toml | 1 - macro/src/lib.rs | 4 +- primitives/src/conststr.rs | 1 + schema/gen/src/lib.rs | 13 +- tools/parity_scale_decoder/src/main.rs | 5 +- wasm/src/lib.rs | 7 +- wasm_codec/derive/src/lib.rs | 38 ++- 40 files changed, 897 insertions(+), 360 deletions(-) create mode 100644 data_model/src/query/cursor.rs rename data_model/src/{query.rs => query/mod.rs} (94%) rename data_model/src/{ => query}/pagination.rs (100%) rename data_model/src/{ => query}/sorting.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index a308139c430..132ddae9889 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,35 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg 1.1.0", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.19", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -544,6 +573,12 @@ dependencies = [ "utf8-width", ] +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -565,6 +600,37 @@ dependencies = [ "ppv-lite86", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -785,6 +851,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.7" @@ -1533,6 +1608,21 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "expect-test" version = "1.4.1" @@ -1736,6 +1826,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -2820,6 +2925,7 @@ version = "2.0.0-pre-rc.16" dependencies = [ "async-trait", "color-eyre", + "dashmap", "eyre", "futures", "iroha_cli_derive", @@ -2836,6 +2942,7 @@ dependencies = [ "iroha_telemetry", "iroha_version", "iroha_wasm_builder", + "moka", "once_cell", "owo-colors", "parity-scale-codec", @@ -2932,6 +3039,7 @@ dependencies = [ "iroha_data_model", "iroha_primitives", "json5", + "once_cell", "path-absolutize", "proptest", "serde", @@ -3591,6 +3699,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -3706,6 +3823,31 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4330eca86d39f2b52d0481aa1e90fe21bfa61f11b0bf9b48ab95595013cefe48" +[[package]] +name = "moka" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "more-asserts" version = "0.2.2" @@ -3981,6 +4123,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -4173,6 +4321,22 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg 1.1.0", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "poly1305" version = "0.6.2" @@ -4324,6 +4488,33 @@ dependencies = [ "cc", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -4516,6 +4707,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.7.0" @@ -4723,6 +4923,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -4801,6 +5010,9 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5090,6 +5302,21 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -5284,6 +5511,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -5776,6 +6009,12 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -6021,6 +6260,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" +dependencies = [ + "getrandom 0.2.10", +] + [[package]] name = "valuable" version = "0.1.0" @@ -6060,6 +6308,12 @@ dependencies = [ "libc", ] +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.3" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index f05ff0da5cf..4238e3216ff 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -55,10 +55,12 @@ iroha_genesis = { version = "=2.0.0-pre-rc.16", path = "../genesis" } iroha_wasm_builder = { version = "=2.0.0-pre-rc.16", path = "../wasm_builder" } +dashmap = "5.4.0" async-trait = "0.1.60" color-eyre = "0.6.2" eyre = "0.6.8" tracing = "0.1.37" +moka = { version = "0.11", features = ["future"] } futures = { version = "0.3.25", default-features = false, features = ["std", "async-await"] } parity-scale-codec = { version = "3.2.1", default-features = false, features = ["derive"] } serde = { version = "1.0.151", features = ["derive"] } diff --git a/cli/src/torii/mod.rs b/cli/src/torii/mod.rs index 386ec96b4b5..567cc634340 100644 --- a/cli/src/torii/mod.rs +++ b/cli/src/torii/mod.rs @@ -6,9 +6,12 @@ use std::{ convert::Infallible, fmt::{Debug, Write as _}, net::ToSocketAddrs, + num::NonZeroU64, sync::Arc, + time::{Duration, Instant}, }; +use dashmap::DashMap; use futures::{stream::FuturesUnordered, StreamExt}; use iroha_core::{ kura::Kura, @@ -17,8 +20,9 @@ use iroha_core::{ sumeragi::SumeragiHandle, EventsSender, }; +use iroha_data_model::Value; use thiserror::Error; -use tokio::sync::Notify; +use tokio::{sync::Notify, time::sleep}; use utils::*; use warp::{ http::StatusCode, @@ -32,6 +36,42 @@ pub(crate) mod utils; mod pagination; pub mod routing; +type LiveQuery = Box + Send + Sync>; + +#[derive(Default)] +struct LiveQueryStore { + queries: DashMap, (LiveQuery, NonZeroU64, Instant)>, +} + +impl LiveQueryStore { + fn insert(&self, request: Vec, (live_query, cursor): (LiveQuery, NonZeroU64)) { + self.queries + .insert(request.clone(), (live_query, cursor, Instant::now())); + } + + fn remove(&self, request: &Vec) -> Option<(LiveQuery, NonZeroU64)> { + self.queries + .remove(request) + .map(|(_, (live_query, cursor, _))| (live_query, cursor)) + } + + // TODO: Add notifier channel to enable graceful shutdown + fn expired_query_cleanup(self: Arc) -> tokio::task::JoinHandle<()> { + tokio::task::spawn(async move { + // Time query can remain in the store if unaccessed + let query_idle_time = Duration::from_millis(10_000); + + loop { + sleep(query_idle_time).await; + + self.queries.retain(|_, (_, _, last_access_time)| { + last_access_time.elapsed() <= query_idle_time + }); + } + }) + } +} + /// Main network handler and the only entrypoint of the Iroha. pub struct Torii { iroha_cfg: super::Configuration, @@ -39,6 +79,7 @@ pub struct Torii { events: EventsSender, notify_shutdown: Arc, sumeragi: SumeragiHandle, + query_store: Arc, kura: Arc, } @@ -64,10 +105,13 @@ pub enum Error { /// Error while getting Prometheus metrics #[error("Failed to produce Prometheus metrics")] Prometheus(#[source] eyre::Report), + /// Error while resuming cursor + #[error("Failed to find cursor")] + UnknownCursor, } /// Status code for query error response. -pub(crate) fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { +fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode { use iroha_data_model::{ isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*, ValidationFail::*, @@ -110,7 +154,9 @@ impl Error { use Error::*; match self { Query(e) => query_status_code(e), - AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST, + AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => { + StatusCode::BAD_REQUEST + } Config(_) => StatusCode::NOT_FOUND, PushIntoQueue(err) => match **err { queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/cli/src/torii/pagination.rs b/cli/src/torii/pagination.rs index 2a20556a686..3cb7e8772bd 100644 --- a/cli/src/torii/pagination.rs +++ b/cli/src/torii/pagination.rs @@ -1,4 +1,4 @@ -use iroha_data_model::prelude::*; +use iroha_data_model::query::Pagination; /// Describes a collection to which pagination can be applied. /// Implemented for the [`Iterator`] implementors. diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index bed382b7bc7..a7f5b79c341 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -28,23 +28,30 @@ use iroha_data_model::{ VersionedCommittedBlock, }, prelude::*, + query::{ForwardCursor, Pagination, Sorting}, }; use iroha_logger::prelude::*; #[cfg(feature = "telemetry")] use iroha_telemetry::metrics::Status; use pagination::{paginate, Paginate}; +use parity_scale_codec::Encode; use tokio::task; use super::*; use crate::stream::{Sink, Stream}; /// Filter for warp which extracts sorting -pub fn sorting() -> impl warp::Filter + Copy { +fn sorting() -> impl warp::Filter + Copy { + warp::query() +} + +/// Filter for warp which extracts cursor +fn cursor() -> impl warp::Filter + Copy { warp::query() } #[iroha_futures::telemetry_future] -pub(crate) async fn handle_instructions( +async fn handle_instructions( queue: Arc, sumeragi: SumeragiHandle, transaction: VersionedSignedTransaction, @@ -68,29 +75,84 @@ pub(crate) async fn handle_instructions( } #[iroha_futures::telemetry_future] -pub(crate) async fn handle_queries( +async fn handle_queries( sumeragi: SumeragiHandle, - pagination: Pagination, - sorting: Sorting, + query_store: Arc, + fetch_size: NonZeroU64, + request: VersionedSignedQuery, -) -> Result> { - let mut wsv = sumeragi.wsv_clone(); + sorting: Sorting, + pagination: Pagination, - let valid_request = ValidQueryRequest::validate(request, &mut wsv)?; - let result = valid_request.execute(&wsv).map_err(ValidationFail::from)?; + cursor: ForwardCursor, +) -> Result> { + let encoded_request = (&request, &sorting, &pagination).encode(); - let result = match result { - LazyValue::Value(value) => value, - LazyValue::Iter(iter) => { - Value::Vec(apply_sorting_and_pagination(iter, &sorting, pagination)) + let mut live_query: (_, NonZeroU64) = if let Some(cursor) = cursor { + if let Some((live_query, prev_cursor)) = query_store.remove(&encoded_request) { + if cursor != prev_cursor { + return Err(Error::UnknownCursor); + } + + (live_query, cursor) + } else { + return Err(Error::UnknownCursor); + } + } else { + let mut wsv = sumeragi.wsv_clone(); + + let valid_request = ValidQueryRequest::validate(request, &mut wsv)?; + let res = valid_request.execute(&wsv).map_err(ValidationFail::from)?; + + match res { + LazyValue::Iter(iter) => ( + Box::new(apply_sorting_and_pagination(iter, &sorting, pagination).into_iter()), + NonZeroU64::new(0).expect("Valid"), + ), + LazyValue::Value(result) => { + return Ok(Scale( + QueryResponse { + result, + cursor: None, + pagination, + sorting, + } + .into(), + )); + } } }; - let paginated_result = QueryResult { - result, + let result = live_query + .0 + .by_ref() + .take( + fetch_size + .get() + .try_into() + .expect("u64 larger than usize::MAX"), + ) + .collect::>(); + + let cursor = if result.len() as u64 >= fetch_size.get() { + query_store.insert(encoded_request, live_query); + + cursor.map(|cursor| { + cursor + .checked_add(fetch_size.get()) + .expect("Cursor size too big") + }) + } else { + None + }; + + let paginated_result = QueryResponse { + result: Value::Vec(result), + cursor, pagination, sorting, }; + Ok(Scale(paginated_result.into())) } @@ -297,7 +359,10 @@ mod subscription { /// There should be a [`warp::filters::ws::Message::close()`] /// message to end subscription #[iroha_futures::telemetry_future] - pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { + pub(crate) async fn handle_subscription( + events: EventsSender, + stream: WebSocket, + ) -> eyre::Result<()> { let mut consumer = event::Consumer::new(stream).await?; match subscribe_forever(events, &mut consumer).await { @@ -409,6 +474,7 @@ impl Torii { queue, notify_shutdown, sumeragi, + query_store: Arc::default(), kura, } } @@ -448,9 +514,7 @@ impl Torii { } /// Helper function to create router. This router can tested without starting up an HTTP server - pub(crate) fn create_api_router( - &self, - ) -> impl warp::Filter + Clone + Send { + fn create_api_router(&self) -> impl warp::Filter + Clone + Send { let health_route = warp::get() .and(warp::path(uri::HEALTH)) .and_then(|| async { Ok::<_, Infallible>(handle_health()) }); @@ -483,13 +547,18 @@ impl Torii { )) .and(body::versioned()), )) - .or(endpoint4( + .or(endpoint7( handle_queries, warp::path(uri::QUERY) - .and(add_state!(self.sumeragi)) - .and(paginate()) + .and(add_state!( + self.sumeragi, + self.query_store, + self.iroha_cfg.torii.fetch_size, + )) + .and(body::versioned()) .and(sorting()) - .and(body::versioned()), + .and(paginate()) + .and(cursor()), )) .or(endpoint2( handle_post_configuration, @@ -614,13 +683,14 @@ impl Torii { /// # Errors /// Can fail due to listening to network or if http server fails #[iroha_futures::telemetry_future] - pub async fn start(self) -> eyre::Result<()> { + pub(crate) async fn start(self) -> eyre::Result<()> { let mut handles = vec![]; let torii = Arc::new(self); #[cfg(feature = "telemetry")] handles.extend(Arc::clone(&torii).start_telemetry()?); handles.extend(Arc::clone(&torii).start_api()?); + handles.push(Arc::clone(&torii.query_store).expired_query_cleanup()); handles .into_iter() diff --git a/cli/src/torii/utils.rs b/cli/src/torii/utils.rs index 80c0d0028ab..77d31319c06 100644 --- a/cli/src/torii/utils.rs +++ b/cli/src/torii/utils.rs @@ -66,4 +66,4 @@ impl Reply for WarpResult { } } -iroha_cli_derive::generate_endpoints!(2, 3, 4, 5); +iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 7); diff --git a/client/src/client.rs b/client/src/client.rs index 687f74b8aca..9cbb8fb378a 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -9,7 +9,7 @@ use std::{ collections::HashMap, fmt::Debug, marker::PhantomData, - num::{NonZeroU32, NonZeroU64}, + num::{NonZeroU32, NonZeroU64, NonZeroUsize}, thread, time::Duration, }; @@ -21,8 +21,13 @@ use http_default::{AsyncWebSocketStream, WebSocketStream}; use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConfiguration}; use iroha_crypto::{HashOf, KeyPair}; use iroha_data_model::{ - block::VersionedCommittedBlock, predicate::PredicateBox, prelude::*, - transaction::TransactionPayload, ValidationFail, + block::VersionedCommittedBlock, + isi::Instruction, + predicate::PredicateBox, + prelude::*, + query::{Pagination, Query, Sorting}, + transaction::TransactionPayload, + ValidationFail, }; use iroha_logger::prelude::*; use iroha_telemetry::metrics::Status; @@ -40,28 +45,25 @@ use crate::{ const APPLICATION_JSON: &str = "application/json"; -/// General trait for all response handlers -pub trait ResponseHandler> { - /// What is the output of the handler - type Output; - - /// Handles HTTP response - fn handle(self, response: Response) -> Self::Output; -} - /// Phantom struct that handles responses of Query API. /// Depending on input query struct, transforms a response into appropriate output. -#[derive(Clone, Copy)] -pub struct QueryResponseHandler(PhantomData); +#[derive(Clone)] +pub struct QueryResponseHandler { + query_request: QueryRequest, + _output_type: PhantomData, +} -impl Default for QueryResponseHandler { - fn default() -> Self { - Self(PhantomData) +impl QueryResponseHandler { + fn new(query_request: QueryRequest) -> Self { + Self { + query_request, + _output_type: PhantomData, + } } } /// `Result` with [`ClientQueryError`] as an error -pub type QueryHandlerResult = core::result::Result; +pub type QueryResult = core::result::Result; /// Trait for signing transactions pub trait Sign { @@ -94,21 +96,19 @@ impl Sign for VersionedSignedTransaction { } } -impl ResponseHandler for QueryResponseHandler +impl QueryResponseHandler where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { - type Output = QueryHandlerResult>; - - fn handle(self, resp: Response>) -> Self::Output { + fn handle(self, resp: Response>) -> QueryResult<::Target> { // Separate-compilation friendly response handling fn _handle_query_response_base( resp: &Response>, - ) -> QueryHandlerResult { + ) -> QueryResult { match resp.status() { StatusCode::OK => { - let res = VersionedQueryResult::decode_all_versioned(resp.body()); + let res = VersionedQueryResponse::decode_all_versioned(resp.body()); res.wrap_err( "Failed to decode response from Iroha. \ You are likely using a version of the client library \ @@ -143,9 +143,27 @@ where } } - _handle_query_response_base(&resp).and_then(|VersionedQueryResult::V1(result)| { - ClientQueryRequest::try_from(result).map_err(Into::into) - }) + let response = _handle_query_response_base(&resp) + .map(|VersionedQueryResponse::V1(response)| response)?; + + let cursor = response.cursor; + let value = R::Output::try_from(response.result) + .map_err(Into::into) + .wrap_err("Unexpected type")?; + + let output = QueryOutput::new( + value, + self.query_request, + cursor + .map(|cursor| { + cursor + .try_into() + .wrap_err("Query result length exceeds usize::MAX") + }) + .transpose()?, + ); + + Ok(output) } } @@ -171,10 +189,8 @@ impl From for ClientQueryError { #[derive(Clone, Copy)] pub struct TransactionResponseHandler; -impl ResponseHandler for TransactionResponseHandler { - type Output = Result<()>; - - fn handle(self, resp: Response>) -> Self::Output { +impl TransactionResponseHandler { + fn handle(self, resp: Response>) -> Result<()> { if resp.status() == StatusCode::OK { Ok(()) } else { @@ -191,10 +207,8 @@ impl ResponseHandler for TransactionResponseHandler { #[derive(Clone, Copy)] pub struct StatusResponseHandler; -impl ResponseHandler for StatusResponseHandler { - type Output = Result; - - fn handle(self, resp: Response>) -> Self::Output { +impl StatusResponseHandler { + fn handle(self, resp: Response>) -> Result { if resp.status() != StatusCode::OK { return Err( ResponseReport::with_msg("Unexpected status response", &resp) @@ -214,10 +228,7 @@ impl ResponseReport { /// /// # Errors /// If response body isn't a valid utf-8 string - fn with_msg(msg: S, response: &Response>) -> Result - where - S: AsRef, - { + fn with_msg>(msg: S, response: &Response>) -> Result { let status = response.status(); let body = std::str::from_utf8(response.body()); let msg = msg.as_ref(); @@ -238,60 +249,87 @@ impl From for eyre::Report { } } -/// More convenient version of [`iroha_data_model::prelude::QueryResult`]. -/// The only difference is that this struct has `output` field extracted from the result -/// accordingly to the source query. -#[derive(Clone, Debug)] -pub struct ClientQueryRequest -where - R: Query + Debug, - >::Error: Into, -{ - /// Query output - pub output: R::Output, - /// See [`iroha_data_model::prelude::QueryResult`] - pub pagination: Pagination, - /// See [`iroha_data_model::prelude::QueryResult`] - pub sorting: Sorting, +pub trait QueryOutput { + type Target: Clone; + + fn new( + value: Self, + query_request: QueryRequest, + server_cursor: Option, + ) -> Self::Target; } -impl ClientQueryRequest -where - R: Query + Debug, - >::Error: Into, -{ - /// Extracts output as is - pub fn only_output(self) -> R::Output { - self.output - } +#[derive(Clone, serde::Serialize)] +pub struct ResultSet { + query_request: QueryRequest, + server_cursor: Option, + + iter: Vec, + client_cursor: usize, } -impl TryFrom for ClientQueryRequest -where - R: Query + Debug, - >::Error: Into, -{ - type Error = eyre::Report; +impl Iterator for ResultSet { + type Item = T; - fn try_from( - QueryResult { - result, - pagination, - sorting, - }: QueryResult, - ) -> Result { - let output = R::Output::try_from(result) - .map_err(Into::into) - .wrap_err("Unexpected type")?; + fn next(&mut self) -> Option { + if self.client_cursor >= self.iter.len() { + if let Some(server_cursor) = self.server_cursor { + //response = request_with_cursor(server_cursor); + self.client_cursor = 0; + } else { + return None; + } + } - Ok(Self { - output, - pagination, - sorting, - }) + let item = self.iter.get(self.client_cursor); + self.client_cursor += 1; + item.cloned() + } +} + +impl QueryOutput for Vec { + type Target = ResultSet; + + fn new( + value: Self, + query_request: QueryRequest, + server_cursor: Option, + ) -> Self::Target { + ResultSet { + query_request, + iter: value, + server_cursor, + client_cursor: 0, + } } } +macro_rules! impl_query_result { + ( $($ident:ty),+ $(,)? ) => { $( + impl QueryOutput for $ident { + type Target = Self; + + fn new(value: Self, _query_request: QueryRequest, _server_cursor: Option) -> Self::Target { + value + } + } )+ + }; +} +impl_query_result! { + bool, + iroha_data_model::Value, + iroha_data_model::numeric::NumericValue, + iroha_data_model::role::Role, + iroha_data_model::asset::Asset, + iroha_data_model::asset::AssetDefinition, + iroha_data_model::account::Account, + iroha_data_model::domain::Domain, + iroha_data_model::block::BlockHeader, + iroha_data_model::query::MetadataValue, + iroha_data_model::query::TransactionQueryOutput, + iroha_data_model::trigger::Trigger, +} + /// Iroha client #[derive(Clone, DebugCustom, Display)] #[debug( @@ -319,6 +357,28 @@ pub struct Client { add_transaction_nonce: bool, } +#[derive(Clone, serde::Serialize)] +pub struct QueryRequest { + torii_url: Url, + headers: HashMap, + sorting: Sorting, + pagination: Pagination, + request: Vec, +} + +impl QueryRequest { + fn assemble(self) -> DefaultRequestBuilder { + DefaultRequestBuilder::new( + HttpMethod::POST, + self.torii_url.join(uri::QUERY).expect("Valid URI"), + ) + .headers(self.headers) + .body(self.request) + .params(Vec::from(self.sorting)) + .params(Vec::from(self.pagination)) + } +} + /// Representation of `Iroha` client. impl Client { /// Constructor for client from configuration @@ -413,7 +473,7 @@ impl Client { /// /// # Errors /// Fails if signature generation fails - pub fn sign_query(&self, query: QueryBuilder) -> Result { + pub fn sign_query(&self, query: QueryBuilder) -> Result { query .sign(self.key_pair.clone()) .wrap_err("Failed to sign query") @@ -424,10 +484,7 @@ impl Client { /// /// # Errors /// Fails if sending transaction to peer fails or if it response with error - pub fn submit( - &self, - instruction: impl Instruction + Debug, - ) -> Result> { + pub fn submit(&self, instruction: impl Instruction) -> Result> { let isi = instruction.into(); self.submit_all([isi]) } @@ -672,7 +729,7 @@ impl Client { /// ```ignore /// use eyre::Result; /// use iroha_client::{ - /// client::{Client, ResponseHandler}, + /// client::Client, /// http::{RequestBuilder, Response, Method}, /// }; /// use iroha_data_model::{predicate::PredicateBox, prelude::{Account, FindAllAccounts, Pagination}}; @@ -719,36 +776,33 @@ impl Client { /// // Handle response with the handler and get typed result /// let accounts = resp_handler.handle(resp)?; /// - /// Ok(accounts.only_output()) + /// Ok(accounts.output()) /// } /// ``` - pub fn prepare_query_request( + pub fn prepare_query_request( &self, request: R, filter: PredicateBox, pagination: Pagination, sorting: Sorting, - ) -> Result<(B, QueryResponseHandler)> + ) -> Result<(DefaultRequestBuilder, QueryResponseHandler)> where - R: Query + Debug, >::Error: Into, - B: RequestBuilder, { - let pagination: Vec<_> = pagination.into(); - let sorting: Vec<_> = sorting.into(); - let request = QueryBuilder::new(request, self.account_id.clone()).with_filter(filter); - let request: VersionedSignedQuery = self.sign_query(request)?.into(); + let query_builder = QueryBuilder::new(request, self.account_id.clone()).with_filter(filter); + let request = self.sign_query(query_builder)?.encode_versioned(); + + let query_request = QueryRequest { + torii_url: self.torii_url.clone(), + headers: self.headers.clone(), + sorting, + pagination, + request, + }; Ok(( - B::new( - HttpMethod::POST, - self.torii_url.join(uri::QUERY).expect("Valid URI"), - ) - .params(pagination) - .params(sorting) - .headers(self.headers.clone()) - .body(request.encode_versioned()), - QueryResponseHandler::default(), + query_request.clone().assemble(), + QueryResponseHandler::new(query_request), )) } @@ -756,21 +810,21 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_filter_and_pagination_and_sorting( + pub fn request_with_filter_and_pagination_and_sorting( &self, request: R, pagination: Pagination, sorting: Sorting, filter: PredicateBox, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, - >::Error: Into, // Seems redundant + R::Output: QueryOutput, + >::Error: Into, { iroha_logger::trace!(?request, %pagination, ?sorting, ?filter); - let (req, resp_handler) = self.prepare_query_request::( - request, filter, pagination, sorting, - )?; + let (req, resp_handler) = + self.prepare_query_request::(request, filter, pagination, sorting)?; + let response = req.build()?.send()?; resp_handler.handle(response) } @@ -779,14 +833,14 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_pagination_and_sorting( + pub fn request_with_pagination_and_sorting( &self, request: R, pagination: Pagination, sorting: Sorting, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { self.request_with_filter_and_pagination_and_sorting( @@ -801,15 +855,15 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_filter_and_pagination( + pub fn request_with_filter_and_pagination( &self, request: R, pagination: Pagination, filter: PredicateBox, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, - >::Error: Into, // Seems redundant + R::Output: QueryOutput, + >::Error: Into, { self.request_with_filter_and_pagination_and_sorting( request, @@ -823,15 +877,15 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_filter_and_sorting( + pub fn request_with_filter_and_sorting( &self, request: R, sorting: Sorting, filter: PredicateBox, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, - >::Error: Into, // Seems redundant + R::Output: QueryOutput, + >::Error: Into, { self.request_with_filter_and_pagination_and_sorting( request, @@ -848,13 +902,13 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_filter( + pub fn request_with_filter( &self, request: R, filter: PredicateBox, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { self.request_with_filter_and_pagination(request, Pagination::default(), filter) @@ -867,13 +921,13 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_pagination( + pub fn request_with_pagination( &self, request: R, pagination: Pagination, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { self.request_with_filter_and_pagination(request, pagination, PredicateBox::default()) @@ -883,13 +937,13 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request_with_sorting( + pub fn request_with_sorting( &self, request: R, sorting: Sorting, - ) -> QueryHandlerResult> + ) -> QueryResult<::Target> where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { self.request_with_pagination_and_sorting(request, Pagination::default(), sorting) @@ -899,13 +953,15 @@ impl Client { /// /// # Errors /// Fails if sending request fails - pub fn request(&self, request: R) -> QueryHandlerResult + pub fn request( + &self, + request: R, + ) -> QueryResult<::Target> where - R: Query + Debug, + R::Output: QueryOutput, >::Error: Into, { self.request_with_pagination(request, Pagination::default()) - .map(ClientQueryRequest::only_output) } /// Connect (through `WebSocket`) to listen for `Iroha` `pipeline` and `data` events. @@ -1144,10 +1200,7 @@ impl Client { /// /// # Errors /// Fails if request build fails - pub fn prepare_status_request(&self) -> (B, StatusResponseHandler) - where - B: RequestBuilder, - { + pub fn prepare_status_request(&self) -> (B, StatusResponseHandler) { ( B::new( HttpMethod::GET, @@ -1258,11 +1311,9 @@ pub mod stream_api { /// - Sending failed /// - Message not received in stream during connection or subscription /// - Message is an error - pub async fn new(handler: I) -> Result> - where - I: Init + Send, - I::Next: Send, - { + pub async fn new>( + handler: I, + ) -> Result> { trace!("Creating `AsyncStream`"); let InitData { first_message, diff --git a/client/tests/integration/events/pipeline.rs b/client/tests/integration/events/pipeline.rs index d649d2c3b1a..40c009ea370 100644 --- a/client/tests/integration/events/pipeline.rs +++ b/client/tests/integration/events/pipeline.rs @@ -116,7 +116,14 @@ fn committed_block_must_be_available_in_kura() { .expect("Failed to submit transaction"); let event = event_iter.next().expect("Block must be committed"); - let Ok(Event::Pipeline(PipelineEvent { entity_kind: PipelineEntityKind::Block, status: PipelineStatus::Committed, hash })) = event else { panic!("Received unexpected event") }; + let Ok(Event::Pipeline(PipelineEvent { + entity_kind: PipelineEntityKind::Block, + status: PipelineStatus::Committed, + hash, + })) = event + else { + panic!("Received unexpected event") + }; let hash = HashOf::from_untyped_unchecked(hash); peer.iroha diff --git a/client/tests/integration/pagination.rs b/client/tests/integration/pagination.rs index afa827a511e..82c31927a4e 100644 --- a/client/tests/integration/pagination.rs +++ b/client/tests/integration/pagination.rs @@ -21,7 +21,7 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> let vec = client .request_with_pagination(asset::all_definitions(), Pagination::new(Some(5), Some(5)))? - .only_output(); + .output(); assert_eq!(vec.len(), 5); Ok(()) } diff --git a/client/tests/integration/tx_history.rs b/client/tests/integration/tx_history.rs index 45ea0a8eb26..045ca757491 100644 --- a/client/tests/integration/tx_history.rs +++ b/client/tests/integration/tx_history.rs @@ -54,7 +54,7 @@ fn client_has_rejected_and_acepted_txs_should_return_tx_history() -> Result<()> transaction::by_account_id(account_id.clone()), Pagination::new(Some(1), Some(50)), )? - .only_output(); + .output(); assert_eq!(transactions.len(), 50); let mut prev_creation_time = core::time::Duration::from_millis(0); diff --git a/config/Cargo.toml b/config/Cargo.toml index fce4a71e9ea..9dfad580a18 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -18,6 +18,7 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", default-features = false, features = ["fmt", "ansi"] } url = { version = "2.3.1", features = ["serde"] } +once_cell = "1.16.0" serde = { version = "1.0.151", default-features = false, features = ["derive"] } strum = { version = "0.24.1", default-features = false, features = ["derive"] } serde_json = "1.0.91" diff --git a/config/src/torii.rs b/config/src/torii.rs index 81ad7d67701..6cd4ca741ca 100644 --- a/config/src/torii.rs +++ b/config/src/torii.rs @@ -1,5 +1,7 @@ //! `Torii` configuration as well as the default values for the URLs used for the main endpoints: `p2p`, `telemetry`, but not `api`. #![allow(clippy::std_instead_of_core, clippy::arithmetic_side_effects)] +use std::num::NonZeroU64; + use iroha_config_base::derive::{Documented, Proxy}; use iroha_primitives::addr::{socket_addr, SocketAddr}; use serde::{Deserialize, Serialize}; @@ -12,6 +14,9 @@ pub const DEFAULT_TORII_TELEMETRY_ADDR: SocketAddr = socket_addr!(127.0.0.1:8180 pub const DEFAULT_TORII_MAX_TRANSACTION_SIZE: u32 = 2_u32.pow(15); /// Default upper bound on `content-length` specified in the HTTP request header pub const DEFAULT_TORII_MAX_CONTENT_LENGTH: u32 = 2_u32.pow(12) * 4000; +/// Default max size of a single batch of results from a query +pub static DEFAULT_TORII_FETCH_SIZE: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| NonZeroU64::new(10).unwrap()); /// Structure that defines the configuration parameters of `Torii` which is the routing module. /// For example the `p2p_addr`, which is used for consensus and block-synchronisation purposes, @@ -33,6 +38,8 @@ pub struct Configuration { pub max_transaction_size: u32, /// Maximum number of bytes in raw message. Used to prevent from DOS attacks. pub max_content_len: u32, + /// How many query results are returned in one batch + pub fetch_size: NonZeroU64, } impl Default for ConfigurationProxy { @@ -43,6 +50,7 @@ impl Default for ConfigurationProxy { telemetry_url: None, max_transaction_size: Some(DEFAULT_TORII_MAX_TRANSACTION_SIZE), max_content_len: Some(DEFAULT_TORII_MAX_CONTENT_LENGTH), + fetch_size: Some(*DEFAULT_TORII_FETCH_SIZE), } } } @@ -96,9 +104,10 @@ pub mod tests { telemetry_url in prop::option::of(Just(DEFAULT_TORII_TELEMETRY_ADDR)), max_transaction_size in prop::option::of(Just(DEFAULT_TORII_MAX_TRANSACTION_SIZE)), max_content_len in prop::option::of(Just(DEFAULT_TORII_MAX_CONTENT_LENGTH)), + fetch_size in prop::option::of(Just(*DEFAULT_TORII_FETCH_SIZE)), ) -> ConfigurationProxy { - ConfigurationProxy { p2p_addr, api_url, telemetry_url, max_transaction_size, max_content_len } + ConfigurationProxy { p2p_addr, api_url, telemetry_url, max_transaction_size, max_content_len, fetch_size } } } } diff --git a/core/src/queue.rs b/core/src/queue.rs index 2fb760a1b93..84a8687cef8 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -399,8 +399,9 @@ impl Queue { } fn decrease_per_user_tx_count(&self, account_id: &AccountId) { - let Entry::Occupied(mut occupied) = self.txs_per_user - .entry(account_id.clone()) else { panic!("Call to decrease always should be paired with increase count. This is a bug.") }; + let Entry::Occupied(mut occupied) = self.txs_per_user.entry(account_id.clone()) else { + panic!("Call to decrease always should be paired with increase count. This is a bug.") + }; let count = occupied.get_mut(); if *count > 1 { diff --git a/core/src/smartcontracts/isi/query.rs b/core/src/smartcontracts/isi/query.rs index a5d7c67da14..91dae16ecc4 100644 --- a/core/src/smartcontracts/isi/query.rs +++ b/core/src/smartcontracts/isi/query.rs @@ -42,7 +42,7 @@ macro_rules! impl_lazy { } impl_lazy! { bool, - NumericValue, + iroha_data_model::numeric::NumericValue, iroha_data_model::role::Role, iroha_data_model::asset::Asset, iroha_data_model::asset::AssetDefinition, @@ -50,12 +50,13 @@ impl_lazy! { iroha_data_model::domain::Domain, iroha_data_model::block::BlockHeader, iroha_data_model::query::MetadataValue, - iroha_data_model::query::TransactionQueryResult, + iroha_data_model::query::TransactionQueryOutput, iroha_data_model::trigger::Trigger, } /// Query Request statefully validated on the Iroha node side. #[derive(Debug, Decode, Encode)] +#[repr(transparent)] pub struct ValidQueryRequest(VersionedSignedQuery); impl ValidQueryRequest { @@ -387,7 +388,7 @@ mod tests { #[test] fn find_block_header_by_hash() -> Result<()> { let wsv = wsv_with_test_blocks_and_transactions(1, 1, 1)?; - let block = wsv.all_blocks().into_iter().last().expect("WSV is empty"); + let block = wsv.all_blocks().last().expect("WSV is empty"); assert_eq!( FindBlockHeaderByHash::new(block.hash()).execute(&wsv)?, @@ -463,7 +464,7 @@ mod tests { .sign(ALICE_KEYS.clone())?; let wrong_hash = unapplied_tx.hash(); let not_found = FindTransactionByHash::new(wrong_hash).execute(&wsv); - assert!(matches!(not_found, Err(_))); + assert!(not_found.is_err()); let found_accepted = FindTransactionByHash::new(va_tx.hash()).execute(&wsv)?; if found_accepted.transaction.error.is_none() { diff --git a/core/src/smartcontracts/isi/tx.rs b/core/src/smartcontracts/isi/tx.rs index 2c87fa781cb..7919f060b37 100644 --- a/core/src/smartcontracts/isi/tx.rs +++ b/core/src/smartcontracts/isi/tx.rs @@ -10,7 +10,7 @@ use iroha_data_model::{ prelude::*, query::{ error::{FindError, QueryExecutionFail}, - TransactionQueryResult, + TransactionQueryOutput, }, transaction::TransactionValue, }; @@ -65,7 +65,7 @@ impl ValidQuery for FindAllTransactions { Ok(Box::new( wsv.all_blocks() .flat_map(BlockTransactionIter::new) - .map(|tx| TransactionQueryResult { + .map(|tx| TransactionQueryOutput { block_hash: tx.block_hash(), transaction: tx.value(), }), @@ -88,7 +88,7 @@ impl ValidQuery for FindTransactionsByAccountId { wsv.all_blocks() .flat_map(BlockTransactionIter::new) .filter(move |tx| *tx.authority() == account_id) - .map(|tx| TransactionQueryResult { + .map(|tx| TransactionQueryOutput { block_hash: tx.block_hash(), transaction: tx.value(), }), @@ -122,7 +122,7 @@ impl ValidQuery for FindTransactionByHash { .iter() .find(|transaction| transaction.value.hash() == tx_hash) .cloned() - .map(|transaction| TransactionQueryResult { + .map(|transaction| TransactionQueryOutput { block_hash, transaction, }) diff --git a/core/src/smartcontracts/mod.rs b/core/src/smartcontracts/mod.rs index ebab9f86afe..d4ddbb03201 100644 --- a/core/src/smartcontracts/mod.rs +++ b/core/src/smartcontracts/mod.rs @@ -28,7 +28,7 @@ pub trait Execute { } /// This trait should be implemented for all Iroha Queries. -pub trait ValidQuery: Query +pub trait ValidQuery: iroha_data_model::query::Query where Self::Output: Lazy, { diff --git a/core/src/tx.rs b/core/src/tx.rs index f3456e023ff..e676bdbf2ed 100644 --- a/core/src/tx.rs +++ b/core/src/tx.rs @@ -20,6 +20,7 @@ use eyre::Result; use iroha_crypto::{HashOf, SignatureVerificationFail, SignaturesOf}; pub use iroha_data_model::prelude::*; use iroha_data_model::{ + isi::Instruction, query::error::FindError, transaction::{ error::{TransactionLimitError, TransactionRejectionReason}, diff --git a/core/src/wsv.rs b/core/src/wsv.rs index f5639f2cbff..ff451240c61 100644 --- a/core/src/wsv.rs +++ b/core/src/wsv.rs @@ -402,7 +402,7 @@ impl WorldStateView { macro_rules! update_params { ($ident:ident, $($param:expr => $config:expr),+ $(,)?) => { $(if let Some(param) = self.query_param($param) { - let mut $ident = &mut self.config; + let $ident = &mut self.config; $config = param; })+ diff --git a/core/test_network/src/lib.rs b/core/test_network/src/lib.rs index 161a15c489c..b81b4e8b5fe 100644 --- a/core/test_network/src/lib.rs +++ b/core/test_network/src/lib.rs @@ -9,10 +9,10 @@ use std::{ thread, }; -use eyre::{Error, Result}; +use eyre::Result; use futures::{prelude::*, stream::FuturesUnordered}; use iroha::Iroha; -use iroha_client::client::Client; +use iroha_client::client::{Client, QueryOutput}; use iroha_config::{ base::proxy::{LoadFromEnv, Override}, client::Configuration as ClientConfiguration, @@ -20,8 +20,8 @@ use iroha_config::{ sumeragi::Configuration as SumeragiConfiguration, torii::Configuration as ToriiConfiguration, }; -use iroha_core::{prelude::*, smartcontracts::query::Lazy}; -use iroha_data_model::{peer::Peer as DataModelPeer, prelude::*}; +use iroha_core::prelude::*; +use iroha_data_model::{isi::Instruction, peer::Peer as DataModelPeer, prelude::*, query::Query}; use iroha_genesis::{GenesisNetwork, RawGenesisBlock}; use iroha_logger::{Configuration as LoggerConfiguration, InstrumentFutures}; use iroha_primitives::addr::{socket_addr, SocketAddr}; @@ -660,7 +660,7 @@ impl PeerBuilder { type PeerWithRuntimeAndClient = (Runtime, Peer, Client); fn local_unique_port() -> Result { - Ok(socket_addr!(127.0.0.1: unique_port::get_unique_free_port().map_err(Error::msg)?)) + Ok(socket_addr!(127.0.0.1: unique_port::get_unique_free_port().map_err(eyre::Error::msg)?)) } /// Runtime used for testing. @@ -708,61 +708,61 @@ pub trait TestClient: Sized { /// /// # Errors /// If predicate is not satisfied, after maximum retries. - fn submit_till( + fn submit_till( &mut self, - instruction: impl Instruction + Debug, + instruction: impl Instruction + Debug + Clone, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug; + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into; /// Submits instructions with polling /// /// # Errors /// If predicate is not satisfied, after maximum retries. - fn submit_all_till( + fn submit_all_till( &mut self, instructions: Vec, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug; + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into; /// Polls request till predicate `f` is satisfied, with default period and max attempts. /// /// # Errors /// If predicate is not satisfied after maximum retries. - fn poll_request( + fn poll_request( &mut self, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug; + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into; /// Polls request till predicate `f` is satisfied with `period` and `max_attempts` supplied. /// /// # Errors /// If predicate is not satisfied after maximum retries. - fn poll_request_with_period( + fn poll_request_with_period( &mut self, request: R, period: Duration, max_attempts: u32, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug; + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into; } impl TestRuntime for Runtime { @@ -848,49 +848,49 @@ impl TestClient for Client { } } - fn submit_till( + fn submit_till( &mut self, - instruction: impl Instruction + Debug, + instruction: impl Instruction + Debug + Clone, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug, + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into, { self.submit(instruction) .expect("Failed to submit instruction."); self.poll_request(request, f) } - fn submit_all_till( + fn submit_all_till( &mut self, instructions: Vec, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug, + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into, { self.submit_all(instructions) .expect("Failed to submit instruction."); self.poll_request(request, f) } - fn poll_request_with_period( + fn poll_request_with_period( &mut self, request: R, period: Duration, max_attempts: u32, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug, + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into, { let mut query_result = None; for _ in 0..max_attempts { @@ -903,15 +903,15 @@ impl TestClient for Client { Err(eyre::eyre!("Failed to wait for query request completion that would satisfy specified closure. Got this query result instead: {:?}", &query_result)) } - fn poll_request( + fn poll_request( &mut self, request: R, - f: impl Fn(&R::Output) -> bool, - ) -> eyre::Result + f: impl Fn(&::Target) -> bool, + ) -> eyre::Result<::Target> where - R: ValidQuery + Into + Debug + Clone, - >::Error: Into, - R::Output: Lazy + Clone + Debug, + R::Output: QueryOutput, + ::Target: core::fmt::Debug, + >::Error: Into, { self.poll_request_with_period(request, Configuration::pipeline_time() / 2, 10, f) } diff --git a/crypto/src/merkle.rs b/crypto/src/merkle.rs index 122b6cf45ee..8cd0db82fe8 100644 --- a/crypto/src/merkle.rs +++ b/crypto/src/merkle.rs @@ -288,7 +288,7 @@ mod tests { assert_eq!(tree.len(), 12); assert!(matches!(tree.get(6), Some(None))); assert!(matches!(tree.get(11), Some(Some(_)))); - assert!(matches!(tree.get(12), None)); + assert!(tree.get(12).is_none()); } #[test] diff --git a/data_model/derive/src/model.rs b/data_model/derive/src/model.rs index 1605d4a57f5..8a7426baca3 100644 --- a/data_model/derive/src/model.rs +++ b/data_model/derive/src/model.rs @@ -15,7 +15,10 @@ pub fn impl_model(input: &syn::ItemMod) -> TokenStream { } = input; let syn::Visibility::Public(vis_public) = vis else { - abort!(input, "The `model` attribute can only be used on public modules"); + abort!( + input, + "The `model` attribute can only be used on public modules" + ); }; if ident != "model" { abort!( diff --git a/data_model/src/expression.rs b/data_model/src/expression.rs index a20853db627..b5e75ce26bc 100644 --- a/data_model/src/expression.rs +++ b/data_model/src/expression.rs @@ -496,7 +496,7 @@ impl EvaluatesTo { pub fn new_unchecked(expression: impl Into) -> Self { Self { expression: Box::new(expression.into()), - _value_type: PhantomData::default(), + _value_type: PhantomData, } } diff --git a/data_model/src/isi.rs b/data_model/src/isi.rs index 1ab82f63f5f..448fb2ec555 100644 --- a/data_model/src/isi.rs +++ b/data_model/src/isi.rs @@ -1266,10 +1266,10 @@ pub mod error { pub mod prelude { pub use super::{ Burn, BurnBox, Conditional, ExecuteTrigger, ExecuteTriggerBox, FailBox, Grant, GrantBox, - Instruction, InstructionBox, Mint, MintBox, NewParameter, NewParameterBox, Pair, Register, - RegisterBox, RemoveKeyValue, RemoveKeyValueBox, Revoke, RevokeBox, SequenceBox, - SetKeyValue, SetKeyValueBox, SetParameter, SetParameterBox, Transfer, TransferBox, - Unregister, UnregisterBox, Upgrade, UpgradeBox, + InstructionBox, Mint, MintBox, NewParameter, NewParameterBox, Pair, Register, RegisterBox, + RemoveKeyValue, RemoveKeyValueBox, Revoke, RevokeBox, SequenceBox, SetKeyValue, + SetKeyValueBox, SetParameter, SetParameterBox, Transfer, TransferBox, Unregister, + UnregisterBox, Upgrade, UpgradeBox, }; } diff --git a/data_model/src/lib.rs b/data_model/src/lib.rs index 44b394065cd..5d7148c202b 100644 --- a/data_model/src/lib.rs +++ b/data_model/src/lib.rs @@ -49,7 +49,7 @@ use iroha_primitives::{ use iroha_schema::IntoSchema; pub use numeric::model::NumericValue; use parity_scale_codec::{Decode, Encode}; -use prelude::{Executable, TransactionQueryResult}; +use prelude::{Executable, TransactionQueryOutput}; use serde::{Deserialize, Serialize}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use strum::EnumDiscriminants; @@ -70,16 +70,12 @@ pub mod isi; pub mod metadata; pub mod name; pub mod numeric; -#[cfg(feature = "http")] -pub mod pagination; pub mod peer; pub mod permission; #[cfg(feature = "http")] pub mod predicate; pub mod query; pub mod role; -#[cfg(feature = "http")] -pub mod sorting; pub mod transaction; pub mod trigger; pub mod validator; @@ -828,7 +824,7 @@ pub mod model { Identifiable(IdentifiableBox), PublicKey(PublicKey), SignatureCheckCondition(SignatureCheckCondition), - TransactionQueryResult(TransactionQueryResult), + TransactionQueryOutput(TransactionQueryOutput), PermissionToken(permission::PermissionToken), Hash(HashValue), Block(VersionedCommittedBlockWrapper), @@ -1117,7 +1113,7 @@ impl fmt::Display for Value { Value::Identifiable(v) => fmt::Display::fmt(&v, f), Value::PublicKey(v) => fmt::Display::fmt(&v, f), Value::SignatureCheckCondition(v) => fmt::Display::fmt(&v, f), - Value::TransactionQueryResult(_) => write!(f, "TransactionQueryResult"), + Value::TransactionQueryOutput(_) => write!(f, "TransactionQueryOutput"), Value::PermissionToken(v) => fmt::Display::fmt(&v, f), Value::Hash(v) => fmt::Display::fmt(&v, f), Value::Block(v) => fmt::Display::fmt(&**v, f), @@ -1146,7 +1142,7 @@ impl Value { | Identifiable(_) | String(_) | Name(_) - | TransactionQueryResult(_) + | TransactionQueryOutput(_) | PermissionToken(_) | Hash(_) | Block(_) @@ -1943,6 +1939,4 @@ pub mod prelude { LengthLimits, NumericValue, PredicateTrait, RegistrableBox, ToValue, TriggerBox, TryAsMut, TryAsRef, TryToValue, UpgradableBox, ValidationFail, ValidatorDeny, Value, }; - #[cfg(feature = "http")] - pub use super::{pagination::prelude::*, sorting::prelude::*}; } diff --git a/data_model/src/name.rs b/data_model/src/name.rs index 97d515e1a2f..ab5bf391bbf 100644 --- a/data_model/src/name.rs +++ b/data_model/src/name.rs @@ -55,7 +55,8 @@ impl Name { .chars() .count() .try_into() - .map(|len| range.contains(&len)) else { + .map(|len| range.contains(&len)) + else { return Err(InvalidParameterError::NameLength); }; Ok(()) diff --git a/data_model/src/numeric.rs b/data_model/src/numeric.rs index 8164d5a8c34..f85a7131eb1 100644 --- a/data_model/src/numeric.rs +++ b/data_model/src/numeric.rs @@ -17,7 +17,7 @@ use serde::{ Deserializer, }; -use self::model::NumericValue; +pub use self::model::*; use super::{ DebugCustom, Decode, Deserialize, Display, Encode, FromVariant, IntoSchema, Serialize, }; diff --git a/data_model/src/predicate.rs b/data_model/src/predicate.rs index 49f89440b7a..a14c75de318 100644 --- a/data_model/src/predicate.rs +++ b/data_model/src/predicate.rs @@ -10,7 +10,7 @@ use crate::{IdBox, Name, Value}; mod nontrivial { use super::*; /// Struct representing a sequence with at least three elements. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct NonTrivial(Vec); impl NonTrivial { @@ -73,7 +73,7 @@ macro_rules! nontrivial { } /// Predicate combinator enum. -#[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] // Ideally we would enforce `P: PredicateTrait` here, but I // couldn't find a way to do it without polluting everything // downstream with explicit lifetimes, since we would need to @@ -282,7 +282,7 @@ pub mod string { use super::*; /// Predicate useful for processing [`String`]s and [`Name`]s. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub enum StringPredicate { /// Forward to [`str::contains()`] Contains(String), @@ -559,7 +559,7 @@ pub mod numerical { use super::*; /// A lower-inclusive range predicate. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct SemiInterval { /// The start of the range (inclusive) start: T, @@ -583,7 +583,7 @@ pub mod numerical { impl Copy for SemiInterval {} /// A both-inclusive range predicate - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct Interval { /// The start of the range (inclusive) start: T, @@ -624,7 +624,7 @@ pub mod numerical { /// [`Self`] only applies to `Values` that are variants of /// compatible types. If the [`Range`] variant and the [`Value`] /// variant don't match defaults to `false`. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub enum SemiRange { /// 32-bit U32(SemiInterval), @@ -642,7 +642,7 @@ pub mod numerical { /// [`Self`] only applies to `Values` that are variants of /// compatible types. If the [`Range`] variant and the [`Value`] /// variant don't match defaults to `false`. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub enum Range { /// 32-bit U32(Interval), @@ -971,7 +971,7 @@ pub mod value { use super::*; /// A predicate designed for general processing of `Value`. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub enum ValuePredicate { /// Apply predicate to the [`Identifiable::Id`] and/or [`IdBox`]. Identifiable(string::StringPredicate), @@ -1107,14 +1107,14 @@ pub mod value { } /// A predicate that targets the particular `index` of a collection. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct AtIndex { index: u32, predicate: Box, } /// A predicate that targets the particular `key` of a collection. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct ValueOfKey { key: Name, predicate: Box, @@ -1124,7 +1124,7 @@ pub mod value { /// working with containers. Currently only /// [`Metadata`](crate::metadata::Metadata) and [`Vec`] are /// supported. - #[derive(Debug, Clone, Serialize, Deserialize, Encode, Decode, IntoSchema)] + #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub enum Container { /// Forward to [`Iterator::any`] Any(Box), @@ -1267,7 +1267,9 @@ pub mod ip_addr { /// A Predicate containing independent octuplet masks to be /// applied to all elements of an IP version 4 address. - #[derive(Debug, Clone, Copy, Encode, Decode, IntoSchema, Serialize, Deserialize)] + #[derive( + Debug, Clone, Copy, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema, + )] pub struct Ipv4Predicate([Mask; 4]); impl PredicateTrait for Ipv4Predicate { @@ -1302,7 +1304,9 @@ pub mod ip_addr { /// A Predicate containing independent _hexadecuplets_ (u16 /// groups) masks to be applied to all elements of an IP version 6 /// address. - #[derive(Debug, Clone, Copy, Encode, Decode, IntoSchema, Serialize, Deserialize)] + #[derive( + Debug, Clone, Copy, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema, + )] pub struct Ipv6Predicate([Mask; 8]); impl PredicateTrait for Ipv6Predicate { diff --git a/data_model/src/query/cursor.rs b/data_model/src/query/cursor.rs new file mode 100644 index 00000000000..7d9e36082c3 --- /dev/null +++ b/data_model/src/query/cursor.rs @@ -0,0 +1,6 @@ +//! Structures and traits related to server-side cursor. + +use core::num::NonZeroU64; + +/// Forward-only (a.k.a non-scrollable) cursor +pub type ForwardCursor = Option; diff --git a/data_model/src/query.rs b/data_model/src/query/mod.rs similarity index 94% rename from data_model/src/query.rs rename to data_model/src/query/mod.rs index a704a87c342..7aa35f4ef97 100644 --- a/data_model/src/query.rs +++ b/data_model/src/query/mod.rs @@ -6,14 +6,20 @@ use alloc::{boxed::Box, format, string::String, vec::Vec}; use core::cmp::Ordering; +#[cfg(feature = "http")] +pub use cursor::ForwardCursor; use derive_more::Display; use iroha_crypto::SignatureOf; use iroha_data_model_derive::model; use iroha_macro::FromVariant; use iroha_schema::IntoSchema; use iroha_version::prelude::*; +#[cfg(feature = "http")] +pub use pagination::Pagination; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "http")] +pub use sorting::Sorting; pub use self::model::*; use self::{ @@ -28,6 +34,13 @@ use crate::{ Identifiable, Value, }; +#[cfg(feature = "http")] +pub mod cursor; +#[cfg(feature = "http")] +pub mod pagination; +#[cfg(feature = "http")] +pub mod sorting; + macro_rules! queries { ($($($meta:meta)* $item:item)+) => { pub use self::model::*; @@ -170,24 +183,40 @@ pub mod model { )] #[getset(get = "pub")] #[ffi_type] - pub struct TransactionQueryResult { + pub struct TransactionQueryOutput { /// Transaction pub transaction: TransactionValue, /// The hash of the block to which `tx` belongs to pub block_hash: HashOf, } -} -/// Type returned from [`Metadata`] queries -pub struct MetadataValue(Value); + /// Type returned from [`Metadata`] queries + #[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Decode, + Encode, + Deserialize, + Serialize, + IntoSchema, + )] + #[ffi_type] + pub struct MetadataValue(pub Value); +} impl From for Value { + #[inline] fn from(source: MetadataValue) -> Self { source.0 } } impl From for MetadataValue { + #[inline] fn from(source: Value) -> Self { Self(source) } @@ -197,7 +226,7 @@ impl Query for QueryBox { type Output = Value; } -impl TransactionQueryResult { +impl TransactionQueryOutput { #[inline] /// Return payload of the transaction pub fn payload(&self) -> &TransactionPayload { @@ -205,14 +234,14 @@ impl TransactionQueryResult { } } -impl PartialOrd for TransactionQueryResult { +impl PartialOrd for TransactionQueryOutput { #[inline] fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for TransactionQueryResult { +impl Ord for TransactionQueryOutput { #[inline] fn cmp(&self, other: &Self) -> Ordering { self.payload() @@ -229,6 +258,7 @@ pub mod role { use derive_more::Display; + use super::Query; use crate::prelude::*; queries! { @@ -315,6 +345,7 @@ pub mod permission { use derive_more::Display; + use super::Query; use crate::{permission, prelude::*}; queries! { @@ -399,7 +430,7 @@ pub mod account { use derive_more::Display; - use super::MetadataValue; + use super::{MetadataValue, Query}; use crate::prelude::*; queries! { @@ -561,7 +592,7 @@ pub mod asset { use iroha_data_model_derive::model; pub use self::model::*; - use super::MetadataValue; + use super::{MetadataValue, Query}; use crate::prelude::*; queries! { @@ -910,7 +941,7 @@ pub mod domain { use derive_more::Display; - use super::MetadataValue; + use super::{MetadataValue, Query}; use crate::prelude::*; queries! { @@ -1151,7 +1182,7 @@ pub mod transaction { use derive_more::Display; use iroha_crypto::HashOf; - use super::{Query, TransactionQueryResult}; + use super::{Query, TransactionQueryOutput}; use crate::{ account::AccountId, expression::EvaluatesTo, prelude::Account, transaction::VersionedSignedTransaction, @@ -1190,15 +1221,15 @@ pub mod transaction { } impl Query for FindAllTransactions { - type Output = Vec; + type Output = Vec; } impl Query for FindTransactionsByAccountId { - type Output = Vec; + type Output = Vec; } impl Query for FindTransactionByHash { - type Output = TransactionQueryResult; + type Output = TransactionQueryOutput; } impl FindTransactionsByAccountId { @@ -1296,19 +1327,23 @@ pub mod block { pub mod http { //! Structures related to sending queries over HTTP + use getset::Getters; use iroha_data_model_derive::model; pub use self::model::*; use super::*; - use crate::{ - account::AccountId, pagination::prelude::*, predicate::PredicateBox, sorting::prelude::*, - }; + use crate::{account::AccountId, predicate::PredicateBox}; + + // TODO: Could we make a variant of `Value` that holds only query results? + type QueryResult = Value; declare_versioned_with_scale!(VersionedSignedQuery 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); - declare_versioned_with_scale!(VersionedQueryResult 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); + declare_versioned_with_scale!(VersionedQueryResponse 1..2, Debug, Clone, iroha_macro::FromVariant, IntoSchema); #[model] pub mod model { + use core::num::NonZeroU64; + use super::*; /// I/O ready structure to send queries. @@ -1321,15 +1356,17 @@ pub mod http { } /// Payload of a query. - #[derive(Debug, Clone, Decode, Encode, Deserialize, Serialize, IntoSchema)] + #[derive( + Debug, Clone, PartialEq, Eq, Decode, Encode, Deserialize, Serialize, IntoSchema, + )] pub(crate) struct QueryPayload { /// Timestamp of the query creation. #[codec(compact)] pub timestamp_ms: u128, - /// Query definition. - pub query: QueryBox, /// Account id of the user who will sign this query. pub authority: AccountId, + /// Query definition. + pub query: QueryBox, /// The filter applied to the result on the server-side. pub filter: PredicateBox, } @@ -1338,10 +1375,27 @@ pub mod http { #[derive(Debug, Clone, Encode, Serialize, IntoSchema)] #[version_with_scale(n = 1, versioned = "VersionedSignedQuery")] pub struct SignedQuery { - /// Payload - pub payload: QueryPayload, /// Signature of the client who sends this query. pub signature: SignatureOf, + /// Payload + pub payload: QueryPayload, + } + + /// [`SignedQuery`] response + #[derive(Debug, Clone, Getters, Decode, Encode, Deserialize, Serialize, IntoSchema)] + #[version_with_scale(n = 1, versioned = "VersionedQueryResponse")] + #[getset(get = "pub")] + pub struct QueryResponse { + /// The result of the query execution. + #[getset(skip)] + pub result: QueryResult, + /// Index of the next element in the result set. Client will use this value + /// in the next request to continue fetching results of the original query + pub cursor: cursor::ForwardCursor, + /// Sorting + pub sorting: sorting::Sorting, + /// Pagination + pub pagination: pagination::Pagination, } } @@ -1369,6 +1423,7 @@ pub mod http { }) } } + impl Decode for SignedQuery { fn decode(input: &mut I) -> Result { SignedQueryCandidate::decode(input)? @@ -1376,6 +1431,7 @@ pub mod http { .map_err(Into::into) } } + impl<'de> Deserialize<'de> for SignedQuery { fn deserialize(deserializer: D) -> Result where @@ -1414,19 +1470,6 @@ pub mod http { } } - /// Paginated Query Result - // TODO: This is the only structure whose inner fields are exposed. Wrap it in model macro? - #[derive(Debug, Clone, Decode, Encode, Deserialize, Serialize, IntoSchema)] - #[version_with_scale(n = 1, versioned = "VersionedQueryResult")] - pub struct QueryResult { - /// The result of the query execution. - pub result: Value, - /// pagination - pub pagination: Pagination, - /// sorting - pub sorting: Sorting, - } - impl QueryBuilder { /// Construct a new request with the `query`. pub fn new(query: impl Into, authority: AccountId) -> Self { @@ -1457,11 +1500,20 @@ pub mod http { pub fn sign( self, key_pair: iroha_crypto::KeyPair, - ) -> Result { - SignatureOf::new(key_pair, &self.payload).map(|signature| SignedQuery { - payload: self.payload, - signature, - }) + ) -> Result { + SignatureOf::new(key_pair, &self.payload) + .map(|signature| SignedQuery { + payload: self.payload, + signature, + }) + .map(Into::into) + } + } + + impl From for Value { + #[inline] + fn from(source: QueryResponse) -> Self { + source.result } } @@ -1469,7 +1521,7 @@ pub mod http { //! The prelude re-exports most commonly used traits, structs and macros from this crate. pub use super::{ - QueryBuilder, QueryResult, SignedQuery, VersionedQueryResult, VersionedSignedQuery, + QueryBuilder, QueryResponse, SignedQuery, VersionedQueryResponse, VersionedSignedQuery, }; } } @@ -1637,6 +1689,6 @@ pub mod prelude { pub use super::{ account::prelude::*, asset::prelude::*, block::prelude::*, domain::prelude::*, peer::prelude::*, permission::prelude::*, role::prelude::*, transaction::*, - trigger::prelude::*, Query, QueryBox, TransactionQueryResult, + trigger::prelude::*, QueryBox, TransactionQueryOutput, }; } diff --git a/data_model/src/pagination.rs b/data_model/src/query/pagination.rs similarity index 100% rename from data_model/src/pagination.rs rename to data_model/src/query/pagination.rs diff --git a/data_model/src/sorting.rs b/data_model/src/query/sorting.rs similarity index 97% rename from data_model/src/sorting.rs rename to data_model/src/query/sorting.rs index 0ceac5a86e2..b1e5f5e797e 100644 --- a/data_model/src/sorting.rs +++ b/data_model/src/query/sorting.rs @@ -21,7 +21,7 @@ const SORT_BY_KEY: &str = "sort_by_metadata_key"; pub mod model { use super::*; - /// Enum for sorting requests + /// Struct for sorting requests #[derive(Debug, Clone, Default, Decode, Encode, Deserialize, Serialize, IntoSchema)] pub struct Sorting { /// Sort query result using [`Name`] of the key in [`Asset`]'s metadata. diff --git a/data_model/src/transaction.rs b/data_model/src/transaction.rs index b49775f0e46..1b8b81a9e28 100644 --- a/data_model/src/transaction.rs +++ b/data_model/src/transaction.rs @@ -22,8 +22,11 @@ use serde::{Deserialize, Serialize}; pub use self::model::*; use crate::{ - account::AccountId, isi::InstructionBox, metadata::UnlimitedMetadata, name::Name, - prelude::Instruction, Value, + account::AccountId, + isi::{Instruction, InstructionBox}, + metadata::UnlimitedMetadata, + name::Name, + Value, }; #[model] @@ -99,20 +102,20 @@ pub mod model { #[getset(get = "pub")] #[ffi_type] pub struct TransactionPayload { - /// Account ID of transaction creator. - pub authority: AccountId, /// Creation timestamp (unix time in milliseconds). #[getset(skip)] pub creation_time_ms: u64, + /// Account ID of transaction creator. + pub authority: AccountId, + /// ISI or a `WebAssembly` smartcontract. + pub instructions: Executable, + /// If transaction is not committed by this time it will be dropped. + #[getset(skip)] + pub time_to_live_ms: Option, /// Random value to make different hashes for transactions which occur repeatedly and simultaneously. // TODO: Only temporary #[getset(skip)] pub nonce: Option, - /// If transaction is not committed by this time it will be dropped. - #[getset(skip)] - pub time_to_live_ms: Option, - /// ISI or a `WebAssembly` smartcontract. - pub instructions: Executable, /// Store for additional information. #[getset(skip)] pub metadata: UnlimitedMetadata, @@ -160,10 +163,10 @@ pub mod model { #[ffi_type] // TODO: All fields in this struct should be private pub struct SignedTransaction { - /// [`Transaction`] payload. - pub payload: TransactionPayload, /// [`iroha_crypto::SignatureOf`]<[`TransactionPayload`]>. pub signatures: SignaturesOf, + /// [`Transaction`] payload. + pub payload: TransactionPayload, } /// Transaction Value used in Instructions and Queries @@ -371,8 +374,8 @@ mod candidate { #[derive(Decode, Deserialize)] struct SignedTransactionCandidate { - payload: TransactionPayload, signatures: SignaturesOf, + payload: TransactionPayload, } impl SignedTransactionCandidate { diff --git a/data_model/src/visit.rs b/data_model/src/visit.rs index 650379526c7..3f60589144a 100644 --- a/data_model/src/visit.rs +++ b/data_model/src/visit.rs @@ -477,11 +477,11 @@ pub fn visit_transfer( let object = evaluate_expr!(visitor, authority, ::object()); let (IdBox::AssetId(source_id), IdBox::AccountId(destination_id)) = ( - evaluate_expr!(visitor, authority, ::source_id()), - evaluate_expr!(visitor, authority, ::destination_id()), - ) else { - return visitor.visit_unsupported(authority, isi); - }; + evaluate_expr!(visitor, authority, ::source_id()), + evaluate_expr!(visitor, authority, ::destination_id()), + ) else { + return visitor.visit_unsupported(authority, isi); + }; match object { Value::Numeric(object) => visitor.visit_transfer_asset( diff --git a/lints.toml b/lints.toml index 5daf19f2945..b97277149f3 100644 --- a/lints.toml +++ b/lints.toml @@ -55,7 +55,6 @@ allow = [ 'clippy::implicit_return', 'clippy::inconsistent_struct_constructor', 'clippy::indexing_slicing', - 'clippy::integer_arithmetic', 'clippy::let_underscore_must_use', 'clippy::match_wildcard_for_single_variants', 'clippy::missing_docs_in_private_items', diff --git a/macro/src/lib.rs b/macro/src/lib.rs index 7f1aa97b042..35b71caacec 100644 --- a/macro/src/lib.rs +++ b/macro/src/lib.rs @@ -33,8 +33,8 @@ pub mod error { impl Default for ErrorTryFromEnum { fn default() -> Self { Self { - from: PhantomData::default(), - to: PhantomData::default(), + from: PhantomData, + to: PhantomData, } } } diff --git a/primitives/src/conststr.rs b/primitives/src/conststr.rs index f0f3a156d0c..87772b512dc 100644 --- a/primitives/src/conststr.rs +++ b/primitives/src/conststr.rs @@ -515,6 +515,7 @@ mod tests { } #[test] + #[allow(clippy::redundant_clone)] fn const_string_clone() { run_with_strings(|string| { let const_string = ConstString::from(string); diff --git a/schema/gen/src/lib.rs b/schema/gen/src/lib.rs index 8543817b32b..b3ecd5fc567 100644 --- a/schema/gen/src/lib.rs +++ b/schema/gen/src/lib.rs @@ -47,7 +47,7 @@ pub fn build_schemas() -> MetaMap { VersionedBlockSubscriptionRequest, VersionedEventMessage, VersionedEventSubscriptionRequest, - VersionedQueryResult, + VersionedQueryResponse, VersionedSignedQuery, // Never referenced, but present in type signature. Like `PhantomData` @@ -282,7 +282,7 @@ types!( OriginFilter, OriginFilter, OriginFilter, - QueryResult, + QueryResponse, Pagination, Pair, Parameter, @@ -348,7 +348,7 @@ types!( TransactionLimitError, TransactionLimits, TransactionPayload, - TransactionQueryResult, + TransactionQueryOutput, TransactionRejectionReason, TransactionValue, TransferBox, @@ -378,7 +378,7 @@ types!( VersionedCommittedBlockWrapper, VersionedEventMessage, VersionedEventSubscriptionRequest, - VersionedQueryResult, + VersionedQueryResponse, VersionedSignedQuery, VersionedSignedTransaction, WasmExecutionFail, @@ -429,7 +429,10 @@ mod tests { GenericPredicateBox, NonTrivial, PredicateBox, }, prelude::*, - query::error::{FindError, QueryExecutionFail}, + query::{ + error::{FindError, QueryExecutionFail}, + Pagination, Sorting, + }, transaction::{error::TransactionLimitError, SignedTransaction, TransactionLimits}, validator::Validator, ValueKind, VersionedCommittedBlockWrapper, diff --git a/tools/parity_scale_decoder/src/main.rs b/tools/parity_scale_decoder/src/main.rs index b80d6578fe4..b43303c198c 100644 --- a/tools/parity_scale_decoder/src/main.rs +++ b/tools/parity_scale_decoder/src/main.rs @@ -40,7 +40,10 @@ use iroha_data_model::{ GenericPredicateBox, NonTrivial, PredicateBox, }, prelude::*, - query::error::{FindError, QueryExecutionFail}, + query::{ + error::{FindError, QueryExecutionFail}, + Pagination, Sorting, + }, transaction::{error::TransactionLimitError, SignedTransaction, TransactionLimits}, validator::Validator, ValueKind, VersionedCommittedBlockWrapper, diff --git a/wasm/src/lib.rs b/wasm/src/lib.rs index e478b27eac3..5ae2b7fff31 100644 --- a/wasm/src/lib.rs +++ b/wasm/src/lib.rs @@ -14,7 +14,12 @@ extern crate alloc; use alloc::{boxed::Box, collections::BTreeMap, format, vec::Vec}; use core::ops::RangeFrom; -use data_model::{prelude::*, query::QueryBox, validator::NeedsValidationBox}; +use data_model::{ + prelude::*, + isi::Instruction, + query::{Query, QueryBox}, + validator::NeedsValidationBox, +}; use debug::DebugExpectExt as _; pub use iroha_data_model as data_model; pub use iroha_wasm_derive::main; diff --git a/wasm_codec/derive/src/lib.rs b/wasm_codec/derive/src/lib.rs index c9234cf9719..06b8b85889e 100644 --- a/wasm_codec/derive/src/lib.rs +++ b/wasm_codec/derive/src/lib.rs @@ -373,8 +373,15 @@ fn classify_fn(fn_sig: &syn::Signature) -> FnClass { }; } - let syn::PathArguments::AngleBracketed(syn::AngleBracketedGenericArguments { args: generics, ..}) = &output_last_segment.arguments else { - abort!(output_last_segment.arguments, "`Result` return type should have generic arguments"); + let syn::PathArguments::AngleBracketed(syn::AngleBracketedGenericArguments { + args: generics, + .. + }) = &output_last_segment.arguments + else { + abort!( + output_last_segment.arguments, + "`Result` return type should have generic arguments" + ); }; let ok_type = classify_ok_type(generics); @@ -434,7 +441,11 @@ fn classify_params_and_state( fn parse_state_param(param: &syn::PatType) -> Result<&syn::Type, Diagnostic> { let syn::Pat::Ident(pat_ident) = &*param.pat else { - return Err(diagnostic!(param, Level::Error, "State parameter should be an ident")); + return Err(diagnostic!( + param, + Level::Error, + "State parameter should be an ident" + )); }; if !["state", "_state"].contains(&&*pat_ident.ident.to_string()) { return Err(diagnostic!( @@ -445,7 +456,11 @@ fn parse_state_param(param: &syn::PatType) -> Result<&syn::Type, Diagnostic> { } let syn::Type::Reference(ty_ref) = &*param.ty else { - return Err(diagnostic!(param.ty, Level::Error, "State parameter should be either reference or mutable reference")); + return Err(diagnostic!( + param.ty, + Level::Error, + "State parameter should be either reference or mutable reference" + )); }; Ok(&*ty_ref.elem) @@ -458,7 +473,10 @@ fn classify_ok_type( .first() .expect_or_abort("First generic argument expected in `Result` return type"); let syn::GenericArgument::Type(ok_type) = ok_generic else { - abort!(ok_generic, "First generic of `Result` return type expected to be a type"); + abort!( + ok_generic, + "First generic of `Result` return type expected to be a type" + ); }; if let syn::Type::Tuple(syn::TypeTuple { elems, .. }) = ok_type { @@ -473,15 +491,17 @@ fn extract_err_type(generics: &Punctuated) .iter() .nth(1) .expect_or_abort("Second generic argument expected in `Result` return type"); - let syn::GenericArgument::Type(err_type) = err_generic else - { - abort!(err_generic, "Second generic of `Result` return type expected to be a type"); + let syn::GenericArgument::Type(err_type) = err_generic else { + abort!( + err_generic, + "Second generic of `Result` return type expected to be a type" + ); }; err_type } fn unwrap_path(ty: &syn::Type) -> &syn::Path { - let syn::Type::Path(syn::TypePath {ref path, ..}) = *ty else { + let syn::Type::Path(syn::TypePath { ref path, .. }) = *ty else { abort!(ty, "Expected path"); };