Skip to content

Commit

Permalink
[server] proper handling of batch errors and mixed calls (#917)
Browse files Browse the repository at this point in the history
* fix: proper error on invalid batches

* proper handling of batch request errors

* ordered futures

* remove needless boxing

Co-authored-by: Alexander Polakov <a.polakov@zubr.io>
  • Loading branch information
niklasad1 and polachok authored Oct 26, 2022
1 parent 990c120 commit e649f38
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 75 deletions.
18 changes: 14 additions & 4 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl BatchResponseBuilder {
///
/// Fails if the max limit is exceeded and returns to error response to
/// return early in order to not process method call responses which are thrown away anyway.
pub fn append(mut self, response: &MethodResponse) -> Result<Self, BatchResponse> {
pub fn append(&mut self, response: &MethodResponse) -> Result<(), BatchResponse> {
// `,` will occupy one extra byte for each entry
// on the last item the `,` is replaced by `]`.
let len = response.result.len() + self.result.len() + 1;
Expand All @@ -291,10 +291,15 @@ impl BatchResponseBuilder {
} else {
self.result.push_str(&response.result);
self.result.push(',');
Ok(self)
Ok(())
}
}

/// Check if the batch is empty.
pub fn is_empty(&self) -> bool {
self.result.len() <= 1
}

/// Finish the batch response
pub fn finish(mut self) -> BatchResponse {
if self.result.len() == 1 {
Expand Down Expand Up @@ -366,7 +371,9 @@ mod tests {
assert_eq!(method.result.len(), 37);

// Recall a batch appends two bytes for the `[]`.
let batch = BatchResponseBuilder::new_with_limit(39).append(&method).unwrap().finish();
let mut builder = BatchResponseBuilder::new_with_limit(39);
builder.append(&method).unwrap();
let batch = builder.finish();

assert!(batch.success);
assert_eq!(batch.result, r#"[{"jsonrpc":"2.0","result":"a","id":1}]"#.to_string())
Expand All @@ -380,7 +387,10 @@ mod tests {
// Recall a batch appends two bytes for the `[]` and one byte for `,` to append a method call.
// so it should be 2 + (37 * n) + (n-1)
let limit = 2 + (37 * 2) + 1;
let batch = BatchResponseBuilder::new_with_limit(limit).append(&m1).unwrap().append(&m1).unwrap().finish();
let mut builder = BatchResponseBuilder::new_with_limit(limit);
builder.append(&m1).unwrap();
builder.append(&m1).unwrap();
let batch = builder.finish();

assert!(batch.success);
assert_eq!(
Expand Down
7 changes: 7 additions & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,13 @@ impl MethodResult {
Self::SendAndLogger(r) => r,
}
}

pub(crate) fn into_inner(self) -> MethodResponse {
match self {
Self::JustLogger(r) => r,
Self::SendAndLogger(r) => r,
}
}
}

/// Data required by the server to handle requests.
Expand Down
27 changes: 22 additions & 5 deletions server/src/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async fn batched_notifications() {
}

#[tokio::test]
async fn invalid_batched_method_calls() {
async fn invalid_batch_calls() {
init_logger();

let (addr, _handle) = server().with_default_timeout().await.unwrap();
Expand All @@ -245,15 +245,13 @@ async fn invalid_batched_method_calls() {
let req = r#"[123]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec the `id` should be `null` here, not 123.
assert_eq!(response.body, invalid_request(Id::Num(123)));
assert_eq!(response.body, invalid_batch(vec![Id::Null]));

// batch with invalid request
let req = r#"[1, 2, 3]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.status, StatusCode::OK);
// Note: according to the spec this should return an array of three `Invalid Request`s
assert_eq!(response.body, parse_error(Id::Null));
assert_eq!(response.body, invalid_batch(vec![Id::Null, Id::Null, Id::Null]));

// invalid JSON in batch
let req = r#"[
Expand All @@ -265,6 +263,25 @@ async fn invalid_batched_method_calls() {
assert_eq!(response.body, parse_error(Id::Null));
}

#[tokio::test]
async fn batch_with_mixed_calls() {
init_logger();

let (addr, _handle) = server().with_default_timeout().await.unwrap();
let uri = to_http_uri(addr);
// mixed notifications, method calls and valid json should be valid.
let req = r#"[
{"jsonrpc": "2.0", "method": "add", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method": "add", "params": [7]},
{"foo": "boo"},
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}
]"#;
let res = r#"[{"jsonrpc":"2.0","result":7,"id":"1"},{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":null},{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":"5"}]"#;
let response = http_request(req.into(), uri.clone()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, res);
}

#[tokio::test]
async fn garbage_request_fails() {
let (addr, _handle) = server().await;
Expand Down
49 changes: 49 additions & 0 deletions server/src/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,52 @@ async fn disabled_batches() {
server_handle.stop().unwrap();
server_handle.stopped().await;
}

#[tokio::test]
async fn invalid_batch_calls() {
init_logger();

let addr = server().await;
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();

// batch with no requests
let req = r#"[]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, invalid_request(Id::Null));

// batch with invalid request
let req = r#"[123]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, invalid_batch(vec![Id::Null]));

// batch with invalid request
let req = r#"[1, 2, 3]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, invalid_batch(vec![Id::Null, Id::Null, Id::Null]));

// invalid JSON in batch
let req = r#"[
{"jsonrpc": "2.0", "method": "sum", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method"
]"#;
let response = client.send_request_text(req).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, parse_error(Id::Null));
}

#[tokio::test]
async fn batch_with_mixed_calls() {
init_logger();

let addr = server().with_default_timeout().await.unwrap();
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();
// mixed notifications, method calls and valid json should be valid.
let req = r#"[
{"jsonrpc": "2.0", "method": "add", "params": [1,2,4], "id": "1"},
{"jsonrpc": "2.0", "method": "add", "params": [7]},
{"foo": "boo"},
{"jsonrpc": "2.0", "method": "foo.get", "params": {"name": "myself"}, "id": "5"}
]"#;
let res = r#"[{"jsonrpc":"2.0","result":7,"id":"1"},{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":null},{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":"5"}]"#;
let response = client.send_request_text(req.to_string()).with_default_timeout().await.unwrap().unwrap();
assert_eq!(response, res);
}
75 changes: 41 additions & 34 deletions server/src/transport/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::sync::Arc;

use crate::logger::{self, Logger, TransportProtocol};

use futures_util::TryStreamExt;
use futures_util::future::Either;
use futures_util::stream::{FuturesOrdered, StreamExt};
use http::Method;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers::read_body;
Expand All @@ -14,7 +15,7 @@ use jsonrpsee_core::server::{resource_limiting::Resources, rpc_module::Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::JsonRawValue;
use jsonrpsee_types::error::{ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG};
use jsonrpsee_types::{ErrorObject, Id, Notification, Params, Request};
use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request};
use tokio::sync::OwnedSemaphorePermit;
use tracing::instrument;

Expand Down Expand Up @@ -157,41 +158,47 @@ where
{
let Batch { data, call } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
let max_response_size = call.max_response_body_size;
let batch = batch.into_iter().map(|req| Ok((req, call.clone())));

let batch_stream = futures_util::stream::iter(batch);

let batch_response = batch_stream
.try_fold(
BatchResponseBuilder::new_with_limit(max_response_size as usize),
|batch_response, (req, call)| async move {
let response = execute_call(req, call).await;
batch_response.append(&response)
},
)
.await;

return match batch_response {
Ok(batch) => batch.finish(),
Err(batch_err) => batch_err,
};
}
if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut got_notif = false;
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);

let mut pending_calls: FuturesOrdered<_> = batch
.into_iter()
.filter_map(|v| {
if let Ok(req) = serde_json::from_str::<Request>(v.get()) {
Some(Either::Right(execute_call(req, call.clone())))
} else if let Ok(_notif) = serde_json::from_str::<Notification<&JsonRawValue>>(v.get()) {
// notifications should not be answered.
got_notif = true;
None
} else {
// valid JSON but could be not parsable as `InvalidRequest`
let id = match serde_json::from_str::<InvalidRequest>(v.get()) {
Ok(err) => err.id,
Err(_) => Id::Null,
};

Some(Either::Left(async {
MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
}))
}
})
.collect();

if let Ok(batch) = serde_json::from_slice::<Vec<Notif>>(&data) {
return if !batch.is_empty() {
BatchResponse { result: "".to_string(), success: true }
while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return too_large;
}
}

if got_notif && batch_response.is_empty() {
BatchResponse { result: String::new(), success: true }
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
batch_response.finish()
}
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))
}

// "If the batch rpc call itself fails to be recognized as an valid JSON or as an
// Array with at least one value, the response from the Server MUST be a single
// Response object." – The Spec.
let (id, code) = prepare_error(&data);
BatchResponse::error(id, ErrorObject::from(code))
}

pub(crate) async fn process_single_request<L: Logger>(data: Vec<u8>, call: CallData<'_, L>) -> MethodResponse {
Expand Down
80 changes: 48 additions & 32 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::server::{MethodResult, ServiceData};
use futures_channel::mpsc;
use futures_util::future::{self, Either};
use futures_util::io::{BufReader, BufWriter};
use futures_util::{Future, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, StreamExt};
use hyper::upgrade::Upgraded;
use jsonrpsee_core::server::helpers::{
prepare_error, BatchResponse, BatchResponseBuilder, BoundedSubscriptions, MethodResponse, MethodSink,
Expand All @@ -18,12 +19,12 @@ use jsonrpsee_core::server::resource_limiting::Resources;
use jsonrpsee_core::server::rpc_module::{ConnState, MethodKind, Methods};
use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str};
use jsonrpsee_core::traits::IdProvider;
use jsonrpsee_core::Error;
use jsonrpsee_core::{Error, JsonRawValue};
use jsonrpsee_types::error::{
reject_too_big_request, reject_too_many_subscriptions, ErrorCode, BATCHES_NOT_SUPPORTED_CODE,
BATCHES_NOT_SUPPORTED_MSG,
};
use jsonrpsee_types::{ErrorObject, Id, Params, Request};
use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request};
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use tokio_stream::wrappers::IntervalStream;
Expand Down Expand Up @@ -106,37 +107,50 @@ where
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
#[instrument(name = "batch", skip(b), level = "TRACE")]
pub(crate) async fn process_batch_request<L: Logger>(b: Batch<'_, L>) -> BatchResponse {
pub(crate) async fn process_batch_request<L: Logger>(b: Batch<'_, L>) -> Option<BatchResponse> {
let Batch { data, call } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<Request>>(&data) {
return if !batch.is_empty() {
let batch = batch.into_iter().map(|req| Ok((req, call.clone())));
let batch_stream = futures_util::stream::iter(batch);

let max_response_size = call.max_response_body_size;

let batch_response = batch_stream
.try_fold(
BatchResponseBuilder::new_with_limit(max_response_size as usize),
|batch_response, (req, call)| async move {
let response = execute_call(req, call).await;
batch_response.append(response.as_inner())
},
)
.await;

match batch_response {
Ok(batch) => batch.finish(),
Err(batch_err) => batch_err,
if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut got_notif = false;
let mut batch_response = BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);

let mut pending_calls: FuturesOrdered<_> = batch
.into_iter()
.filter_map(|v| {
if let Ok(req) = serde_json::from_str::<Request>(v.get()) {
Some(Either::Right(async { execute_call(req, call.clone()).await.into_inner() }))
} else if let Ok(_notif) = serde_json::from_str::<Notification<&JsonRawValue>>(v.get()) {
// notifications should not be answered.
got_notif = true;
None
} else {
// valid JSON but could be not parsable as `InvalidRequest`
let id = match serde_json::from_str::<InvalidRequest>(v.get()) {
Ok(err) => err.id,
Err(_) => Id::Null,
};

Some(Either::Left(async {
MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
}))
}
})
.collect();

while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return Some(too_large);
}
}

if got_notif && batch_response.is_empty() {
None
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
};
Some(batch_response.finish())
}
} else {
Some(BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError)))
}

let (id, code) = prepare_error(&data);
BatchResponse::error(id, ErrorObject::from(code))
}

