Skip to content

Commit

Permalink
[feature] hyperledger-iroha#3468: add query_id to cursor
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Jul 21, 2023
1 parent bf67459 commit 9518034
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 172 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ursa = "0.3.7"
aead = "0.3.2"

rand = "0.8.5"
warp = { version = "0.3.3", default-features = false }
warp = { version = "0.3.5", default-features = false }
wasmtime = "0.39.1"

tracing = "0.1.37"
Expand Down
10 changes: 7 additions & 3 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ thiserror = { workspace = true }
displaydoc = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs", "signal"] }
warp = { workspace = true, features = ["multipart", "websocket"] }
dashmap = "5.4.0"
serial_test = "0.8.0"
once_cell = { workspace = true }
owo-colors = { workspace = true, features = ["supports-colors"] }
supports-color = { workspace = true }
thread-local-panic-hook = { version = "0.1.0", optional = true }
tempfile = { workspace = true }
dashmap = { workspace = true }

thread-local-panic-hook = { version = "0.1.0", optional = true }
uuid = { version = "1.4.1", features = ["v4"] }

#[dev-dependencies]
serial_test = "0.8.0"

[build-dependencies]
iroha_wasm_builder = { workspace = true }
Expand Down
27 changes: 17 additions & 10 deletions cli/src/torii/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::num::NonZeroUsize;

use iroha_data_model::query::ForwardCursor;
use std::num::{NonZeroU64, NonZeroUsize};

use crate::torii::{Error, Result};

