Skip to content

Commit

Permalink
[feature] #3468: implement server-side cursor
Browse files Browse the repository at this point in the history
Signed-off-by: Marin Veršić <marin.versic101@gmail.com>
  • Loading branch information
mversic committed Oct 17, 2023
1 parent 01aa10e commit af7dab7
Show file tree
Hide file tree
Showing 77 changed files with 1,807 additions and 1,354 deletions.
888 changes: 496 additions & 392 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ thiserror = { workspace = true }
displaydoc = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs", "signal"] }
warp = { workspace = true, features = ["multipart", "websocket"] }
dashmap = "5.4.0"
serial_test = "0.8.0"
once_cell = { workspace = true }
owo-colors = { workspace = true, features = ["supports-colors"] }
Expand Down
2 changes: 1 addition & 1 deletion cli/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const TIMEOUT: Duration = Duration::from_millis(10_000);
const TIMEOUT: Duration = Duration::from_millis(1000);

/// Error type with generic for actual Stream/Sink error type
#[derive(thiserror::Error, displaydoc::Display, Debug)]
#[derive(Debug, displaydoc::Display, thiserror::Error)]
#[ignore_extra_doc_attributes]
pub enum Error<InternalStreamError>
where
Expand Down
79 changes: 79 additions & 0 deletions cli/src/torii/cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::num::NonZeroUsize;

use iroha_data_model::query::ForwardCursor;

use crate::torii::{Error, Result};

pub trait Batch: IntoIterator + Sized {
fn batched(self, fetch_size: NonZeroUsize) -> Batched<Self>;
}

impl<I: IntoIterator> Batch for I {
fn batched(self, batch_size: NonZeroUsize) -> Batched<Self> {
Batched {
iter: self.into_iter(),
batch_size,
cursor: ForwardCursor::default(),
}
}
}

/// Paginated [`Iterator`].
/// Not recommended to use directly, only use in iterator chains.
#[derive(Debug)]
pub struct Batched<I: IntoIterator> {
iter: I::IntoIter,
batch_size: NonZeroUsize,
cursor: ForwardCursor,
}

impl<I: IntoIterator + FromIterator<I::Item>> Batched<I> {
pub(crate) fn next_batch(&mut self, cursor: ForwardCursor) -> Result<(I, ForwardCursor)> {
if cursor != self.cursor {
return Err(Error::UnknownCursor);
}

let mut batch_size = 0;
let batch: I = self
.iter
.by_ref()
.inspect(|_| batch_size += 1)
.take(self.batch_size.get())
.collect();

self.cursor.cursor = if let Some(cursor) = self.cursor.cursor {
if batch_size >= self.batch_size.get() {
let batch_size = self
.batch_size
.get()
.try_into()
.expect("usize should fit in u64");
Some(
cursor
.checked_add(batch_size)
.expect("Cursor size should never reach the platform limit"),
)
} else {
None
}
} else if batch_size >= self.batch_size.get() {
Some(self.batch_size.try_into().expect("usize should fit in u64"))
} else {
None
};

Ok((batch, self.cursor))
}

pub fn is_depleted(&self) -> bool {
self.cursor.cursor.is_none()
}
}

impl<I: Iterator> Iterator for Batched<I> {
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}
53 changes: 47 additions & 6 deletions cli/src/torii/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::{
fmt::{Debug, Write as _},
net::ToSocketAddrs,
sync::Arc,
time::{Duration, Instant},
};

use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use iroha_core::{
kura::Kura,
Expand All @@ -17,8 +19,8 @@ use iroha_core::{
sumeragi::SumeragiHandle,
EventsSender,
};
use thiserror::Error;
use tokio::sync::Notify;
use iroha_data_model::Value;
use tokio::{sync::Notify, time::sleep};
use utils::*;
use warp::{
http::StatusCode,
Expand All @@ -27,10 +29,44 @@ use warp::{
Filter as _, Reply,
};

use self::cursor::Batched;

#[macro_use]
pub(crate) mod utils;
mod cursor;
mod pagination;
pub mod routing;
mod routing;

type LiveQuery = Batched<Vec<Value>>;

#[derive(Default)]
struct LiveQueryStore {
queries: DashMap<Vec<u8>, (LiveQuery, Instant)>,
}

impl LiveQueryStore {
fn insert(&self, request: Vec<u8>, live_query: LiveQuery) {
self.queries.insert(request, (live_query, Instant::now()));
}

fn remove(&self, request: &Vec<u8>) -> Option<(Vec<u8>, LiveQuery)> {
self.queries
.remove(request)
.map(|(query_id, (query, _))| (query_id, query))
}

// TODO: Add notifier channel to enable graceful shutdown
fn expired_query_cleanup(self: Arc<Self>, idle_time: Duration) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
loop {
sleep(idle_time).await;

self.queries
.retain(|_, (_, last_access_time)| last_access_time.elapsed() <= idle_time);
}
})
}
}

