Skip to content

Commit

Permalink
ordered futures
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Oct 24, 2022
1 parent 5248522 commit d6ba724
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
7 changes: 7 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,13 @@ impl MethodResult {
Self::SendAndLogger(r) => r,
}
}

pub(crate) fn into_inner(self) -> MethodResponse {
match self {
Self::JustLogger(r) => r,
Self::SendAndLogger(r) => r,
}
}
}

/// Data required by the server to handle requests.
Expand Down
25 changes: 15 additions & 10 deletions server/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::sync::Arc;

use crate::logger::{self, Logger, TransportProtocol};

use futures_util::future::FutureExt;
use futures_util::stream::{FuturesOrdered, StreamExt};
use http::Method;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers::read_body;
Expand Down Expand Up @@ -157,15 +159,13 @@ where
let Batch { data, call } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);
let mut got_notif = false;
let mut pending_calls = FuturesOrdered::new();
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);

for val in batch {
if let Ok(req) = serde_json::from_str::<Request>(val.get()) {
let response = execute_call(req, call.clone()).await;
if let Err(e) = batch_response.append(&response) {
return e;
}
pending_calls.push_back(execute_call(req, call.clone()).boxed());
} else if let Ok(_notif) = serde_json::from_str::<Notification<&JsonRawValue>>(val.get()) {
// notifications should not be answered.
got_notif = true;
Expand All @@ -175,11 +175,16 @@ where
Ok(err) => err.id,
Err(_) => Id::Null,
};
if let Err(err) =
batch_response.append(&MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)))
{
return err;
}

pending_calls.push_back(
async { MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)) }.boxed(),
);
}
}

while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return too_large;
}
}

Expand Down
27 changes: 17 additions & 10 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::server::{MethodResult, ServiceData};
use futures_channel::mpsc;
use futures_util::future::{self, Either};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, StreamExt};
use hyper::upgrade::Upgraded;
use jsonrpsee_core::server::helpers::{
Expand Down Expand Up @@ -110,15 +111,16 @@ pub(crate) async fn process_batch_request<L: Logger>(b: Batch<'_, L>) -> Option<
let Batch { data, call } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);
let mut got_notif = false;
let mut pending_calls = FuturesOrdered::new();
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);

for val in batch {
if let Ok(req) = serde_json::from_str::<Request>(val.get()) {
let response = execute_call(req, call.clone()).await;
if let Err(batch_err) = batch_response.append(response.as_inner()) {
return Some(batch_err);
}
let call = call.clone();
let fut = async move { execute_call(req, call.clone()).await.into_inner() };

pending_calls.push_back(fut.boxed());
} else if let Ok(_notif) = serde_json::from_str::<Notification<&JsonRawValue>>(val.get()) {
// notifications should not be answered.
got_notif = true;
Expand All @@ -128,11 +130,16 @@ pub(crate) async fn process_batch_request<L: Logger>(b: Batch<'_, L>) -> Option<
Ok(err) => err.id,
Err(_) => Id::Null,
};
if let Err(batch_err) =
batch_response.append(&MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)))
{
return Some(batch_err);
}

pending_calls.push_back(
async { MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)) }.boxed(),
);
}
}

while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return Some(too_large);
}
}

Expand Down

0 comments on commit d6ba724

Please sign in to comment.