Expand All @@ -13,7 +11,7 @@ impl<I: IntoIterator> Batch for I {
Batched {
iter: self.into_iter(),
batch_size,
cursor: ForwardCursor::default(),
cursor: Some(0),
}
}
}
Expand All @@ -24,11 +22,11 @@ impl<I: IntoIterator> Batch for I {
pub struct Batched<I: IntoIterator> {
iter: I::IntoIter,
batch_size: NonZeroUsize,
cursor: ForwardCursor,
cursor: Option<u64>,
}

impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
pub(crate) fn next_batch(&mut self, cursor: ForwardCursor) -> Result<(I, ForwardCursor)> {
pub(crate) fn next_batch(&mut self, cursor: Option<u64>) -> Result<(I, Option<NonZeroU64>)> {
if cursor != self.cursor {
return Err(Error::UnknownCursor);
}
Expand All @@ -41,7 +39,7 @@ impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
.take(self.batch_size.get())
.collect();

self.cursor.cursor = if let Some(cursor) = self.cursor.cursor {
self.cursor = if let Some(cursor) = self.cursor {
if batch_size >= self.batch_size.get() {
let batch_size = self
.batch_size
Expand All @@ -57,16 +55,25 @@ impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
None
}
} else if batch_size >= self.batch_size.get() {
Some(self.batch_size.try_into().expect("usize should fit in u64"))
Some(
self.batch_size
.get()
.try_into()
.expect("usize should fit in u64"),
)
} else {
None
};

Ok((batch, self.cursor))
Ok((
batch,
self.cursor
.map(|cursor| NonZeroU64::new(cursor).expect("Cursor is never 0")),
))
}

pub fn is_depleted(&self) -> bool {
self.cursor.cursor.is_none()
self.cursor.is_none()
}
}

Expand Down
14 changes: 8 additions & 6 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use iroha_core::{
EventsSender,
};
use iroha_data_model::Value;
use parity_scale_codec::Encode;
use tokio::{sync::Notify, time::sleep};
use utils::*;
use warp::{
Expand All @@ -41,18 +42,19 @@ type LiveQuery = Batched<Vec<Value>>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<Vec<u8>, (LiveQuery, Instant)>,
queries: DashMap<(String, Vec<u8>), (LiveQuery, Instant)>,
}

impl LiveQueryStore {
fn insert(&self, request: Vec<u8>, live_query: LiveQuery) {
self.queries.insert(request, (live_query, Instant::now()));
fn insert<T: Encode>(&self, query_id: String, request: T, live_query: LiveQuery) {
self.queries
.insert((query_id, request.encode()), (live_query, Instant::now()));
}

fn remove(&self, request: &Vec<u8>) -> Option<(Vec<u8>, LiveQuery)> {
fn remove<T: Encode>(&self, query_id: &str, request: &T) -> Option<LiveQuery> {
self.queries
.remove(request)
.map(|(query_id, (query, _))| (query_id, query))
.remove(&(query_id.to_string(), request.encode()))
.map(|(_, (output, _))| output)
}

// TODO: Add notifier channel to enable graceful shutdown
Expand Down
72 changes: 42 additions & 30 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
// FIXME: This can't be fixed, because one trait in `warp` is private.
#![allow(opaque_hidden_inferred_bound)]

use std::{cmp::Ordering, num::NonZeroUsize};
use std::{
cmp::Ordering,
num::{NonZeroU64, NonZeroUsize},
};

use cursor::Batch;
use eyre::WrapErr;
Expand All @@ -28,13 +31,13 @@ use iroha_data_model::{
},
VersionedCommittedBlock,
},
http::{BatchedResponse, VersionedBatchedResponse},
prelude::*,
query::{ForwardCursor, Pagination, Sorting},
};
#[cfg(feature = "telemetry")]
use iroha_telemetry::metrics::Status;
use pagination::Paginate;
use parity_scale_codec::Encode;
use tokio::task;

use super::*;
Expand Down Expand Up @@ -90,39 +93,48 @@ async fn handle_queries(
pagination: Pagination,

cursor: ForwardCursor,
) -> Result<Scale<VersionedQueryResponse>> {
let (query_id, mut live_query) = if cursor.cursor.is_some() {
let query_id = (&request, &sorting, &pagination).encode();
query_store.remove(&query_id).ok_or(Error::UnknownCursor)?
) -> Result<Scale<VersionedBatchedResponse<Value>>> {
// TODO: Remove wsv clone
let mut wsv = sumeragi.wsv_clone();
let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let request_id = (&valid_request, &sorting, &pagination);

let (query_id, curr_cursor, mut live_query) = if let Some(query_id) = cursor.query_id {
let live_query = query_store
.remove(&query_id, &request_id)
.ok_or(Error::UnknownCursor)?;

(query_id, cursor.cursor.map(NonZeroU64::get), live_query)
} else {
let mut wsv = sumeragi.wsv_clone();

let valid_request = ValidQueryRequest::validate(request, &mut wsv)?;
let res = valid_request.execute(&wsv).map_err(ValidationFail::from)?;

match res {
LazyValue::Value(result) => {
LazyValue::Value(batch) => {
let cursor = ForwardCursor::default();
let result = QueryResponse { result, cursor };
let result = BatchedResponse { batch, cursor };
return Ok(Scale(result.into()));
}
LazyValue::Iter(iter) => {
let query_id = (&valid_request, &sorting, &pagination).encode();
let query = apply_sorting_and_pagination(iter, &sorting, pagination);
(query_id, query.batched(fetch_size))
let live_query = apply_sorting_and_pagination(iter, &sorting, pagination);
let query_id = uuid::Uuid::new_v4().to_string();

(query_id, Some(0), live_query.batched(fetch_size))
}
}
};

let (batch, next_cursor) = live_query.next_batch(cursor)?;
let (batch, next_cursor) = live_query.next_batch(curr_cursor)?;

if !live_query.is_depleted() {
query_store.insert(query_id, live_query);
query_store.insert(query_id.clone(), request_id, live_query);
}

let query_response = QueryResponse {
result: Value::Vec(batch),
cursor: next_cursor,
let query_response = BatchedResponse {
batch: Value::Vec(batch),
cursor: ForwardCursor {
query_id: Some(query_id),
cursor: next_cursor,
},
};

Ok(Scale(query_response.into()))
Expand Down Expand Up @@ -192,16 +204,16 @@ async fn handle_pending_transactions(
) -> Result<Scale<Vec<VersionedSignedTransaction>>> {
// TODO: Don't clone wsv here
let wsv = sumeragi.wsv_clone();
Ok(Scale(
queue
.all_transactions(&wsv)
.map(Into::into)
.paginate(pagination)
.collect::<Vec<_>>(),
// TODO:
// NOTE: batching is done after collecting the result of pagination
//.batched(fetch_size)
))

let query_response = queue
.all_transactions(&wsv)
.map(Into::into)
.paginate(pagination)
.collect::<Vec<_>>();
// TODO:
//.batched(fetch_size)

Ok(Scale(query_response))
}

#[iroha_futures::telemetry_future]
Expand Down Expand Up @@ -495,7 +507,7 @@ impl Torii {
.and(endpoint3(
handle_pending_transactions,
warp::path(uri::PENDING_TRANSACTIONS)
.and(add_state!(self.queue, self.sumeragi))
.and(add_state!(self.queue, self.sumeragi,))
.and(paginate()),
))
.or(endpoint2(
Expand Down
2 changes: 1 addition & 1 deletion cli/src/torii/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ impl<O: Reply, E: Reply> Reply for WarpResult<O, E> {
}
}

iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 7);
iroha_cli_derive::generate_endpoints!(2, 3, 4, 5, 6, 7);
25 changes: 14 additions & 11 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConf
use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{
block::VersionedCommittedBlock,
http::VersionedBatchedResponse,
isi::Instruction,
predicate::PredicateBox,
prelude::*,
Expand Down Expand Up @@ -104,10 +105,10 @@ where
// Separate-compilation friendly response handling
fn _handle_query_response_base(
resp: &Response<Vec<u8>>,
) -> QueryResult<VersionedQueryResponse> {
) -> QueryResult<VersionedBatchedResponse<Value>> {
match resp.status() {
StatusCode::OK => {
let res = VersionedQueryResponse::decode_all_versioned(resp.body());
let res = VersionedBatchedResponse::decode_all_versioned(resp.body());
res.wrap_err(
"Failed to decode response from Iroha. \
You are likely using a version of the client library \
Expand Down Expand Up @@ -143,13 +144,13 @@ where
}

let response = _handle_query_response_base(resp)
.map(|VersionedQueryResponse::V1(response)| response)?;
.map(|VersionedBatchedResponse::V1(response)| response)?;

let value = R::try_from(response.result)
let value = R::try_from(response.batch)
.map_err(Into::into)
.wrap_err("Unexpected type")?;

self.query_request.server_cursor = response.cursor;
self.query_request.query_cursor = response.cursor;
Ok(value)
}
}
Expand Down Expand Up @@ -259,7 +260,7 @@ where

fn next(&mut self) -> Option<Self::Item> {
if self.client_cursor >= self.iter.len() {
self.query_handler.query_request.server_cursor.get()?;
self.query_handler.query_request.query_cursor.cursor?;

let request = match self.query_handler.query_request.clone().assemble().build() {
Err(err) => return Some(Err(ClientQueryError::Other(err))),
Expand Down Expand Up @@ -360,19 +361,21 @@ pub struct QueryRequest {
request: Vec<u8>,
sorting: Sorting,
pagination: Pagination,
server_cursor: ForwardCursor,
query_cursor: ForwardCursor,
}

impl QueryRequest {
#[cfg(test)]
fn dummy() -> Self {
let torii_url = iroha_config::torii::uri::DEFAULT_API_ADDR;

Self {
torii_url: uri::QUERY.parse().unwrap(),
torii_url: format!("http://{torii_url}").parse().unwrap(),
headers: HashMap::new(),
request: Vec::new(),
sorting: Sorting::default(),
pagination: Pagination::default(),
server_cursor: ForwardCursor::default(),
query_cursor: ForwardCursor::default(),
}
}
fn assemble(self) -> DefaultRequestBuilder {
Expand All @@ -383,7 +386,7 @@ impl QueryRequest {
.headers(self.headers)
.params(Vec::from(self.sorting))
.params(Vec::from(self.pagination))
.params(Vec::from(self.server_cursor))
.params(Vec::from(self.query_cursor))
.body(self.request)
}
}
Expand Down Expand Up @@ -805,7 +808,7 @@ impl Client {
request,
sorting,
pagination,
server_cursor: ForwardCursor::default(),
query_cursor: ForwardCursor::default(),
};

Ok((
Expand Down
Loading

0 comments on commit 9518034

Please sign in to comment.