/// Main network handler and the only entrypoint of the Iroha.
pub struct Torii {
Expand All @@ -39,11 +75,12 @@ pub struct Torii {
events: EventsSender,
notify_shutdown: Arc<Notify>,
sumeragi: SumeragiHandle,
query_store: Arc<LiveQueryStore>,
kura: Arc<Kura>,
}

/// Torii errors.
#[derive(Debug, Error, displaydoc::Display)]
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum Error {
/// Failed to execute or validate query
Query(#[from] iroha_data_model::ValidationFail),
Expand All @@ -58,10 +95,12 @@ pub enum Error {
#[cfg(feature = "telemetry")]
/// Error while getting Prometheus metrics
Prometheus(#[source] eyre::Report),
/// Error while resuming cursor
UnknownCursor,
}

/// Status code for query error response.
pub(crate) fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
fn query_status_code(validation_error: &iroha_data_model::ValidationFail) -> StatusCode {
use iroha_data_model::{
isi::error::InstructionExecutionError, query::error::QueryExecutionFail::*,
ValidationFail::*,
Expand Down Expand Up @@ -104,7 +143,9 @@ impl Error {
use Error::*;
match self {
Query(e) => query_status_code(e),
AcceptTransaction(_) | ConfigurationReload(_) => StatusCode::BAD_REQUEST,
AcceptTransaction(_) | ConfigurationReload(_) | UnknownCursor => {
StatusCode::BAD_REQUEST
}
Config(_) => StatusCode::NOT_FOUND,
PushIntoQueue(err) => match **err {
queue::Error::Full => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down
97 changes: 47 additions & 50 deletions cli/src/torii/pagination.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use iroha_data_model::prelude::*;
use iroha_data_model::query::Pagination;

/// Describes a collection to which pagination can be applied.
/// Implemented for the [`Iterator`] implementors.
Expand All @@ -7,61 +7,57 @@ pub trait Paginate: Iterator + Sized {
fn paginate(self, pagination: Pagination) -> Paginated<Self>;
}

impl<I: Iterator + Sized> Paginate for I {
impl<I: Iterator> Paginate for I {
fn paginate(self, pagination: Pagination) -> Paginated<Self> {
Paginated {
pagination,
iter: self,
}
Paginated::new(pagination, self)
}
}

/// Paginated [`Iterator`].
/// Not recommended to use directly, only use in iterator chains.
#[derive(Debug)]
pub struct Paginated<I: Iterator> {
pagination: Pagination,
iter: I,
pub struct Paginated<I: Iterator>(core::iter::Take<core::iter::Skip<I>>);

impl<I: Iterator> Paginated<I> {
fn new(pagination: Pagination, iter: I) -> Self {
Self(
iter.skip(pagination.start.map_or_else(
|| 0,
|start| start.get().try_into().expect("U64 should fit into usize"),
))
.take(pagination.limit.map_or_else(
|| usize::MAX,
|limit| limit.get().try_into().expect("U32 should fit into usize"),
)),
)
}
}

impl<I: Iterator> Iterator for Paginated<I> {
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
if let Some(limit) = self.pagination.limit.as_mut() {
if *limit == 0 {
return None;
}

*limit -= 1
}

#[allow(clippy::option_if_let_else)]
// Required because of E0524. 2 closures with unique refs to self
if let Some(start) = self.pagination.start.take() {
self.iter
.nth(start.try_into().expect("u32 should always fit in usize"))
} else {
self.iter.next()
}
self.0.next()
}
}

/// Filter for warp which extracts pagination
pub fn paginate() -> impl warp::Filter<Extract = (Pagination,), Error = warp::Rejection> + Copy {
warp::query()
}

#[cfg(test)]
mod tests {
use std::num::{NonZeroU32, NonZeroU64};

use iroha_data_model::query::pagination::Pagination;

use super::*;

#[test]
fn empty() {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, None))
.paginate(Pagination {
limit: None,
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
)
Expand All @@ -72,21 +68,20 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(0), None))
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(1), None))
.paginate(Pagination {
limit: None,
start: NonZeroU64::new(1)
})
.collect::<Vec<_>>(),
vec![2_i32, 3_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(3), None))
.paginate(Pagination {
limit: None,
start: NonZeroU64::new(3)
})
.collect::<Vec<_>>(),
Vec::<i32>::new()
);
Expand All @@ -97,21 +92,20 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(0)))
.collect::<Vec<_>>(),
Vec::<i32>::new()
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(2)))
.paginate(Pagination {
limit: NonZeroU32::new(2),
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32]
);
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(None, Some(4)))
.paginate(Pagination {
limit: NonZeroU32::new(4),
start: None
})
.collect::<Vec<_>>(),
vec![1_i32, 2_i32, 3_i32]
);
Expand All @@ -122,7 +116,10 @@ mod tests {
assert_eq!(
vec![1_i32, 2_i32, 3_i32]
.into_iter()
.paginate(Pagination::new(Some(1), Some(1)))
.paginate(Pagination {
limit: NonZeroU32::new(1),
start: NonZeroU64::new(1),
})
.collect::<Vec<_>>(),
vec![2_i32]
)
Expand Down
Loading

0 comments on commit af7dab7

Please sign in to comment.