pub(crate) async fn process_single_request<L: Logger>(data: Vec<u8>, call: CallData<'_, L>) -> MethodResult {
Expand Down Expand Up @@ -416,9 +430,11 @@ pub(crate) async fn background_task<L: Logger>(
})
.await;

tx_log_from_str(&response.result, max_log_length);
logger.on_response(&response.result, request_start, TransportProtocol::WebSocket);
let _ = sink.send_raw(response.result);
if let Some(response) = response {
tx_log_from_str(&response.result, max_log_length);
logger.on_response(&response.result, request_start, TransportProtocol::WebSocket);
let _ = sink.send_raw(response.result);
}
};

method_executors.add(Box::pin(fut));
Expand Down
Loading

3 comments on commit e649f38

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: e649f38 Previous: 990c120 Ratio
sync/http_batch_requests/fast_call/5 189864 ns/iter (± 62728) 93775 ns/iter (± 1986) 2.02
sync/http_batch_requests/fast_call/10 209835 ns/iter (± 27088) 104752 ns/iter (± 1942) 2.00
sync/ws_round_trip/fast_call 125729 ns/iter (± 19276) 62272 ns/iter (± 3422) 2.02
sync/ws_batch_requests/fast_call/2 143240 ns/iter (± 33698) 69257 ns/iter (± 975) 2.07
sync/ws_batch_requests/fast_call/5 159469 ns/iter (± 23985) 79589 ns/iter (± 1577) 2.00
async/ws_round_trip/fast_call 131514 ns/iter (± 27889) 62359 ns/iter (± 1344) 2.11
async/ws_batch_requests/fast_call/2 142929 ns/iter (± 17205) 69975 ns/iter (± 1152) 2.04
async/ws_batch_requests/fast_call/5 160161 ns/iter (± 22977) 79819 ns/iter (± 1653) 2.01
subscriptions/subscribe 185176 ns/iter (± 28250) 85880 ns/iter (± 1894) 2.16

This comment was automatically generated by workflow using github-action-benchmark.

CC: @niklasad1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: e649f38 Previous: 990c120 Ratio
sync/ws_concurrent_conn_calls/fast_call/8 1692296 ns/iter (± 26815) 757773 ns/iter (± 55593) 2.23
sync/ws_concurrent_conn_calls/fast_call/32 5532557 ns/iter (± 106633) 2682021 ns/iter (± 70905) 2.06
sync/ws_concurrent_conn_calls/fast_call/64 10508181 ns/iter (± 225813) 5167166 ns/iter (± 93270) 2.03
sync/ws_concurrent_conn_subs/1024 158770692 ns/iter (± 818524) 78120255 ns/iter (± 858715) 2.03
async/ws_concurrent_conn_calls/fast_call/32 5498146 ns/iter (± 79331) 2673624 ns/iter (± 75070) 2.06
async/ws_concurrent_conn_calls/fast_call/64 10440471 ns/iter (± 217372) 5141961 ns/iter (± 114534) 2.03
async/ws_concurrent_conn_subs/1024 158857767 ns/iter (± 904006) 78027613 ns/iter (± 975727) 2.04

This comment was automatically generated by workflow using github-action-benchmark.

CC: @niklasad1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: e649f38 Previous: 990c120 Ratio
sync/ws_concurrent_conn_calls/fast_call/8 1671923 ns/iter (± 32155) 757773 ns/iter (± 55593) 2.21
sync/ws_concurrent_conn_calls/fast_call/32 5501062 ns/iter (± 120067) 2682021 ns/iter (± 70905) 2.05
sync/ws_concurrent_conn_calls/fast_call/64 10482054 ns/iter (± 354570) 5167166 ns/iter (± 93270) 2.03
sync/ws_concurrent_conn_subs/1024 158882595 ns/iter (± 1037824) 78120255 ns/iter (± 858715) 2.03
async/ws_concurrent_conn_calls/fast_call/32 5499119 ns/iter (± 69507) 2673624 ns/iter (± 75070) 2.06
async/ws_concurrent_conn_calls/fast_call/64 10465082 ns/iter (± 263403) 5141961 ns/iter (± 114534) 2.04
async/ws_concurrent_conn_subs/1024 158707960 ns/iter (± 920591) 78027613 ns/iter (± 975727) 2.03

This comment was automatically generated by workflow using github-action-benchmark.

CC: @niklasad1

Please sign in to comment.