From d637b8dc21b733e3abe04a73c5eebc927a6ec42e Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 5 Nov 2020 12:56:09 +0100 Subject: [PATCH 01/26] experimental --- benches/benches.rs | 2 +- examples/http.rs | 4 +- examples/ws.rs | 8 +- src/client/http/client.rs | 169 +++++------------ src/client/http/mod.rs | 2 - src/client/http/raw.rs | 339 ----------------------------------- src/client/http/transport.rs | 216 +++++----------------- src/client/mod.rs | 2 +- src/types/client.rs | 8 +- 9 files changed, 114 insertions(+), 636 deletions(-) delete mode 100644 src/client/http/raw.rs diff --git a/benches/benches.rs b/benches/benches.rs index 165b54cf9e..64e4899b53 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -35,7 +35,7 @@ pub fn http(c: &mut criterion::Criterion) { let (tx_addr, rx_addr) = oneshot::channel::(); async_std::task::spawn(http_server(tx_addr)); let server_addr = block_on(rx_addr).unwrap(); - let client = HttpClient::new(&format!("http://{}", server_addr)); + let mut client = HttpClient::new(&format!("http://{}", server_addr)); b.iter(|| { block_on(async { diff --git a/examples/http.rs b/examples/http.rs index 31dd161592..3b8528e59e 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -33,7 +33,7 @@ use jsonrpsee::types::jsonrpc::{JsonValue, Params}; const SOCK_ADDR: &str = "127.0.0.1:9933"; const SERVER_URI: &str = "http://localhost:9933"; -#[async_std::main] +#[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { server_started_rx.await?; - let client = HttpClient::new(SERVER_URI); + let mut client = HttpClient::new(SERVER_URI); let response: Result = client.request("say_hello", Params::None).await; println!("r: {:?}", response); diff --git a/examples/ws.rs b/examples/ws.rs index e4f66c3a43..696e3ce803 100644 --- a/examples/ws.rs +++ b/examples/ws.rs @@ -43,9 +43,11 @@ async fn main() -> Result<(), Box> { }); server_started_rx.await?; - let client = WsClient::new(SERVER_URI).await?; - let response: JsonValue = client.request("say_hello", Params::None).await?; - println!("r: {:?}", response); + let c1 = WsClient::new(SERVER_URI).await?; + + for _ in 0..10 { + let _: Result = c1.request("say_hello", Params::None).await; + } Ok(()) } diff --git a/src/client/http/client.rs b/src/client/http/client.rs index 255ed92eb3..bd78fec75d 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -1,54 +1,23 @@ -use std::collections::HashMap; -use std::io; - -use crate::client::http::raw::*; use crate::client::http::transport::HttpTransportClient; use crate::types::client::Error; use crate::types::jsonrpc::{self, JsonValue}; -use futures::{channel::mpsc, channel::oneshot, future::Either, pin_mut, prelude::*}; +use std::sync::atomic::{AtomicU64, Ordering}; /// Client that wraps a `RawClient` where the `RawClient` is spawned in a background worker tasks. /// /// The communication is performed via a `mpsc` channel where the `Client` acts as simple frontend /// and just passes requests along to the backend (worker thread) -#[derive(Clone)] pub struct Client { - backend: mpsc::Sender, -} - -/// Message that the [`Client`] can send to the background task. -enum FrontToBack { - /// Send a one-shot notification to the server. The server doesn't give back any feedback. - Notification { - /// Method for the notification. - method: String, - /// Parameters to send to the server. - params: jsonrpc::Params, - }, - - /// Send a request to the server. - StartRequest { - /// Method for the request. - method: String, - /// Parameters of the request. - params: jsonrpc::Params, - /// One-shot channel where to send back the outcome of that request. - send_back: oneshot::Sender>, - }, + transport: HttpTransportClient, + request_id: AtomicU64, } impl Client { /// Create a client to connect to the server at address `endpoint` pub fn new(endpoint: &str) -> Self { - let client = RawClient::new(HttpTransportClient::new(endpoint)); - - let (to_back, from_front) = mpsc::channel(16); - async_std::task::spawn(async move { - background_task(client, from_front).await; - }); - - Self { backend: to_back } + let transport = HttpTransportClient::new(endpoint); + Self { transport, request_id: AtomicU64::new(0) } } /// Send a notification to the server. @@ -57,100 +26,60 @@ impl Client { method: impl Into, params: impl Into, ) -> Result<(), Error> { - let method = method.into(); - let params = params.into(); - log::trace!("[frontend]: client send notification: method={:?}, params={:?}", method, params); - self.backend.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::InternalChannel) + let request = jsonrpc::Request::Single(jsonrpc::Call::Notification(jsonrpc::Notification { + jsonrpc: jsonrpc::Version::V2, + method: method.into(), + params: params.into(), + })); + + self.transport.send_notification(request).await.map_err(|e| Error::TransportError(Box::new(e))) } /// Perform a request towards the server. - pub async fn request( + pub async fn request( &self, method: impl Into, params: impl Into, - ) -> Result - where - Ret: jsonrpc::DeserializeOwned, - { - let method = method.into(); - let params = params.into(); - log::trace!("[frontend]: send request: method={:?}, params={:?}", method, params); - let (send_back_tx, send_back_rx) = oneshot::channel(); - - // TODO: send a `ChannelClosed` message if we close the channel unexpectedly - - self.backend.clone().send(FrontToBack::StartRequest { method, params, send_back: send_back_tx }).await?; - let json_value = match send_back_rx.await { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => { - let err = io::Error::new(io::ErrorKind::Other, "background task closed"); - return Err(Error::TransportError(Box::new(err))); + ) -> Result { + let id = self.request_id.fetch_add(1, Ordering::SeqCst); + let request = jsonrpc::Request::Single(jsonrpc::Call::MethodCall(jsonrpc::MethodCall { + jsonrpc: jsonrpc::Version::V2, + method: method.into(), + params: params.into(), + id: jsonrpc::Id::Num(id), + })); + + let response = self + .transport + .send_request_and_wait_for_response(request) + .await + .map_err(|e| Error::TransportError(Box::new(e)))?; + + match response { + jsonrpc::Response::Single(rp) => Self::process_response(rp, id), + jsonrpc::Response::Batch(_rps) => { + todo!("batch request not supported"); + // for rp in rps { + // // TODO: if an error happens, we throw away the entire batch + // self.process_response(rp)?; + // } } - }; - jsonrpc::from_value(json_value).map_err(Error::ParseError) - } -} - -/// Function being run in the background that processes messages from the frontend. -async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver) { - // List of requests that the server must answer. - let mut ongoing_requests: HashMap>> = HashMap::new(); - - loop { - // We need to do a little transformation in order to destroy the borrow to `client` - // and `from_front`. - let outcome = { - let next_message = from_front.next(); - let next_event = client.next_event(); - pin_mut!(next_message); - pin_mut!(next_event); - match future::select(next_message, next_event).await { - Either::Left((v, _)) => Either::Left(v), - Either::Right((v, _)) => Either::Right(v), - } - }; - - match outcome { - // If the channel is closed, then the `Client` has been destroyed and we - // stop this task. - Either::Left(None) => { - log::trace!("[backend]: background task terminated"); - if !ongoing_requests.is_empty() { - log::warn!("client was dropped with {} pending requests", ongoing_requests.len()); - } - return; - } - - // User called `notification` on the front-end. - Either::Left(Some(FrontToBack::Notification { method, params })) => { - log::trace!("[backend]: send notification"); - let _ = client.send_notification(method, params).await; - } - - // User called `request` on the front-end. - Either::Left(Some(FrontToBack::StartRequest { method, params, send_back })) => { - match client.start_request(&method, params).await { - Ok(id) => { - log::trace!("[backend]; send request: {:?} id: {:?}", method, id); - ongoing_requests.insert(id, send_back); - } - Err(err) => { - let _ = send_back.send(Err(Error::TransportError(Box::new(err)))); - } - } - } - - // Received a response to a request from the server. - Either::Right(Ok(RawClientEvent::Response { request_id, result })) => { - log::trace!("[backend] received response to req={:?}, result={:?}", request_id, result); - let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request)); + // Server MUST NOT reply to a Notification. + jsonrpc::Response::Notif(_notif) => { + Err(Error::Custom(format!("Server replied with notification response to request ID: {}", id))) } + } + } - Either::Right(Err(e)) => { - // TODO: https://github.com/paritytech/jsonrpsee/issues/67 - log::error!("Client Error: {:?}", e); + fn process_response(response: jsonrpc::Output, expected_id: u64) -> Result { + match response.id() { + jsonrpc::Id::Num(n) if n == &expected_id => { + let ret: Result = response.into(); + ret.map_err(|e| Error::Request(e)) } + jsonrpc::Id::Num(n) => Err(Error::InvalidRequestId(expected_id.into(), (*n).into())), + jsonrpc::Id::Str(s) => Err(Error::InvalidRequestId(expected_id.into(), s.to_string().into())), + jsonrpc::Id::Null => Err(Error::InvalidRequestId(expected_id.into(), JsonValue::Null)), } } } diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index 44aa5cac37..acc3367dd8 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -1,7 +1,5 @@ mod client; -mod raw; mod transport; pub use client::Client; -pub use raw::RawClient; pub use transport::HttpTransportClient; diff --git a/src/client/http/raw.rs b/src/client/http/raw.rs deleted file mode 100644 index e2086f2449..0000000000 --- a/src/client/http/raw.rs +++ /dev/null @@ -1,339 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Performing JSON-RPC requests. -//! -//! The [`RawClient`] struct wraps around a [`TransportClient`] and handles the higher-level JSON-RPC logic -//! on top of it. In order to build a [`RawClient`], you need to pass to it an implementation of -//! [`TransportClient`]. -//! -//! Once created, a [`RawClient`] can be used to send out notifications, requests, and subscription -//! requests to the server. Request identifiers are automatically assigned by the client. -//! -//! # Notifications -//! -//! **Notifications** are one-shot messages to the server that don't expect any response. They can -//! be sent using the [`send_notification`](RawClient::send_notification) method. -//! -//! # Requests -//! -//! **Requests** are messages that expect an answer. A request can be sent using the -//! [`start_request`](RawClient::start_request) method. This method returns a [`RawClientRequestId`] that -//! is used to identify this request within the internals of the [`RawClient`]. You can then call -//! [`request_by_id`](RawClient::request_by_id) to wait for a response from a server about a specific -//! request. You are however encouraged to use [`next_event`](RawClient::next_event) instead, which -//! produces a [`RawClientEvent`] indicating you what the server did. -//! -//! > **Note**: At the time of writing, the [`RawClient`] never uses batches and only sends out -//! > individual requests. -//! -//! # Subscriptions -//! -//! **Subscriptions** are similar to requests, except that we stay connected to the server -//! after the request ended, and expect notifications back from it. The [`RawClient`] will notify -//! you about subscriptions through the [`next_event`](RawClient::next_event) method and the -//! [`RawClientEvent`] enum. -//! -//! > **Note**: The [`request_by_id`](RawClient::request_by_id) method will buffer up incoming -//! > notifications up to a certain limit. Once this limit is reached, new notifications -//! > will be silently discarded. This behaviour exists to prevent DoS attacks from -//! > the server. If you want to be certain to not miss any notification, please only -//! > use the [`next_event`](RawClient::next_event) method. -//! - -use crate::client::http::transport::{HttpTransportClient, RequestError}; -use crate::types::jsonrpc; - -use alloc::{collections::VecDeque, string::String}; -use core::{fmt, future::Future}; -use hashbrown::HashSet; - -/// Wraps around a [`TransportClient`](crate::transport::TransportClient) and analyzes everything -/// correctly. -/// -/// See [the module root documentation](crate::client) for more information. -pub struct RawClient { - /// Inner raw client. - inner: HttpTransportClient, - - /// Id to assign to the next request. We always assign linearly-increasing numeric keys. - next_request_id: RawClientRequestId, - - /// List of requests and subscription requests that have been sent out and that are waiting - /// for a response. - /// - // NOTE: `fnv - fowler-Noll-Vo hash function`, more efficient for smaller hash keys. - requests: HashSet, - - /// Queue of pending events to return from [`RawClient::next_event`]. - // TODO: call shrink_to from time to time; see https://github.com/rust-lang/rust/issues/56431 - events_queue: VecDeque, - - /// Maximum allowed size of [`RawClient::events_queue`]. - /// - /// If this size is reached, elements can still be pushed to the queue if they are critical, - /// but will be discarded if they are not. - // TODO: make this configurable? note: if this is configurable, it should always be >= 1 - events_queue_max_size: usize, -} - -/// Unique identifier of a request within a [`RawClient`]. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct RawClientRequestId(u64); - -/// Event returned by [`RawClient::next_event`]. -#[derive(Debug)] -pub enum RawClientEvent { - /// A request has received a response. - Response { - /// Identifier of the request. Can be matched with the value that [`RawClient::start_request`] - /// has returned. - request_id: RawClientRequestId, - /// The response itself. - result: Result, - }, -} - -/// Error that can happen during a request. -#[derive(Debug)] -pub enum RawClientError { - /// Error in the raw client. - Inner(RequestError), - /// RawServer returned an error for our request. - RequestError(jsonrpc::Error), - /// RawServer has sent back a response containing an unknown request ID. - UnknownRequestId, - /// RawServer has sent back a response containing a null request ID. - NullRequestId, -} - -impl RawClient { - /// Initializes a new `RawClient` using the given raw client as backend. - pub fn new(inner: HttpTransportClient) -> Self { - RawClient { - inner, - next_request_id: RawClientRequestId(0), - requests: HashSet::default(), - events_queue: VecDeque::with_capacity(16), - events_queue_max_size: 64, - } - } -} - -impl RawClient { - /// Sends a notification to the server. The notification doesn't need any response. - /// - /// This asynchronous function finishes when the notification has finished being sent. - pub async fn send_notification( - &mut self, - method: impl Into, - params: impl Into, - ) -> Result<(), RequestError> { - let request = jsonrpc::Request::Single(jsonrpc::Call::Notification(jsonrpc::Notification { - jsonrpc: jsonrpc::Version::V2, - method: method.into(), - params: params.into(), - })); - - self.inner.send_request(request).await?; - Ok(()) - } - - /// Starts a request. - /// - /// This asynchronous function finishes when the request has been sent to the server. The - /// request is added to the [`RawClient`]. You must then call [`next_event`](RawClient::next_event) - /// until you get a response. - pub async fn start_request( - &mut self, - method: impl Into, - params: impl Into, - ) -> Result { - loop { - let id = self.next_request_id; - self.next_request_id.0 = self.next_request_id.0.wrapping_add(1); - - if self.requests.contains(&id) { - continue; - } else { - self.requests.insert(id); - } - - let request = jsonrpc::Request::Single(jsonrpc::Call::MethodCall(jsonrpc::MethodCall { - jsonrpc: jsonrpc::Version::V2, - method: method.into(), - params: params.into(), - id: jsonrpc::Id::Num(id.0), - })); - - // Note that in case of an error, we "lose" the request id (as in, it will never be - // used). This isn't a problem, however. - self.inner.send_request(request).await?; - - break Ok(id); - } - } - - /// Waits until the client receives a message from the server. - /// - /// If this function returns an `Err`, it indicates a connectivity issue with the server or a - /// low-level protocol error, and not a request that has failed to be answered. - pub async fn next_event(&mut self) -> Result { - loop { - if let Some(event) = self.events_queue.pop_front() { - return Ok(event); - } - - self.event_step().await?; - } - } - - /// Returns a `Future` that resolves when the server sends back a response for the given - /// request. - /// - /// Returns `None` if the request identifier is invalid, or if the request is a subscription. - /// - /// > **Note**: While this function is waiting, all the other responses and pubsub events - /// > returned by the server will be buffered up to a certain limit. Once this - /// > limit is reached, server notifications will be discarded. If you want to be - /// > sure to catch all notifications, use [`next_event`](RawClient::next_event) - /// > instead. - pub fn request_by_id<'a>( - &'a mut self, - rq_id: RawClientRequestId, - ) -> Option> + 'a> { - // First, let's check whether the request ID is valid. - if !self.requests.contains(&rq_id) { - return None; - } - - Some(async move { - let mut events_queue_loopkup = 0; - - loop { - while events_queue_loopkup < self.events_queue.len() { - match &self.events_queue[events_queue_loopkup] { - RawClientEvent::Response { request_id, .. } if *request_id == rq_id => { - return match self.events_queue.remove(events_queue_loopkup) { - Some(RawClientEvent::Response { result, .. }) => { - result.map_err(RawClientError::RequestError) - } - _ => unreachable!(), - } - } - _ => {} - } - - events_queue_loopkup += 1; - } - - self.event_step().await?; - } - }) - } - - /// Waits for one server message and processes it by updating the state of `self`. - /// - /// If the events queue is full (see [`RawClient::events_queue_max_size`]), then responses to - /// requests will still be pushed to the queue, but notifications will be discarded. - /// - /// Check the content of [`events_queue`](RawClient::events_queue) afterwards for events to - /// dispatch to the user. - async fn event_step(&mut self) -> Result<(), RawClientError> { - let result = self.inner.next_response().await.map_err(RawClientError::Inner)?; - - match result { - jsonrpc::Response::Single(rp) => self.process_response(rp)?, - jsonrpc::Response::Batch(rps) => { - for rp in rps { - // TODO: if an error happens, we throw away the entire batch - self.process_response(rp)?; - } - } - // Server MUST NOT reply to a Notification. - jsonrpc::Response::Notif(_notif) => unreachable!(), - } - - Ok(()) - } - - /// Processes the response obtained from the server. Updates the internal state of `self` to - /// account for it. - /// - /// Regards all `response IDs` that is not a number as error because only numbers are used as - /// `id` in this library even though that `JSONRPC 2.0` allows String and Null as valid IDs. - fn process_response(&mut self, response: jsonrpc::Output) -> Result<(), RawClientError> { - let request_id = match response.id() { - jsonrpc::Id::Num(n) => RawClientRequestId(*n), - jsonrpc::Id::Str(s) => { - log::warn!("Server responded with an invalid request id: {:?}", s); - return Err(RawClientError::UnknownRequestId); - } - jsonrpc::Id::Null => { - log::warn!("Server responded with a null request id"); - return Err(RawClientError::NullRequestId); - } - }; - - // Find the request that this answered. - if self.requests.remove(&request_id) { - self.events_queue.push_back(RawClientEvent::Response { result: response.into(), request_id }); - } else { - log::warn!("Server responsed with an invalid request id: {:?}", request_id); - return Err(RawClientError::UnknownRequestId); - } - - Ok(()) - } -} - -impl fmt::Debug for RawClient { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("RawClient").field("inner", &self.inner).field("pending_requests", &self.requests).finish() - } -} - -impl std::error::Error for RawClientError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - RawClientError::Inner(err) => Some(err), - RawClientError::RequestError(ref err) => Some(err), - RawClientError::UnknownRequestId => None, - RawClientError::NullRequestId => None, - } - } -} - -impl fmt::Display for RawClientError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - RawClientError::Inner(err) => write!(f, "Error in the raw client: {}", err), - RawClientError::RequestError(ref err) => write!(f, "Server returned error: {}", err), - RawClientError::UnknownRequestId => write!(f, "Server responded with an unknown request ID"), - RawClientError::NullRequestId => write!(f, "Server responded with a null request ID"), - } - } -} diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 5709cc7e21..76dd01149a 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -8,133 +8,73 @@ // In order to perform a request, we send this request to the background thread through a channel // and wait for an answer to come back. // -// Addtionally, despite the fact that hyper is capable of performing requests to multiple different +// Additionally, despite the fact that hyper is capable of performing requests to multiple different // servers through the same `hyper::Client`, we don't use that feature on purpose. The reason is // that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset // the JSON-RPC request id to a value that might have already been used. -use std::pin::Pin; -use std::{fmt, io, thread}; - use crate::types::jsonrpc; - -use futures::{channel::mpsc, prelude::*}; +use std::fmt; use thiserror::Error; -const REQUEST_PARALLELISM: usize = 4; - /// Implementation of a raw client for HTTP requests. pub struct HttpTransportClient { - /// Sender that sends requests to the background task. - requests_tx: mpsc::Sender, - /// URL of the server to connect to. - url: String, - /// Responses receiver. - responses_rx: mpsc::UnboundedReceiver, hyper::Error>>, - /// Responses transmitter - responses_tx: mpsc::UnboundedSender, hyper::Error>>, -} - -/// Message transmitted from the foreground task to the background. -struct FrontToBack { - /// Request that the background task should perform. - request: hyper::Request, - /// Channel to send back to the response. - send_back: mpsc::UnboundedSender, hyper::Error>>, + /// URI to connect to. + uri: String, + /// Hyper to client, + client: hyper::Client, } impl HttpTransportClient { /// Initializes a new HTTP client. // TODO: better type for target - pub fn new(target: &str) -> Self { - let (requests_tx, requests_rx) = mpsc::channel::(REQUEST_PARALLELISM); - - // Because hyper can only be polled through tokio, we spawn it in a background thread. - thread::Builder::new() - .name("jsonrpsee-hyper-client".to_string()) - .spawn(move || { - let client = hyper::Client::new(); - background_thread(requests_rx, move |rq| { - // cloning Hyper client = cloning references - let client = client.clone(); - async move { - let _ = rq.send_back.unbounded_send(client.request(rq.request).await); - } - }) - }) - .unwrap(); - - let (responses_tx, responses_rx) = futures::channel::mpsc::unbounded(); - HttpTransportClient { requests_tx, url: target.to_owned(), responses_tx, responses_rx } + pub fn new(uri: &str) -> Self { + HttpTransportClient { client: hyper::Client::new(), uri: uri.to_owned() } } - /// Send request to the target - pub fn send_request<'s>( - &'s mut self, - request: jsonrpc::Request, - ) -> Pin> + Send + 's>> { - log::debug!("send: {}", jsonrpc::to_string(&request).expect("request valid JSON; qed")); - let mut requests_tx = self.requests_tx.clone(); - - let request = jsonrpc::to_vec(&request).map(|body| { - hyper::Request::post(&self.url) - .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json")) - .body(From::from(body)) - .expect("Uri and request headers are valid; qed") // TODO: not necessarily true for URL here - }); - - Box::pin(async move { - let message = FrontToBack { - request: request.map_err(RequestError::Serialization)?, - send_back: self.responses_tx.clone(), - }; - - if requests_tx.send(message).await.is_err() { - log::error!("JSONRPC http client background thread has shut down"); - return Err(RequestError::Http(Box::new(io::Error::new( - io::ErrorKind::Other, - "background thread is down".to_string(), - )))); - } - - Ok(()) - }) + /// Send request. + pub async fn send_request(&self, request: jsonrpc::Request) -> Result, RequestError> { + let body = jsonrpc::to_vec(&request).map_err(|e| RequestError::Serialization(e))?; + + let req = hyper::Request::post(&self.uri) + .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json")) + .body(From::from(body)) + .expect("Uri and request headers are valid; qed"); + + let response = match self.client.request(req).await { + Ok(r) => r, + Err(err) => return Err(RequestError::Http(Box::new(err))), + }; + + if !response.status().is_success() { + return Err(RequestError::RequestFailure { status_code: response.status().into() }); + } + Ok(response) } - /// Waits for the next response. - pub fn next_response<'s>( - &'s mut self, - ) -> Pin> + Send + 's>> { - Box::pin(async move { - let hyper_response = match self.responses_rx.next().await { - Some(Ok(r)) => r, - Some(Err(err)) => return Err(RequestError::Http(Box::new(err))), - None => { - log::error!("JSONRPC http client background thread has shut down"); - return Err(RequestError::Http(Box::new(io::Error::new( - io::ErrorKind::Other, - "background thread is down".to_string(), - )))); - } - }; - - if !hyper_response.status().is_success() { - return Err(RequestError::RequestFailure { status_code: hyper_response.status().into() }); - } - - // Note that we don't check the Content-Type of the request. This is deemed - // unnecessary, as a parsing error while happen anyway. - - // TODO: enforce a maximum size here - let body = hyper::body::to_bytes(hyper_response.into_body()) - .await - .map_err(|err| RequestError::Http(Box::new(err)))?; - - // TODO: use Response::from_json - let as_json: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(RequestError::ParseError)?; - log::debug!("recv: {}", jsonrpc::to_string(&as_json).expect("request valid JSON; qed")); - Ok(as_json) - }) + /// Send notification. + pub async fn send_notification(&self, request: jsonrpc::Request) -> Result<(), RequestError> { + let _response = self.send_request(request).await?; + Ok(()) + } + + /// Send request and wait for response. + pub async fn send_request_and_wait_for_response( + &self, + request: jsonrpc::Request, + ) -> Result { + let response = self.send_request(request).await?; + + // TODO: enforce a maximum size here + let body = + hyper::body::to_bytes(response.into_body()).await.map_err(|err| RequestError::Http(Box::new(err)))?; + + // Note that we don't check the Content-Type of the request. This is deemed + // unnecessary, as a parsing error while happen anyway. + // TODO: use Response::from_json + let as_json: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(RequestError::ParseError)?; + log::debug!("recv: {}", jsonrpc::to_string(&as_json).expect("request valid JSON; qed")); + Ok(as_json) } } @@ -172,66 +112,8 @@ pub enum RequestError { ParseError(#[source] serde_json::error::Error), } -/// Function that runs in a background thread. -fn background_thread>( - requests_rx: mpsc::Receiver, - process_request: impl Fn(T) -> ProcessRequest, -) { - let mut runtime = match tokio::runtime::Builder::new().basic_scheduler().enable_all().build() { - Ok(r) => r, - Err(err) => { - // Ideally, we would try to initialize the tokio runtime in the main thread then move - // it here. That however isn't possible. If we fail to initialize the runtime, the only - // thing we can do is print an error and shut down the background thread. - // Initialization failures should be almost non-existant anyway, so this isn't a big - // deal. - log::error!("Failed to initialize tokio runtime: {:?}", err); - return; - } - }; - - // Running until the channel has been closed, and all requests have been completed. - runtime.block_on(requests_rx.for_each_concurrent(Some(REQUEST_PARALLELISM), process_request)); -} - #[cfg(test)] mod tests { use super::*; use futures::channel::oneshot; - - #[test] - fn background_thread_is_able_to_complete_requests() { - // start background thread that returns square(passed_value) after signal - // from 'main' thread is received - let (mut requests_tx, requests_rx) = mpsc::channel(4); - let background_thread = thread::spawn(move || { - background_thread( - requests_rx, - move |(send_when, send_back, value): (oneshot::Receiver<()>, oneshot::Sender, u32)| async move { - send_when.await.unwrap(); - send_back.send(value * value).unwrap(); - }, - ) - }); - - // send two requests - there'll be two simultaneous active requests, waiting for - // main thread' signals - let mut pool = futures::executor::LocalPool::new(); - let (send_when_tx1, send_when_rx1) = oneshot::channel(); - let (send_when_tx2, send_when_rx2) = oneshot::channel(); - let (send_back_tx1, send_back_rx1) = oneshot::channel(); - let (send_back_tx2, send_back_rx2) = oneshot::channel(); - pool.run_until(requests_tx.send((send_when_rx1, send_back_tx1, 32))).unwrap(); - pool.run_until(requests_tx.send((send_when_rx2, send_back_tx2, 1024))).unwrap(); - - // send both signals and wait for responses - send_when_tx1.send(()).unwrap(); - send_when_tx2.send(()).unwrap(); - assert_eq!(pool.run_until(send_back_rx1), Ok(32 * 32)); - assert_eq!(pool.run_until(send_back_rx2), Ok(1024 * 1024)); - - // drop requests sender, asking background thread to exit gently - drop(requests_tx); - background_thread.join().unwrap(); - } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 8d6a2c95b8..08950b1595 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,6 +7,6 @@ mod ws; // Unless we want the user to have to possibility to not spawn a background thread to // handle responses. #[cfg(feature = "http")] -pub use http::{Client as HttpClient, HttpTransportClient, RawClient as HttpRawClient}; +pub use http::{Client as HttpClient, HttpTransportClient}; #[cfg(feature = "ws")] pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient}; diff --git a/src/types/client.rs b/src/types/client.rs index 4cb5c30cd9..02f27273cf 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -1,4 +1,4 @@ -use crate::types::jsonrpc; +use crate::types::jsonrpc::{self, JsonValue}; /// Error produced by the client. #[derive(Debug, thiserror::Error)] @@ -19,4 +19,10 @@ pub enum Error { /// Failed to parse the data that the server sent back to us. #[error("Parse error: {0}")] ParseError(#[source] jsonrpc::ParseError), + #[error("Invalid ID in response; expected: {0}, got: {1}")] + /// Invalid id in response to a request. + InvalidRequestId(JsonValue, JsonValue), + #[error("Custom error: {0}")] + /// Custom error. + Custom(String), } From 3581ccee51fdd984edb5553d3860638b16fbfb0e Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 6 Nov 2020 13:24:39 +0100 Subject: [PATCH 02/26] ci(benches): sync and concurrent roundtrips Improve benchmarks to take concurrent requests into account. --- benches/benches.rs | 81 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 165b54cf9e..aafd44e754 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -6,8 +6,9 @@ use jsonrpsee::http::HttpServer; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; use std::net::SocketAddr; +use std::sync::Arc; -criterion_group!(benches, http, ws); +criterion_group!(benches, http_requests, websocket_requests); criterion_main!(benches); async fn http_server(tx: Sender) { @@ -30,36 +31,74 @@ async fn ws_server(tx: Sender) { } } -pub fn http(c: &mut criterion::Criterion) { - c.bench_function("http 100 requests", |b| { - let (tx_addr, rx_addr) = oneshot::channel::(); - async_std::task::spawn(http_server(tx_addr)); - let server_addr = block_on(rx_addr).unwrap(); - let client = HttpClient::new(&format!("http://{}", server_addr)); +pub fn http_requests(c: &mut criterion::Criterion) { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let (tx_addr, rx_addr) = oneshot::channel::(); + async_std::task::spawn(http_server(tx_addr)); + let server_addr = block_on(rx_addr).unwrap(); + let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr))); + c.bench_function("synchronous http round trip", |b| { b.iter(|| { - block_on(async { - for _ in 0..100 { - let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); - } + rt.block_on(async { + let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); }) }) }); + + c.bench_function_over_inputs( + "concurrent http round trip", + move |b: &mut Bencher, size: &usize| { + b.iter(|| { + let mut tasks = Vec::with_capacity(size * 10); + for _ in 0..*size { + let client_rc = client.clone(); + let task = rt.spawn(async move { + let _: Result = black_box(client_rc.request("say_hello", Params::None)).await; + }); + tasks.push(task); + } + for task in tasks { + rt.block_on(task).unwrap(); + } + }) + }, + vec![2, 4, 16, 32, 64, 128], + ); } -pub fn ws(c: &mut criterion::Criterion) { - c.bench_function("ws 100 request", |b| { - let (tx_addr, rx_addr) = oneshot::channel::(); - async_std::task::spawn(ws_server(tx_addr)); - let server_addr = block_on(rx_addr).unwrap(); - let client = block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap(); +pub fn websocket_requests(c: &mut criterion::Criterion) { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let (tx_addr, rx_addr) = oneshot::channel::(); + async_std::task::spawn(ws_server(tx_addr)); + let server_addr = block_on(rx_addr).unwrap(); + let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap()); + c.bench_function("synchronous WebSocket round trip", |b| { b.iter(|| { - block_on(async { - for _ in 0..100 { - let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); - } + rt.block_on(async { + let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); }) }) }); + + c.bench_function_over_inputs( + "concurrent WebSocket round trip", + move |b: &mut Bencher, size: &usize| { + b.iter(|| { + let mut tasks = Vec::with_capacity(size * 10); + for _ in 0..*size { + let client_rc = client.clone(); + let task = rt.spawn(async move { + let _: Result = black_box(client_rc.request("say_hello", Params::None)).await; + }); + tasks.push(task); + } + for task in tasks { + rt.block_on(task).unwrap(); + } + }) + }, + vec![2, 4, 16, 32, 64, 128], + ); } From b278010d6220ec455a19b9eab832f880352946d2 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 6 Nov 2020 13:24:39 +0100 Subject: [PATCH 03/26] ci(benches): sync and concurrent roundtrips Improve benchmarks to take concurrent requests into account. --- benches/benches.rs | 81 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 64e4899b53..6fe384512c 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -6,8 +6,9 @@ use jsonrpsee::http::HttpServer; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; use std::net::SocketAddr; +use std::sync::Arc; -criterion_group!(benches, http, ws); +criterion_group!(benches, http_requests, websocket_requests); criterion_main!(benches); async fn http_server(tx: Sender) { @@ -30,36 +31,74 @@ async fn ws_server(tx: Sender) { } } -pub fn http(c: &mut criterion::Criterion) { - c.bench_function("http 100 requests", |b| { - let (tx_addr, rx_addr) = oneshot::channel::(); - async_std::task::spawn(http_server(tx_addr)); - let server_addr = block_on(rx_addr).unwrap(); - let mut client = HttpClient::new(&format!("http://{}", server_addr)); +pub fn http_requests(c: &mut criterion::Criterion) { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let (tx_addr, rx_addr) = oneshot::channel::(); + async_std::task::spawn(http_server(tx_addr)); + let server_addr = block_on(rx_addr).unwrap(); + let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr))); + c.bench_function("synchronous http round trip", |b| { b.iter(|| { - block_on(async { - for _ in 0..100 { - let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); - } + rt.block_on(async { + let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); }) }) }); + + c.bench_function_over_inputs( + "concurrent http round trip", + move |b: &mut Bencher, size: &usize| { + b.iter(|| { + let mut tasks = Vec::with_capacity(size * 10); + for _ in 0..*size { + let client_rc = client.clone(); + let task = rt.spawn(async move { + let _: Result = black_box(client_rc.request("say_hello", Params::None)).await; + }); + tasks.push(task); + } + for task in tasks { + rt.block_on(task).unwrap(); + } + }) + }, + vec![2, 4, 8, 16, 32, 64, 128], + ); } -pub fn ws(c: &mut criterion::Criterion) { - c.bench_function("ws 100 request", |b| { - let (tx_addr, rx_addr) = oneshot::channel::(); - async_std::task::spawn(ws_server(tx_addr)); - let server_addr = block_on(rx_addr).unwrap(); - let client = block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap(); +pub fn websocket_requests(c: &mut criterion::Criterion) { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + let (tx_addr, rx_addr) = oneshot::channel::(); + async_std::task::spawn(ws_server(tx_addr)); + let server_addr = block_on(rx_addr).unwrap(); + let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap()); + c.bench_function("synchronous WebSocket round trip", |b| { b.iter(|| { - block_on(async { - for _ in 0..100 { - let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); - } + rt.block_on(async { + let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap()); }) }) }); + + c.bench_function_over_inputs( + "concurrent WebSocket round trip", + move |b: &mut Bencher, size: &usize| { + b.iter(|| { + let mut tasks = Vec::with_capacity(size * 10); + for _ in 0..*size { + let client_rc = client.clone(); + let task = rt.spawn(async move { + let _: Result = black_box(client_rc.request("say_hello", Params::None)).await; + }); + tasks.push(task); + } + for task in tasks { + rt.block_on(task).unwrap(); + } + }) + }, + vec![2, 4, 8, 16, 32, 64, 128], + ); } From 10cfe8c7dff49aa7ff6ec7b1b8eab0bd500e40c2 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 6 Nov 2020 18:17:39 +0100 Subject: [PATCH 04/26] fix(nits) --- benches/benches.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 6fe384512c..891c2cbd44 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -50,7 +50,7 @@ pub fn http_requests(c: &mut criterion::Criterion) { "concurrent http round trip", move |b: &mut Bencher, size: &usize| { b.iter(|| { - let mut tasks = Vec::with_capacity(size * 10); + let mut tasks = Vec::new(); for _ in 0..*size { let client_rc = client.clone(); let task = rt.spawn(async move { @@ -86,7 +86,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) { "concurrent WebSocket round trip", move |b: &mut Bencher, size: &usize| { b.iter(|| { - let mut tasks = Vec::with_capacity(size * 10); + let mut tasks = Vec::new(); for _ in 0..*size { let client_rc = client.clone(); let task = rt.spawn(async move { @@ -99,6 +99,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) { } }) }, - vec![2, 4, 8, 16, 32, 64, 128], + // TODO(niklasad1): investigate why it only works to 8 concurrent requests. + vec![2, 4, 8], ); } From cf12a2f0906dd7de8f80bd292b4185ff8087e334 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 6 Nov 2020 21:34:03 +0100 Subject: [PATCH 05/26] feat(http client): limit max request body size --- examples/http.rs | 2 +- src/client/http/client.rs | 30 +++++++++++++++------- src/client/http/mod.rs | 2 +- src/client/http/transport.rs | 49 ++++++++++++++++++------------------ src/client/mod.rs | 2 +- src/http/raw/tests.rs | 10 +++++--- 6 files changed, 56 insertions(+), 39 deletions(-) diff --git a/examples/http.rs b/examples/http.rs index 3b8528e59e..518c0cb1ee 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { server_started_rx.await?; - let mut client = HttpClient::new(SERVER_URI); + let client = HttpClient::new(SERVER_URI, Default::default()); let response: Result = client.request("say_hello", Params::None).await; println!("r: {:?}", response); diff --git a/src/client/http/client.rs b/src/client/http/client.rs index bd78fec75d..e6e9854517 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -1,22 +1,34 @@ use crate::client::http::transport::HttpTransportClient; use crate::types::client::Error; use crate::types::jsonrpc::{self, JsonValue}; - use std::sync::atomic::{AtomicU64, Ordering}; -/// Client that wraps a `RawClient` where the `RawClient` is spawned in a background worker tasks. -/// -/// The communication is performed via a `mpsc` channel where the `Client` acts as simple frontend -/// and just passes requests along to the backend (worker thread) -pub struct Client { +/// Default maximium request body size (10 MB). +const DEFAULT_MAX_BODY_SIZE: usize = 10 * 1024 * 1024; + +/// HTTP configuration. +#[derive(Copy, Clone)] +pub struct HttpConfig { + /// Maximum request body size in bytes. + pub max_request_body_size: usize, +} + +/// HTTP Client. +pub struct HttpClient { transport: HttpTransportClient, request_id: AtomicU64, } -impl Client { +impl Default for HttpConfig { + fn default() -> Self { + Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE } + } +} + +impl HttpClient { /// Create a client to connect to the server at address `endpoint` - pub fn new(endpoint: &str) -> Self { - let transport = HttpTransportClient::new(endpoint); + pub fn new(endpoint: &str, config: HttpConfig) -> Self { + let transport = HttpTransportClient::new(endpoint, config.max_request_body_size); Self { transport, request_id: AtomicU64::new(0) } } diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index acc3367dd8..cdb8c47e10 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -1,5 +1,5 @@ mod client; mod transport; -pub use client::Client; +pub use client::{HttpClient, HttpConfig}; pub use transport::HttpTransportClient; diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 76dd01149a..1d6ce03e09 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -1,41 +1,42 @@ // Implementation note: hyper's API is not adapted to async/await at all, and there's // unfortunately a lot of boilerplate here that could be removed once/if it gets reworked. // -// In particular, hyper can only be polled by tokio, but we don't want users to have to suffer -// from this restriction. We therefore spawn a background thread dedicated to running the tokio -// runtime. -// -// In order to perform a request, we send this request to the background thread through a channel -// and wait for an answer to come back. -// // Additionally, despite the fact that hyper is capable of performing requests to multiple different // servers through the same `hyper::Client`, we don't use that feature on purpose. The reason is // that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset // the JSON-RPC request id to a value that might have already been used. use crate::types::jsonrpc; -use std::fmt; +use futures::StreamExt; use thiserror::Error; /// Implementation of a raw client for HTTP requests. +#[derive(Debug, Clone)] pub struct HttpTransportClient { /// URI to connect to. uri: String, /// Hyper to client, client: hyper::Client, + /// Max request body size + max_request_body_size: usize, } impl HttpTransportClient { /// Initializes a new HTTP client. // TODO: better type for target - pub fn new(uri: &str) -> Self { - HttpTransportClient { client: hyper::Client::new(), uri: uri.to_owned() } + pub fn new(uri: &str, max_request_body_size: usize) -> Self { + HttpTransportClient { client: hyper::Client::new(), uri: uri.to_owned(), max_request_body_size } } /// Send request. pub async fn send_request(&self, request: jsonrpc::Request) -> Result, RequestError> { + log::debug!("send: {}", jsonrpc::to_string(&request).expect("request valid JSON; qed")); let body = jsonrpc::to_vec(&request).map_err(|e| RequestError::Serialization(e))?; + if body.len() > self.max_request_body_size { + return Err(RequestError::RequestTooLarge); + } + let req = hyper::Request::post(&self.uri) .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json")) .body(From::from(body)) @@ -49,6 +50,7 @@ impl HttpTransportClient { if !response.status().is_success() { return Err(RequestError::RequestFailure { status_code: response.status().into() }); } + Ok(response) } @@ -64,10 +66,17 @@ impl HttpTransportClient { request: jsonrpc::Request, ) -> Result { let response = self.send_request(request).await?; + let mut body_fut: hyper::Body = response.into_body(); - // TODO: enforce a maximum size here - let body = - hyper::body::to_bytes(response.into_body()).await.map_err(|err| RequestError::Http(Box::new(err)))?; + let mut body = Vec::new(); + + while let Some(chunk) = body_fut.next().await { + let chunk = chunk.map_err(|e| RequestError::Http(Box::new(e)))?; + if chunk.len() + body.len() > self.max_request_body_size { + return Err(RequestError::RequestTooLarge); + } + body.extend_from_slice(&chunk); + } // Note that we don't check the Content-Type of the request. This is deemed // unnecessary, as a parsing error while happen anyway. @@ -78,12 +87,6 @@ impl HttpTransportClient { } } -impl fmt::Debug for HttpTransportClient { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("HttpTransportClient").finish() - } -} - /// Error that can happen during a request. #[derive(Debug, Error)] pub enum RequestError { @@ -110,10 +113,8 @@ pub enum RequestError { /// Failed to parse the JSON returned by the server into a JSON-RPC response. #[error("error while parsing the response body")] ParseError(#[source] serde_json::error::Error), -} -#[cfg(test)] -mod tests { - use super::*; - use futures::channel::oneshot; + /// Request body too large. + #[error("The request body was to large")] + RequestTooLarge, } diff --git a/src/client/mod.rs b/src/client/mod.rs index 08950b1595..b4f9002ffa 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,6 +7,6 @@ mod ws; // Unless we want the user to have to possibility to not spawn a background thread to // handle responses. #[cfg(feature = "http")] -pub use http::{Client as HttpClient, HttpTransportClient}; +pub use http::{HttpClient, HttpTransportClient, HttpConfig}; #[cfg(feature = "ws")] pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient}; diff --git a/src/http/raw/tests.rs b/src/http/raw/tests.rs index dbeb7750e2..efa73fd253 100644 --- a/src/http/raw/tests.rs +++ b/src/http/raw/tests.rs @@ -34,13 +34,15 @@ use serde_json::Value; async fn connection_context() -> (HttpTransportClient, HttpRawServer) { let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap(); let uri = format!("http://{}", server.local_addr()); - let client = HttpTransportClient::new(&uri); + let client = HttpTransportClient::new(&uri, Default::default()); (client, server.into()) } +// TODO(niklasad1): fix before eventual merge #[tokio::test] +#[ignore] async fn request_work() { - let (mut client, mut server) = connection_context().await; + let (client, mut server) = connection_context().await; tokio::spawn(async move { let call = Call::MethodCall(MethodCall { jsonrpc: Version::V2, @@ -64,9 +66,11 @@ async fn request_work() { } } +// TODO(niklasad1): fix before eventual merge #[tokio::test] +#[ignore] async fn notification_work() { - let (mut client, mut server) = connection_context().await; + let (client, mut server) = connection_context().await; tokio::spawn(async move { let n = Notification { jsonrpc: Version::V2, From 0b569f7cc92d1767cd789d2245cc890d216bde73 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 9 Nov 2020 12:19:06 +0100 Subject: [PATCH 06/26] test(http transport): request limit test --- src/client/http/transport.rs | 24 ++++++++++++++++++++++++ src/client/mod.rs | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 1d6ce03e09..9b58c669aa 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -118,3 +118,27 @@ pub enum RequestError { #[error("The request body was to large")] RequestTooLarge, } + +#[cfg(test)] +mod tests { + use super::{HttpTransportClient, RequestError}; + use crate::types::jsonrpc::{Call, Id, JsonValue, MethodCall, Params, Request, Version}; + + #[tokio::test] + async fn request_limit_works() { + let eighty_bytes_limit = 80; + let client = HttpTransportClient::new("http:://localhost:9933", eighty_bytes_limit); + assert_eq!(client.max_request_body_size, eighty_bytes_limit); + + let request = Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Version::V2, + method: "request_larger_than_eightybytes".to_string(), + params: Params::None, + id: Id::Num(1), + })); + let bytes = serde_json::to_vec(&request).unwrap(); + assert_eq!(bytes.len(), 81); + let response = client.send_request(request).await.unwrap_err(); + assert!(matches!(response, RequestError::RequestTooLarge)); + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index b4f9002ffa..9fff256970 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,6 +7,6 @@ mod ws; // Unless we want the user to have to possibility to not spawn a background thread to // handle responses. #[cfg(feature = "http")] -pub use http::{HttpClient, HttpTransportClient, HttpConfig}; +pub use http::{HttpClient, HttpConfig, HttpTransportClient}; #[cfg(feature = "ws")] pub use ws::{Client as WsClient, RawClient as RawWsClient, Subscription as WsSubscription, WsTransportClient}; From 3e2827de7511d2aa47ee3284d92edb207ee9c180 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 9 Nov 2020 19:00:05 +0100 Subject: [PATCH 07/26] test(http client): add tests. --- src/client/http/client.rs | 17 +++++--- src/client/http/mod.rs | 3 ++ src/client/http/tests.rs | 76 ++++++++++++++++++++++++++++++++++++ src/client/http/transport.rs | 2 +- src/types/client.rs | 22 +++++++++-- test-utils/Cargo.toml | 2 +- test-utils/src/helpers.rs | 46 ++++++++++++++++++++++ 7 files changed, 158 insertions(+), 10 deletions(-) create mode 100644 src/client/http/tests.rs diff --git a/src/client/http/client.rs b/src/client/http/client.rs index e6e9854517..3329e6d28d 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -1,9 +1,9 @@ use crate::client::http::transport::HttpTransportClient; -use crate::types::client::Error; +use crate::types::client::{Error, Mismatch}; use crate::types::jsonrpc::{self, JsonValue}; use std::sync::atomic::{AtomicU64, Ordering}; -/// Default maximium request body size (10 MB). +/// Default maximum request body size (10 MB). const DEFAULT_MAX_BODY_SIZE: usize = 10 * 1024 * 1024; /// HTTP configuration. @@ -53,6 +53,7 @@ impl HttpClient { method: impl Into, params: impl Into, ) -> Result { + // NOTE: `fetch_add` wraps on overflow which is intended. let id = self.request_id.fetch_add(1, Ordering::SeqCst); let request = jsonrpc::Request::Single(jsonrpc::Call::MethodCall(jsonrpc::MethodCall { jsonrpc: jsonrpc::Version::V2, @@ -89,9 +90,15 @@ impl HttpClient { let ret: Result = response.into(); ret.map_err(|e| Error::Request(e)) } - jsonrpc::Id::Num(n) => Err(Error::InvalidRequestId(expected_id.into(), (*n).into())), - jsonrpc::Id::Str(s) => Err(Error::InvalidRequestId(expected_id.into(), s.to_string().into())), - jsonrpc::Id::Null => Err(Error::InvalidRequestId(expected_id.into(), JsonValue::Null)), + jsonrpc::Id::Num(n) => { + Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: (*n).into() })) + } + jsonrpc::Id::Str(s) => { + Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: s.to_string().into() })) + } + jsonrpc::Id::Null => { + Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: JsonValue::Null })) + } } } } diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index cdb8c47e10..31a7121236 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -1,5 +1,8 @@ mod client; mod transport; +#[cfg(test)] +mod tests; + pub use client::{HttpClient, HttpConfig}; pub use transport::HttpTransportClient; diff --git a/src/client/http/tests.rs b/src/client/http/tests.rs new file mode 100644 index 0000000000..9bd54894ac --- /dev/null +++ b/src/client/http/tests.rs @@ -0,0 +1,76 @@ +use crate::client::HttpClient; +use crate::types::client::Error; +use crate::types::jsonrpc::{self, ErrorCode, JsonValue, Params}; + +use jsonrpsee_test_utils::helpers::*; +use jsonrpsee_test_utils::types::Id; + +#[tokio::test] +async fn method_call_works() { + let result = run_request_with_response(ok_response("hello".into(), Id::Num(0))).await.unwrap(); + assert_eq!(JsonValue::String("hello".into()), result); +} + +#[tokio::test] +async fn notification_works() { + let server_addr = http_server_with_hardcoded_response(String::new()).await; + let uri = format!("http://{}", server_addr); + let client = HttpClient::new(&uri, Default::default()); + client + .notification("i_dont_care_about_the_response_because_the_server_should_not_respond", Params::None) + .await + .unwrap(); +} + +#[tokio::test] +async fn response_with_wrong_id() { + let err = run_request_with_response(ok_response("hello".into(), Id::Num(99))).await.unwrap_err(); + assert!(matches!(err, Error::InvalidRequestId(_mismatch))); +} + +#[tokio::test] +async fn response_method_not_found() { + let err = run_request_with_response(method_not_found(Id::Num(0))).await.unwrap_err(); + assert_jsonrpc_error_response(err, ErrorCode::MethodNotFound, METHOD_NOT_FOUND.into()); +} + +#[tokio::test] +async fn response_parse_error() { + let err = run_request_with_response(parse_error(Id::Num(0))).await.unwrap_err(); + assert_jsonrpc_error_response(err, ErrorCode::ParseError, PARSE_ERROR.into()); +} + +#[tokio::test] +async fn invalid_request_works() { + let err = run_request_with_response(invalid_request(Id::Num(0_u64))).await.unwrap_err(); + assert_jsonrpc_error_response(err, ErrorCode::InvalidRequest, INVALID_REQUEST.into()); +} + +#[tokio::test] +async fn invalid_params_works() { + let err = run_request_with_response(invalid_params(Id::Num(0_u64))).await.unwrap_err(); + assert_jsonrpc_error_response(err, ErrorCode::InvalidParams, INVALID_PARAMS.into()); +} + +#[tokio::test] +async fn internal_error_works() { + let err = run_request_with_response(internal_error(Id::Num(0_u64))).await.unwrap_err(); + assert_jsonrpc_error_response(err, ErrorCode::InternalError, INTERNAL_ERROR.into()); +} + +async fn run_request_with_response(response: String) -> Result { + let server_addr = http_server_with_hardcoded_response(response).await; + let uri = format!("http://{}", server_addr); + let client = HttpClient::new(&uri, Default::default()); + client.request("say_hello", Params::None).await +} + +fn assert_jsonrpc_error_response(response: Error, code: ErrorCode, message: String) { + let expected = jsonrpc::Error { code, message, data: None }; + match response { + Error::Request(err) => { + assert_eq!(err, expected); + } + e @ _ => panic!("Expected error: \"{}\", got: {:?}", expected, e), + }; +} diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 9b58c669aa..608b0be7d5 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -122,7 +122,7 @@ pub enum RequestError { #[cfg(test)] mod tests { use super::{HttpTransportClient, RequestError}; - use crate::types::jsonrpc::{Call, Id, JsonValue, MethodCall, Params, Request, Version}; + use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version}; #[tokio::test] async fn request_limit_works() { diff --git a/src/types/client.rs b/src/types/client.rs index 02f27273cf..57451c5f5c 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -1,4 +1,20 @@ use crate::types::jsonrpc::{self, JsonValue}; +use std::fmt; + +/// Mismatch of the expected value. +#[derive(Clone, Debug, PartialEq)] +pub struct Mismatch { + /// Expected value. + pub expected: T, + /// Actual value. + pub got: T, +} + +impl fmt::Display for Mismatch { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_fmt(format_args!("Expected: {}, Got: {}", self.expected, self.got)) + } +} /// Error produced by the client. #[derive(Debug, thiserror::Error)] @@ -11,7 +27,7 @@ pub enum Error { #[error("Server responded to our request with an error: {0:?}")] Request(#[source] jsonrpc::Error), /// Subscription error. - #[error("Subscription to subscribe_method: {0} with unsubscribe_metho: {1} failed")] + #[error("Subscription to subscribe_method: {0} with unsubscribe_methon: {1} failed")] Subscription(String, String), /// Frontend/backend channel error. #[error("Frontend/backend channel error: {0}")] @@ -19,9 +35,9 @@ pub enum Error { /// Failed to parse the data that the server sent back to us. #[error("Parse error: {0}")] ParseError(#[source] jsonrpc::ParseError), - #[error("Invalid ID in response; expected: {0}, got: {1}")] /// Invalid id in response to a request. - InvalidRequestId(JsonValue, JsonValue), + #[error("Invalid ID in response: {0}")] + InvalidRequestId(Mismatch), #[error("Custom error: {0}")] /// Custom error. Custom(String), diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index a9bd5de33c..1228d0baa0 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] futures = "0.3.6" -hyper = "0.13.8" +hyper = "0.13.9" serde = { version = "1.0.116", default-features = false, features = ["derive"] } serde_json = "1.0.58" soketto = "0.4.2" diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index 42dd45e69c..4e20807e1f 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -1,7 +1,16 @@ use crate::types::{Body, HttpResponse, Id, Uri}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Request, Response, Server}; use serde_json::Value; +use std::convert::Infallible; use std::net::SocketAddr; +pub const PARSE_ERROR: &str = "Parse error"; +pub const INTERNAL_ERROR: &str = "Internal error"; +pub const INVALID_PARAMS: &str = "Invalid params"; +pub const INVALID_REQUEST: &str = "Invalid request"; +pub const METHOD_NOT_FOUND: &str = "Method not found"; + /// Converts a sockaddress to a WebSocket URI. pub fn to_ws_uri_string(addr: SocketAddr) -> String { let mut s = String::new(); @@ -48,6 +57,13 @@ pub fn invalid_params(id: Id) -> String { ) } +pub fn internal_error(id: Id) -> String { + format!( + r#"{{"jsonrpc":"2.0","error":{{"code":-32603,"message":"Internal error"}},"id":{}}}"#, + serde_json::to_string(&id).unwrap() + ) +} + pub async fn http_request(body: Body, uri: Uri) -> Result { let client = hyper::Client::new(); let r = hyper::Request::post(uri) @@ -61,3 +77,33 @@ pub async fn http_request(body: Body, uri: Uri) -> Result Ok(HttpResponse { status: parts.status, header: parts.headers, body: String::from_utf8(bytes.to_vec()).unwrap() }) } + +/// Spawn HTTP server that responds with a hardcoded response. +// +// NOTE: This must be spawned on tokio because hyper only works with tokio. +pub async fn http_server_with_hardcoded_response(response: String) -> SocketAddr { + async fn process_request(_req: Request, response: String) -> Result, Infallible> { + Ok(Response::new(hyper::Body::from(response))) + } + + let make_service = make_service_fn(move |_| { + let response = response.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let response = response.clone(); + async move { Ok::<_, Infallible>(process_request(req, response).await.unwrap()) } + })) + } + }); + + let (tx, rx) = futures::channel::oneshot::channel::(); + + tokio::spawn(async { + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); + let server = Server::bind(&addr).serve(make_service); + tx.send(server.local_addr()).unwrap(); + server.await.unwrap() + }); + + rx.await.unwrap() +} From a453d99378ed508acfcf7a787b18168b815f4417 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 9 Nov 2020 19:04:10 +0100 Subject: [PATCH 08/26] fix typo --- src/types/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/client.rs b/src/types/client.rs index 57451c5f5c..de2f234fad 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -27,7 +27,7 @@ pub enum Error { #[error("Server responded to our request with an error: {0:?}")] Request(#[source] jsonrpc::Error), /// Subscription error. - #[error("Subscription to subscribe_method: {0} with unsubscribe_methon: {1} failed")] + #[error("Subscription to subscribe_method: {0} with unsubscribe_method: {1} failed")] Subscription(String, String), /// Frontend/backend channel error. #[error("Frontend/backend channel error: {0}")] From 1740d2eebaeb1c61252f09189cc3de63d30f55eb Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 10 Nov 2020 11:07:07 +0100 Subject: [PATCH 09/26] fix(benches): make it compile again. --- benches/benches.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 891c2cbd44..f07b6dbd07 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -36,7 +36,7 @@ pub fn http_requests(c: &mut criterion::Criterion) { let (tx_addr, rx_addr) = oneshot::channel::(); async_std::task::spawn(http_server(tx_addr)); let server_addr = block_on(rx_addr).unwrap(); - let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr))); + let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), Default::default())); c.bench_function("synchronous http round trip", |b| { b.iter(|| { @@ -99,7 +99,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) { } }) }, - // TODO(niklasad1): investigate why it only works to 8 concurrent requests. + // TODO(niklasad1): investigate why it only works up to 8 concurrent requests. vec![2, 4, 8], ); } From dd72e6a6ed64e0b11a325b02dd1fdf84d8418425 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 10 Nov 2020 11:27:02 +0100 Subject: [PATCH 10/26] fix(ws example): revert unintentional change. --- examples/ws.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/ws.rs b/examples/ws.rs index 696e3ce803..e4f66c3a43 100644 --- a/examples/ws.rs +++ b/examples/ws.rs @@ -43,11 +43,9 @@ async fn main() -> Result<(), Box> { }); server_started_rx.await?; - let c1 = WsClient::new(SERVER_URI).await?; - - for _ in 0..10 { - let _: Result = c1.request("say_hello", Params::None).await; - } + let client = WsClient::new(SERVER_URI).await?; + let response: JsonValue = client.request("say_hello", Params::None).await?; + println!("r: {:?}", response); Ok(()) } From a904bc2d37e2f8323ee74b995b6e73a61113ebd8 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 10 Nov 2020 12:52:36 +0100 Subject: [PATCH 11/26] test(http client): subscription response on call. --- src/client/http/client.rs | 2 +- src/client/http/tests.rs | 7 +++++++ src/types/client.rs | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/client/http/client.rs b/src/client/http/client.rs index 3329e6d28d..8cd32ccfac 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -88,7 +88,7 @@ impl HttpClient { match response.id() { jsonrpc::Id::Num(n) if n == &expected_id => { let ret: Result = response.into(); - ret.map_err(|e| Error::Request(e)) + ret.map_err(Error::Request) } jsonrpc::Id::Num(n) => { Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: (*n).into() })) diff --git a/src/client/http/tests.rs b/src/client/http/tests.rs index 9bd54894ac..5aef563b4d 100644 --- a/src/client/http/tests.rs +++ b/src/client/http/tests.rs @@ -58,6 +58,13 @@ async fn internal_error_works() { assert_jsonrpc_error_response(err, ErrorCode::InternalError, INTERNAL_ERROR.into()); } +#[tokio::test] +async fn subscription_response_to_request() { + let req = r#"{"jsonrpc":"2.0","method":"subscribe_hello","params":{"subscription":"3px4FrtxSYQ1zBKW154NoVnrDhrq764yQNCXEgZyM6Mu","result":"hello my friend"}}"#.to_string(); + let err = run_request_with_response(req).await.unwrap_err(); + assert!(matches!(err, Error::Custom(_))); +} + async fn run_request_with_response(response: String) -> Result { let server_addr = http_server_with_hardcoded_response(response).await; let uri = format!("http://{}", server_addr); diff --git a/src/types/client.rs b/src/types/client.rs index de2f234fad..f9611acda3 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -23,7 +23,7 @@ pub enum Error { /// invalid ID, etc.). #[error("Networking or low-level protocol error: {0}")] TransportError(#[source] Box), - /// RawServer responded to our request with an error. + /// Request error. #[error("Server responded to our request with an error: {0:?}")] Request(#[source] jsonrpc::Error), /// Subscription error. From 5d06d5d4f7bee91f2ac6dba08b6b9994c3c1c601 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 10 Nov 2020 16:14:11 +0100 Subject: [PATCH 12/26] fix(cleanup) --- examples/http.rs | 2 +- src/client/http/client.rs | 34 +++++++++++------- src/client/http/tests.rs | 4 +-- src/client/http/transport.rs | 69 +++++++++++++++++++++--------------- src/client/ws/client.rs | 2 +- src/http/raw/tests.rs | 6 ++-- 6 files changed, 69 insertions(+), 48 deletions(-) diff --git a/examples/http.rs b/examples/http.rs index 518c0cb1ee..979e49e4c7 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { server_started_rx.await?; - let client = HttpClient::new(SERVER_URI, Default::default()); + let client = HttpClient::new(SERVER_URI, Default::default())?; let response: Result = client.request("say_hello", Params::None).await; println!("r: {:?}", response); diff --git a/src/client/http/client.rs b/src/client/http/client.rs index 8cd32ccfac..d0c592ec99 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -4,7 +4,7 @@ use crate::types::jsonrpc::{self, JsonValue}; use std::sync::atomic::{AtomicU64, Ordering}; /// Default maximum request body size (10 MB). -const DEFAULT_MAX_BODY_SIZE: usize = 10 * 1024 * 1024; +const DEFAULT_MAX_BODY_SIZE_TEN_MB: usize = 10 * 1024 * 1024; /// HTTP configuration. #[derive(Copy, Clone)] @@ -13,26 +13,35 @@ pub struct HttpConfig { pub max_request_body_size: usize, } -/// HTTP Client. +/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications. +/// +/// WARNING: The async methods must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio). pub struct HttpClient { + /// HTTP transport client. transport: HttpTransportClient, + /// Request ID that wraps around when overflowing. request_id: AtomicU64, } impl Default for HttpConfig { fn default() -> Self { - Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE } + Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB } } } impl HttpClient { - /// Create a client to connect to the server at address `endpoint` - pub fn new(endpoint: &str, config: HttpConfig) -> Self { - let transport = HttpTransportClient::new(endpoint, config.max_request_body_size); - Self { transport, request_id: AtomicU64::new(0) } + /// Initializes a new HTTP client. + /// + /// Fails when the URL is invalid. + pub fn new(target: &str, config: HttpConfig) -> Result { + let transport = HttpTransportClient::new(target, config.max_request_body_size) + .map_err(|e| Error::TransportError(Box::new(e)))?; + Ok(Self { transport, request_id: AtomicU64::new(0) }) } /// Send a notification to the server. + /// + /// WARNING: This method must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio). pub async fn notification( &self, method: impl Into, @@ -48,6 +57,8 @@ impl HttpClient { } /// Perform a request towards the server. + /// + /// WARNING: This method must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio). pub async fn request( &self, method: impl Into, @@ -70,14 +81,11 @@ impl HttpClient { match response { jsonrpc::Response::Single(rp) => Self::process_response(rp, id), + // Server should not send batch response to a single request. jsonrpc::Response::Batch(_rps) => { - todo!("batch request not supported"); - // for rp in rps { - // // TODO: if an error happens, we throw away the entire batch - // self.process_response(rp)?; - // } + Err(Error::Custom("Server replied with batch response to a single request".to_string())) } - // Server MUST NOT reply to a Notification. + // Server should not reply to a Notification. jsonrpc::Response::Notif(_notif) => { Err(Error::Custom(format!("Server replied with notification response to request ID: {}", id))) } diff --git a/src/client/http/tests.rs b/src/client/http/tests.rs index 5aef563b4d..89bff1d25b 100644 --- a/src/client/http/tests.rs +++ b/src/client/http/tests.rs @@ -15,7 +15,7 @@ async fn method_call_works() { async fn notification_works() { let server_addr = http_server_with_hardcoded_response(String::new()).await; let uri = format!("http://{}", server_addr); - let client = HttpClient::new(&uri, Default::default()); + let client = HttpClient::new(&uri, Default::default()).unwrap(); client .notification("i_dont_care_about_the_response_because_the_server_should_not_respond", Params::None) .await @@ -68,7 +68,7 @@ async fn subscription_response_to_request() { async fn run_request_with_response(response: String) -> Result { let server_addr = http_server_with_hardcoded_response(response).await; let uri = format!("http://{}", server_addr); - let client = HttpClient::new(&uri, Default::default()); + let client = HttpClient::new(&uri, Default::default())?; client.request("say_hello", Params::None).await } diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 608b0be7d5..e1c1a14b3e 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -10,52 +10,55 @@ use crate::types::jsonrpc; use futures::StreamExt; use thiserror::Error; -/// Implementation of a raw client for HTTP requests. +/// HTTP Transport Client. #[derive(Debug, Clone)] pub struct HttpTransportClient { - /// URI to connect to. - uri: String, - /// Hyper to client, + /// Target to connect to. + target: url::Url, + /// HTTP client, client: hyper::Client, - /// Max request body size + /// Configurable max request body size max_request_body_size: usize, } impl HttpTransportClient { /// Initializes a new HTTP client. - // TODO: better type for target - pub fn new(uri: &str, max_request_body_size: usize) -> Self { - HttpTransportClient { client: hyper::Client::new(), uri: uri.to_owned(), max_request_body_size } + pub fn new(target: &str, max_request_body_size: usize) -> Result { + let target = url::Url::parse(target).map_err(|e| Error::Url(format!("Invalid URL: {}", e).into()))?; + if target.scheme() != "http" { + return Err(Error::Url("URL scheme not supported, expects 'http'".into())); + }; + Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size }) } /// Send request. - pub async fn send_request(&self, request: jsonrpc::Request) -> Result, RequestError> { + async fn send_request(&self, request: jsonrpc::Request) -> Result, Error> { log::debug!("send: {}", jsonrpc::to_string(&request).expect("request valid JSON; qed")); - let body = jsonrpc::to_vec(&request).map_err(|e| RequestError::Serialization(e))?; + let body = jsonrpc::to_vec(&request).map_err(|e| Error::Serialization(e))?; if body.len() > self.max_request_body_size { - return Err(RequestError::RequestTooLarge); + return Err(Error::RequestTooLarge); } - let req = hyper::Request::post(&self.uri) + let req = hyper::Request::post(self.target.as_str()) .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json")) .body(From::from(body)) .expect("Uri and request headers are valid; qed"); let response = match self.client.request(req).await { Ok(r) => r, - Err(err) => return Err(RequestError::Http(Box::new(err))), + Err(err) => return Err(Error::Http(Box::new(err))), }; if !response.status().is_success() { - return Err(RequestError::RequestFailure { status_code: response.status().into() }); + return Err(Error::RequestFailure { status_code: response.status().into() }); } Ok(response) } /// Send notification. - pub async fn send_notification(&self, request: jsonrpc::Request) -> Result<(), RequestError> { + pub async fn send_notification(&self, request: jsonrpc::Request) -> Result<(), Error> { let _response = self.send_request(request).await?; Ok(()) } @@ -64,16 +67,16 @@ impl HttpTransportClient { pub async fn send_request_and_wait_for_response( &self, request: jsonrpc::Request, - ) -> Result { + ) -> Result { let response = self.send_request(request).await?; let mut body_fut: hyper::Body = response.into_body(); let mut body = Vec::new(); while let Some(chunk) = body_fut.next().await { - let chunk = chunk.map_err(|e| RequestError::Http(Box::new(e)))?; + let chunk = chunk.map_err(|e| Error::Http(Box::new(e)))?; if chunk.len() + body.len() > self.max_request_body_size { - return Err(RequestError::RequestTooLarge); + return Err(Error::RequestTooLarge); } body.extend_from_slice(&chunk); } @@ -81,7 +84,7 @@ impl HttpTransportClient { // Note that we don't check the Content-Type of the request. This is deemed // unnecessary, as a parsing error while happen anyway. // TODO: use Response::from_json - let as_json: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(RequestError::ParseError)?; + let as_json: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(Error::ParseError)?; log::debug!("recv: {}", jsonrpc::to_string(&as_json).expect("request valid JSON; qed")); Ok(as_json) } @@ -89,29 +92,33 @@ impl HttpTransportClient { /// Error that can happen during a request. #[derive(Debug, Error)] -pub enum RequestError { +pub enum Error { + /// Invalid URL. + #[error("Invalid Uri: {0}")] + Url(String), + /// Error while serializing the request. // TODO: can that happen? - #[error("error while serializing the request")] + #[error("Error while serializing the request")] Serialization(#[source] serde_json::error::Error), /// Response given by the server failed to decode as UTF-8. - #[error("response body is not UTF-8")] + #[error("Response body is not UTF-8")] Utf8(#[source] std::string::FromUtf8Error), /// Error during the HTTP request, including networking errors and HTTP protocol errors. - #[error("error while performing the HTTP request")] + #[error("Error while performing the HTTP request")] Http(Box), /// Server returned a non-success status code. - #[error("server returned an error status code: {:?}", status_code)] + #[error("Server returned an error status code: {:?}", status_code)] RequestFailure { /// Status code returned by the server. status_code: u16, }, /// Failed to parse the JSON returned by the server into a JSON-RPC response. - #[error("error while parsing the response body")] + #[error("Error while parsing the response body")] ParseError(#[source] serde_json::error::Error), /// Request body too large. @@ -121,13 +128,19 @@ pub enum RequestError { #[cfg(test)] mod tests { - use super::{HttpTransportClient, RequestError}; + use super::{Error, HttpTransportClient}; use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version}; + #[test] + fn invalid_http_url_rejected() { + let err = HttpTransportClient::new("ws://localhost:9933", 1337).unwrap_err(); + assert!(matches!(err, Error::Url(_))); + } + #[tokio::test] async fn request_limit_works() { let eighty_bytes_limit = 80; - let client = HttpTransportClient::new("http:://localhost:9933", eighty_bytes_limit); + let client = HttpTransportClient::new("http://localhost:9933", eighty_bytes_limit).unwrap(); assert_eq!(client.max_request_body_size, eighty_bytes_limit); let request = Request::Single(Call::MethodCall(MethodCall { @@ -139,6 +152,6 @@ mod tests { let bytes = serde_json::to_vec(&request).unwrap(); assert_eq!(bytes.len(), 81); let response = client.send_request(request).await.unwrap_err(); - assert!(matches!(response, RequestError::RequestTooLarge)); + assert!(matches!(response, Error::RequestTooLarge)); } } diff --git a/src/client/ws/client.rs b/src/client/ws/client.rs index 6200b7f5b6..b152eaf16c 100644 --- a/src/client/ws/client.rs +++ b/src/client/ws/client.rs @@ -103,7 +103,7 @@ enum FrontToBack { impl Client { /// Initializes a new WebSocket client /// - /// Fails when the URI is invalid i.e, doesn't start with `ws://` or `wss://` + /// Fails when the URL is invalid. pub async fn new(target: &str) -> Result { let transport = WsTransportClient::new(target).await.map_err(|e| Error::TransportError(Box::new(e)))?; let client = RawClient::new(transport); diff --git a/src/http/raw/tests.rs b/src/http/raw/tests.rs index efa73fd253..381c5eb3a2 100644 --- a/src/http/raw/tests.rs +++ b/src/http/raw/tests.rs @@ -34,7 +34,7 @@ use serde_json::Value; async fn connection_context() -> (HttpTransportClient, HttpRawServer) { let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap(); let uri = format!("http://{}", server.local_addr()); - let client = HttpTransportClient::new(&uri, Default::default()); + let client = HttpTransportClient::new(&uri, Default::default()).unwrap(); (client, server.into()) } @@ -50,7 +50,7 @@ async fn request_work() { params: Params::Array(vec![Value::from(1), Value::from(2)]), id: jsonrpc::Id::Num(3), }); - client.send_request(Request::Single(call)).await.unwrap(); + client.send_request_and_wait_for_response(Request::Single(call)).await.unwrap(); }); match server.next_event().await { @@ -77,7 +77,7 @@ async fn notification_work() { method: "hello_world".to_owned(), params: Params::Array(vec![Value::from("lo"), Value::from(2)]), }; - client.send_request(Request::Single(Call::Notification(n))).await.unwrap(); + client.send_request_and_wait_for_response(Request::Single(Call::Notification(n))).await.unwrap(); }); match server.next_event().await { From 6a4354583b9cf64983062ecada2fbd6d45fe042a Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 10 Nov 2020 18:28:42 +0100 Subject: [PATCH 13/26] fix(benches): make it compile again. --- benches/benches.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/benches.rs b/benches/benches.rs index f07b6dbd07..2d2785bb77 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -36,7 +36,7 @@ pub fn http_requests(c: &mut criterion::Criterion) { let (tx_addr, rx_addr) = oneshot::channel::(); async_std::task::spawn(http_server(tx_addr)); let server_addr = block_on(rx_addr).unwrap(); - let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), Default::default())); + let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), Default::default()).unwrap()); c.bench_function("synchronous http round trip", |b| { b.iter(|| { From a6d9493a4814b6727b13a02f84ece711fc2b7add Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 11 Nov 2020 09:32:31 +0100 Subject: [PATCH 14/26] Update src/client/http/transport.rs --- src/client/http/transport.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index e1c1a14b3e..a633350b31 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -94,7 +94,7 @@ impl HttpTransportClient { #[derive(Debug, Error)] pub enum Error { /// Invalid URL. - #[error("Invalid Uri: {0}")] + #[error("Invalid Url: {0}")] Url(String), /// Error while serializing the request. From c1ebc087ebc024b67c2e5508100c58e01117b707 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 11 Nov 2020 11:04:35 +0100 Subject: [PATCH 15/26] fix(http client): `&str` -> `AsRef` --- src/client/http/client.rs | 2 +- src/client/http/transport.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/http/client.rs b/src/client/http/client.rs index d0c592ec99..728a05eec1 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -33,7 +33,7 @@ impl HttpClient { /// Initializes a new HTTP client. /// /// Fails when the URL is invalid. - pub fn new(target: &str, config: HttpConfig) -> Result { + pub fn new(target: impl AsRef, config: HttpConfig) -> Result { let transport = HttpTransportClient::new(target, config.max_request_body_size) .map_err(|e| Error::TransportError(Box::new(e)))?; Ok(Self { transport, request_id: AtomicU64::new(0) }) diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index a633350b31..55b1c6c615 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -23,8 +23,8 @@ pub struct HttpTransportClient { impl HttpTransportClient { /// Initializes a new HTTP client. - pub fn new(target: &str, max_request_body_size: usize) -> Result { - let target = url::Url::parse(target).map_err(|e| Error::Url(format!("Invalid URL: {}", e).into()))?; + pub fn new(target: impl AsRef, max_request_body_size: usize) -> Result { + let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e).into()))?; if target.scheme() != "http" { return Err(Error::Url("URL scheme not supported, expects 'http'".into())); }; From a3ae6eae1aa314bf1c895510078bf9f359da85b0 Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 11 Nov 2020 12:02:16 +0100 Subject: [PATCH 16/26] docs(client types): better docs for Mismatch type. --- src/types/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/client.rs b/src/types/client.rs index f9611acda3..9dc1ee35ba 100644 --- a/src/types/client.rs +++ b/src/types/client.rs @@ -1,7 +1,7 @@ use crate::types::jsonrpc::{self, JsonValue}; use std::fmt; -/// Mismatch of the expected value. +/// Convenience type for displaying errors. #[derive(Clone, Debug, PartialEq)] pub struct Mismatch { /// Expected value. From be7004923193d216c3555e3306d404357f1192ad Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 11 Nov 2020 16:18:07 +0100 Subject: [PATCH 17/26] style: `Default::default` -> `HttpConfig::default` --- benches/benches.rs | 4 ++-- examples/http.rs | 4 ++-- src/client/http/tests.rs | 6 +++--- src/http/raw/tests.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/benches/benches.rs b/benches/benches.rs index 2d2785bb77..bc68aba9e6 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -1,7 +1,7 @@ use async_std::task::block_on; use criterion::*; use futures::channel::oneshot::{self, Sender}; -use jsonrpsee::client::{HttpClient, WsClient}; +use jsonrpsee::client::{HttpClient, HttpConfig, WsClient}; use jsonrpsee::http::HttpServer; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; use jsonrpsee::ws::WsServer; @@ -36,7 +36,7 @@ pub fn http_requests(c: &mut criterion::Criterion) { let (tx_addr, rx_addr) = oneshot::channel::(); async_std::task::spawn(http_server(tx_addr)); let server_addr = block_on(rx_addr).unwrap(); - let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), Default::default()).unwrap()); + let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), HttpConfig::default()).unwrap()); c.bench_function("synchronous http round trip", |b| { b.iter(|| { diff --git a/examples/http.rs b/examples/http.rs index 979e49e4c7..4eb4b3708c 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -26,7 +26,7 @@ use async_std::task; use futures::channel::oneshot::{self, Sender}; -use jsonrpsee::client::HttpClient; +use jsonrpsee::client::{HttpClient, HttpConfig}; use jsonrpsee::http::HttpServer; use jsonrpsee::types::jsonrpc::{JsonValue, Params}; @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { server_started_rx.await?; - let client = HttpClient::new(SERVER_URI, Default::default())?; + let client = HttpClient::new(SERVER_URI, HttpConfig::default())?; let response: Result = client.request("say_hello", Params::None).await; println!("r: {:?}", response); diff --git a/src/client/http/tests.rs b/src/client/http/tests.rs index 89bff1d25b..b9e02d657f 100644 --- a/src/client/http/tests.rs +++ b/src/client/http/tests.rs @@ -1,4 +1,4 @@ -use crate::client::HttpClient; +use crate::client::{HttpClient, HttpConfig}; use crate::types::client::Error; use crate::types::jsonrpc::{self, ErrorCode, JsonValue, Params}; @@ -15,7 +15,7 @@ async fn method_call_works() { async fn notification_works() { let server_addr = http_server_with_hardcoded_response(String::new()).await; let uri = format!("http://{}", server_addr); - let client = HttpClient::new(&uri, Default::default()).unwrap(); + let client = HttpClient::new(&uri, HttpConfig::default()).unwrap(); client .notification("i_dont_care_about_the_response_because_the_server_should_not_respond", Params::None) .await @@ -68,7 +68,7 @@ async fn subscription_response_to_request() { async fn run_request_with_response(response: String) -> Result { let server_addr = http_server_with_hardcoded_response(response).await; let uri = format!("http://{}", server_addr); - let client = HttpClient::new(&uri, Default::default())?; + let client = HttpClient::new(&uri, HttpConfig::default())?; client.request("say_hello", Params::None).await } diff --git a/src/http/raw/tests.rs b/src/http/raw/tests.rs index 381c5eb3a2..ba8e18a40f 100644 --- a/src/http/raw/tests.rs +++ b/src/http/raw/tests.rs @@ -34,7 +34,7 @@ use serde_json::Value; async fn connection_context() -> (HttpTransportClient, HttpRawServer) { let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap(); let uri = format!("http://{}", server.local_addr()); - let client = HttpTransportClient::new(&uri, Default::default()).unwrap(); + let client = HttpTransportClient::new(&uri, 10 * 1024 * 1024).unwrap(); (client, server.into()) } From 19c6c9c2fe94a56eeeb4d311dff3ff17bda274cf Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 11 Nov 2020 22:31:15 +0100 Subject: [PATCH 18/26] fix(http client): read body size from header. Expermential to read number of bytes from `HTTP Content Length` to pre-allocate the number of bytes and bail early if the length is bigger than the `max_request_body size` Need to be benched with bigger requests. --- src/client/http/client.rs | 4 +- src/client/http/transport.rs | 80 +++++++++++++++++++++++++----------- 2 files changed, 59 insertions(+), 25 deletions(-) diff --git a/src/client/http/client.rs b/src/client/http/client.rs index 728a05eec1..3874640b1d 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -4,13 +4,13 @@ use crate::types::jsonrpc::{self, JsonValue}; use std::sync::atomic::{AtomicU64, Ordering}; /// Default maximum request body size (10 MB). -const DEFAULT_MAX_BODY_SIZE_TEN_MB: usize = 10 * 1024 * 1024; +const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024; /// HTTP configuration. #[derive(Copy, Clone)] pub struct HttpConfig { /// Maximum request body size in bytes. - pub max_request_body_size: usize, + pub max_request_body_size: u32, } /// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications. diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 55b1c6c615..b87dcfae9c 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -10,6 +10,8 @@ use crate::types::jsonrpc; use futures::StreamExt; use thiserror::Error; +const CONTENT_TYPE_JSON: &str = "application/json"; + /// HTTP Transport Client. #[derive(Debug, Clone)] pub struct HttpTransportClient { @@ -18,17 +20,18 @@ pub struct HttpTransportClient { /// HTTP client, client: hyper::Client, /// Configurable max request body size - max_request_body_size: usize, + max_request_body_size: u32, } impl HttpTransportClient { /// Initializes a new HTTP client. - pub fn new(target: impl AsRef, max_request_body_size: usize) -> Result { + pub fn new(target: impl AsRef, max_request_body_size: u32) -> Result { let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e).into()))?; - if target.scheme() != "http" { + if target.scheme() == "http" { + Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size }) + } else { return Err(Error::Url("URL scheme not supported, expects 'http'".into())); - }; - Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size }) + } } /// Send request. @@ -36,25 +39,23 @@ impl HttpTransportClient { log::debug!("send: {}", jsonrpc::to_string(&request).expect("request valid JSON; qed")); let body = jsonrpc::to_vec(&request).map_err(|e| Error::Serialization(e))?; - if body.len() > self.max_request_body_size { + if body.len() > self.max_request_body_size as usize { return Err(Error::RequestTooLarge); } let req = hyper::Request::post(self.target.as_str()) - .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json")) + .header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static(CONTENT_TYPE_JSON)) + .header(hyper::header::ACCEPT, hyper::header::HeaderValue::from_static(CONTENT_TYPE_JSON)) .body(From::from(body)) - .expect("Uri and request headers are valid; qed"); + .expect("URI and request headers are valid; qed"); - let response = match self.client.request(req).await { - Ok(r) => r, - Err(err) => return Err(Error::Http(Box::new(err))), - }; + let response = self.client.request(req).await.map_err(|e| Error::Http(Box::new(e)))?; - if !response.status().is_success() { - return Err(Error::RequestFailure { status_code: response.status().into() }); + if response.status().is_success() { + Ok(response) + } else { + Err(Error::RequestFailure { status_code: response.status().into() }) } - - Ok(response) } /// Send notification. @@ -68,14 +69,20 @@ impl HttpTransportClient { &self, request: jsonrpc::Request, ) -> Result { + let response = self.send_request(request).await?; + let body_size = read_content_length(response.headers()).unwrap_or(0); let mut body_fut: hyper::Body = response.into_body(); - let mut body = Vec::new(); + if body_size > self.max_request_body_size { + return Err(Error::RequestTooLarge); + } + + let mut body = Vec::with_capacity(body_size as usize); while let Some(chunk) = body_fut.next().await { let chunk = chunk.map_err(|e| Error::Http(Box::new(e)))?; - if chunk.len() + body.len() > self.max_request_body_size { + if chunk.len() + body.len() > self.max_request_body_size as usize { return Err(Error::RequestTooLarge); } body.extend_from_slice(&chunk); @@ -83,13 +90,30 @@ impl HttpTransportClient { // Note that we don't check the Content-Type of the request. This is deemed // unnecessary, as a parsing error while happen anyway. - // TODO: use Response::from_json - let as_json: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(Error::ParseError)?; - log::debug!("recv: {}", jsonrpc::to_string(&as_json).expect("request valid JSON; qed")); - Ok(as_json) + let response: jsonrpc::Response = jsonrpc::from_slice(&body).map_err(Error::ParseError)?; + log::debug!("recv: {}", jsonrpc::to_string(&response).expect("request valid JSON; qed")); + Ok(response) } } + +// Read `content_length` from HTTP Header. +// +// Returns `Some(val)` if `content_length` contains exactly one value. +// None otherwise. +fn read_content_length(header: &hyper::header::HeaderMap) -> Option { + let values = header.get_all("content-length"); + let mut iter = values.iter(); + let content_length = iter.next()?; + if iter.next().is_some() { + return None; + } + + // HTTP Content-Length indicates number of bytes in decimal. + let length = content_length.to_str().ok()?; + u32::from_str_radix(length, 10).ok() +} + /// Error that can happen during a request. #[derive(Debug, Error)] pub enum Error { @@ -128,7 +152,7 @@ pub enum Error { #[cfg(test)] mod tests { - use super::{Error, HttpTransportClient}; + use super::{Error, HttpTransportClient, read_content_length}; use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version}; #[test] @@ -154,4 +178,14 @@ mod tests { let response = client.send_request(request).await.unwrap_err(); assert!(matches!(response, Error::RequestTooLarge)); } + + #[test] + fn read_content_length_works() { + let mut header = hyper::header::HeaderMap::new(); + header.insert(hyper::header::CONTENT_LENGTH, "177".parse().unwrap()); + assert_eq!(read_content_length(&header), Some(177)); + + header.append(hyper::header::CONTENT_LENGTH, "999".parse().unwrap()); + assert_eq!(read_content_length(&header), None); + } } From ebc6a350b098f17a822e282cfe3c24c78c78c450 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 13:26:23 +0100 Subject: [PATCH 19/26] rewrite --- src/http/server.rs | 5 ++-- src/http/transport/background.rs | 46 ++++++-------------------------- src/http/transport/mod.rs | 7 ++--- src/types/mod.rs | 3 +++ 4 files changed, 18 insertions(+), 43 deletions(-) diff --git a/src/http/server.rs b/src/http/server.rs index f58e0f7829..869fe5c4e4 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -28,6 +28,7 @@ use crate::http::raw::{RawServer, RawServerEvent, RawServerRequestId}; use crate::http::transport::HttpTransportServer; use crate::types::jsonrpc::{self, JsonValue}; use crate::types::server::Error; +use crate::types::http::HttpConfig; use futures::{channel::mpsc, future::Either, pin_mut, prelude::*}; use parking_lot::Mutex; @@ -111,9 +112,9 @@ enum FrontToBack { impl Server { /// Initializes a new server based upon this raw server. - pub async fn new(url: &str) -> Result> { + pub async fn new(url: &str, config: HttpConfig) -> Result> { let sockaddr = url.parse()?; - let transport_server = HttpTransportServer::new(&sockaddr).await?; + let transport_server = HttpTransportServer::new(&sockaddr, config).await?; let local_addr = *transport_server.local_addr(); // We use an unbounded channel because the only exchanged messages concern registering diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index c47c2d16f3..90af5ca88f 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -26,7 +26,7 @@ use crate::http::server_utils::access_control::AccessControl; use crate::http::transport::response; -use crate::types::jsonrpc; +use crate::types::{jsonrpc, http::HttpConfig}; use futures::{channel::mpsc, channel::oneshot, prelude::*}; use hyper::service::{make_service_fn, service_fn}; use hyper::Error; @@ -52,13 +52,14 @@ impl BackgroundHttp { /// /// In addition to `Self`, also returns the local address the server ends up listening on, /// which might be different than the one passed as parameter. - pub async fn bind(addr: &SocketAddr) -> Result<(BackgroundHttp, SocketAddr), Box> { - Self::bind_with_acl(addr, AccessControl::default()).await + pub async fn bind(addr: &SocketAddr, config: HttpConfig) -> Result<(BackgroundHttp, SocketAddr), Box> { + Self::bind_with_acl(addr, AccessControl::default(), config).await } pub async fn bind_with_acl( addr: &SocketAddr, access_control: AccessControl, + config: HttpConfig, ) -> Result<(BackgroundHttp, SocketAddr), Box> { let (tx, rx) = mpsc::channel(4); @@ -69,7 +70,7 @@ impl BackgroundHttp { Ok::<_, Error>(service_fn(move |req| { let mut tx = tx.clone(); let access_control = access_control.clone(); - async move { Ok::<_, Error>(process_request(req, &mut tx, &access_control).await) } + async move { Ok::<_, Error>(process_request(req, &mut tx, &access_control, config).await) } })) } }); @@ -124,6 +125,7 @@ async fn process_request( request: hyper::Request, fg_process_tx: &mut mpsc::Sender, access_control: &AccessControl, + config: HttpConfig, ) -> hyper::Response { // Process access control if access_control.deny_host(&request) { @@ -136,23 +138,12 @@ async fn process_request( return response::invalid_allow_headers(); } - /* - // Read metadata - let metadata = self.jsonrpc_handler.extractor.read_metadata(&request); - */ - // Proceed match *request.method() { // Validate the ContentType header // to prevent Cross-Origin XHRs with text/plain hyper::Method::POST if is_json(request.headers().get("content-type")) => { - let uri = //if self.rest_api != RestApi::Disabled { - Some(request.uri().clone()) - /*} else { - None - }*/; - - let json_body = match body_to_request(request.into_body()).await { + let json_body = match body_to_request(request.into_body(), config).await { Ok(b) => b, Err(e) => match (e.kind(), e.into_inner()) { (io::ErrorKind::InvalidData, _) => return response::parse_error(), @@ -176,27 +167,6 @@ async fn process_request( Err(_) => return response::internal_error("JSON request send back channel has shut down"), } } - /*Method::POST if /*self.rest_api == RestApi::Unsecure &&*/ request.uri().path().split('/').count() > 2 => { - RpcHandlerState::ProcessRest { - metadata, - uri: request.uri().clone(), - } - } - // Just return error for unsupported content type - Method::POST => response::unsupported_content_type(), - // Don't validate content type on options - Method::OPTIONS => response::empty(), - // Respond to health API request if there is one configured. - Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => { - RpcHandlerState::ProcessHealth { - metadata, - method: self - .health_api - .as_ref() - .map(|x| x.1.clone()) - .expect("Health api is defined since the URI matched."), - } - }*/ // Disallow other methods. _ => response::method_not_allowed(), } @@ -219,7 +189,7 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool { /// Converts a `hyper` body into a structured JSON object. /// /// Enforces a size limit on the body. -async fn body_to_request(mut body: hyper::Body) -> Result { +async fn body_to_request(mut body: hyper::Body, config: HttpConfig) -> Result { let mut json_body = Vec::new(); while let Some(chunk) = body.next().await { let chunk = match chunk { diff --git a/src/http/transport/mod.rs b/src/http/transport/mod.rs index 8b630a23c6..f47fd25171 100644 --- a/src/http/transport/mod.rs +++ b/src/http/transport/mod.rs @@ -29,6 +29,7 @@ mod response; use crate::http::server_utils::access_control::AccessControl; use crate::types::jsonrpc; +use crate::types::http::HttpConfig; use fnv::FnvHashMap; use futures::{channel::oneshot, prelude::*}; @@ -83,7 +84,7 @@ impl HttpTransportServer { // > starting to listen on a port is an asynchronous operation, but the hyper library // > hides this to us. In order to be future-proof, this function is async, so that we // > might switch out to a different library later without breaking the API. - pub async fn new(addr: &SocketAddr) -> Result> { + pub async fn new(addr: &SocketAddr, config: HttpConfig) -> Result> { let (background_thread, local_addr) = background::BackgroundHttp::bind(addr).await?; Ok(HttpTransportServer { background_thread, local_addr, requests: Default::default(), next_request_id: 0 }) } @@ -92,9 +93,9 @@ impl HttpTransportServer { pub async fn bind_with_acl( addr: &SocketAddr, access_control: AccessControl, + config: HttpConfig, ) -> Result> { - let (background_thread, local_addr) = background::BackgroundHttp::bind_with_acl(addr, access_control).await?; - + let (background_thread, local_addr) = background::BackgroundHttp::bind_with_acl(addr, access_control, config).await?; Ok(HttpTransportServer { background_thread, local_addr, requests: Default::default(), next_request_id: 0 }) } diff --git a/src/types/mod.rs b/src/types/mod.rs index cdbff64019..d8a3a0568a 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -6,3 +6,6 @@ pub mod client; /// Shared types for server implementation. pub mod server; + +/// Shared types for HTTP +pub mod http; From 58bb1cc83837b68fafb5828c1a9adcfaad5e61e7 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 14:19:53 +0100 Subject: [PATCH 20/26] test(raw http): enable tests to works again. --- src/http/raw/tests.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/http/raw/tests.rs b/src/http/raw/tests.rs index ba8e18a40f..a33898e3ab 100644 --- a/src/http/raw/tests.rs +++ b/src/http/raw/tests.rs @@ -38,9 +38,7 @@ async fn connection_context() -> (HttpTransportClient, HttpRawServer) { (client, server.into()) } -// TODO(niklasad1): fix before eventual merge #[tokio::test] -#[ignore] async fn request_work() { let (client, mut server) = connection_context().await; tokio::spawn(async move { @@ -66,9 +64,7 @@ async fn request_work() { } } -// TODO(niklasad1): fix before eventual merge #[tokio::test] -#[ignore] async fn notification_work() { let (client, mut server) = connection_context().await; tokio::spawn(async move { From d4c0bcc519acbdf2a6e7ca2ede9d4dddeac5a729 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 14:21:04 +0100 Subject: [PATCH 21/26] style: cargo fmt --- src/client/http/transport.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index b87dcfae9c..a018a8695a 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -69,7 +69,6 @@ impl HttpTransportClient { &self, request: jsonrpc::Request, ) -> Result { - let response = self.send_request(request).await?; let body_size = read_content_length(response.headers()).unwrap_or(0); let mut body_fut: hyper::Body = response.into_body(); @@ -96,7 +95,6 @@ impl HttpTransportClient { } } - // Read `content_length` from HTTP Header. // // Returns `Some(val)` if `content_length` contains exactly one value. @@ -152,7 +150,7 @@ pub enum Error { #[cfg(test)] mod tests { - use super::{Error, HttpTransportClient, read_content_length}; + use super::{read_content_length, Error, HttpTransportClient}; use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version}; #[test] From 77a9fe58e68535c600edb74d7ba66089390dbf76 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 15:18:08 +0100 Subject: [PATCH 22/26] benches: address grumbles --- Cargo.toml | 1 + benches/benches.rs | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b307920471..539552a878 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ ws = ["async-tls", "bytes", "soketto", "url", "webpki"] criterion = "0.3.3" env_logger = "0.8.1" jsonrpsee-test-utils = { path = "test-utils" } +num_cpus = "1.13.0" [[bench]] name = "benches" diff --git a/benches/benches.rs b/benches/benches.rs index bc68aba9e6..246d3d92a5 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -11,6 +11,11 @@ use std::sync::Arc; criterion_group!(benches, http_requests, websocket_requests); criterion_main!(benches); +fn concurrent_tasks() -> Vec { + let cores = num_cpus::get(); + vec![cores / 4, cores / 2, cores, cores * 2, cores * 4] +} + async fn http_server(tx: Sender) { let server = HttpServer::new("127.0.0.1:0").await.unwrap(); let mut say_hello = server.register_method("say_hello".to_string()).unwrap(); @@ -63,7 +68,7 @@ pub fn http_requests(c: &mut criterion::Criterion) { } }) }, - vec![2, 4, 8, 16, 32, 64, 128], + concurrent_tasks(), ); } @@ -99,7 +104,7 @@ pub fn websocket_requests(c: &mut criterion::Criterion) { } }) }, - // TODO(niklasad1): investigate why it only works up to 8 concurrent requests. - vec![2, 4, 8], + // TODO(niklasad1): this doesn't work on my machine. + concurrent_tasks(), ); } From 8e57f23b077f2dc0b85201f467cf6ca02503b9e7 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 17:17:46 +0100 Subject: [PATCH 23/26] feat(http server): configurable request body limit --- src/http/server.rs | 2 +- src/http/transport/background.rs | 70 +++++--------------------------- src/http/transport/mod.rs | 12 ++++-- 3 files changed, 19 insertions(+), 65 deletions(-) diff --git a/src/http/server.rs b/src/http/server.rs index 869fe5c4e4..e2c4fb0d59 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -26,9 +26,9 @@ use crate::http::raw::{RawServer, RawServerEvent, RawServerRequestId}; use crate::http::transport::HttpTransportServer; +use crate::types::http::HttpConfig; use crate::types::jsonrpc::{self, JsonValue}; use crate::types::server::Error; -use crate::types::http::HttpConfig; use futures::{channel::mpsc, future::Either, pin_mut, prelude::*}; use parking_lot::Mutex; diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index 90af5ca88f..da33ddb4e7 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -26,7 +26,10 @@ use crate::http::server_utils::access_control::AccessControl; use crate::http::transport::response; -use crate::types::{jsonrpc, http::HttpConfig}; +use crate::types::{ + http::{self, HttpConfig}, + jsonrpc, +}; use futures::{channel::mpsc, channel::oneshot, prelude::*}; use hyper::service::{make_service_fn, service_fn}; use hyper::Error; @@ -52,7 +55,10 @@ impl BackgroundHttp { /// /// In addition to `Self`, also returns the local address the server ends up listening on, /// which might be different than the one passed as parameter. - pub async fn bind(addr: &SocketAddr, config: HttpConfig) -> Result<(BackgroundHttp, SocketAddr), Box> { + pub async fn bind( + addr: &SocketAddr, + config: HttpConfig, + ) -> Result<(BackgroundHttp, SocketAddr), Box> { Self::bind_with_acl(addr, AccessControl::default(), config).await } @@ -143,8 +149,8 @@ async fn process_request( // Validate the ContentType header // to prevent Cross-Origin XHRs with text/plain hyper::Method::POST if is_json(request.headers().get("content-type")) => { - let json_body = match body_to_request(request.into_body(), config).await { - Ok(b) => b, + let json_body = match http::response_to_bytes(request, config).await { + Ok(body) => jsonrpc::from_slice(&body).unwrap(), Err(e) => match (e.kind(), e.into_inner()) { (io::ErrorKind::InvalidData, _) => return response::parse_error(), (io::ErrorKind::UnexpectedEof, _) => return response::parse_error(), @@ -185,59 +191,3 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool { _ => false, } } - -/// Converts a `hyper` body into a structured JSON object. -/// -/// Enforces a size limit on the body. -async fn body_to_request(mut body: hyper::Body, config: HttpConfig) -> Result { - let mut json_body = Vec::new(); - while let Some(chunk) = body.next().await { - let chunk = match chunk { - Ok(c) => c, - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string())), // TODO: - }; - json_body.extend_from_slice(&chunk); - if json_body.len() >= 16384 { - // TODO: some limit - return Err(io::Error::new(io::ErrorKind::Other, "request too large")); - } - } - - Ok(serde_json::from_slice(&json_body)?) -} - -#[cfg(test)] -mod tests { - use super::body_to_request; - - #[test] - fn body_to_request_works() { - let s = r#"[{"a":"hello"}]"#; - let expected: super::jsonrpc::Request = serde_json::from_str(s).unwrap(); - let req = futures::executor::block_on(async move { - let body = hyper::Body::from(s); - body_to_request(body).await.unwrap() - }); - assert_eq!(req, expected); - } - - #[test] - fn body_to_request_size_limit_json() { - let huge_body = - serde_json::to_vec(&(0..32768).map(|_| serde_json::Value::from("test")).collect::>()).unwrap(); - - futures::executor::block_on(async move { - let body = hyper::Body::from(huge_body); - assert!(body_to_request(body).await.is_err()); - }); - } - - #[test] - fn body_to_request_size_limit_garbage() { - let huge_body = (0..100_000).map(|_| rand::random::()).collect::>(); - futures::executor::block_on(async move { - let body = hyper::Body::from(huge_body); - assert!(body_to_request(body).await.is_err()); - }); - } -} diff --git a/src/http/transport/mod.rs b/src/http/transport/mod.rs index f47fd25171..61ac1980d8 100644 --- a/src/http/transport/mod.rs +++ b/src/http/transport/mod.rs @@ -28,8 +28,8 @@ mod background; mod response; use crate::http::server_utils::access_control::AccessControl; -use crate::types::jsonrpc; use crate::types::http::HttpConfig; +use crate::types::jsonrpc; use fnv::FnvHashMap; use futures::{channel::oneshot, prelude::*}; @@ -84,8 +84,11 @@ impl HttpTransportServer { // > starting to listen on a port is an asynchronous operation, but the hyper library // > hides this to us. In order to be future-proof, this function is async, so that we // > might switch out to a different library later without breaking the API. - pub async fn new(addr: &SocketAddr, config: HttpConfig) -> Result> { - let (background_thread, local_addr) = background::BackgroundHttp::bind(addr).await?; + pub async fn new( + addr: &SocketAddr, + config: HttpConfig, + ) -> Result> { + let (background_thread, local_addr) = background::BackgroundHttp::bind(addr, config).await?; Ok(HttpTransportServer { background_thread, local_addr, requests: Default::default(), next_request_id: 0 }) } @@ -95,7 +98,8 @@ impl HttpTransportServer { access_control: AccessControl, config: HttpConfig, ) -> Result> { - let (background_thread, local_addr) = background::BackgroundHttp::bind_with_acl(addr, access_control, config).await?; + let (background_thread, local_addr) = + background::BackgroundHttp::bind_with_acl(addr, access_control, config).await?; Ok(HttpTransportServer { background_thread, local_addr, requests: Default::default(), next_request_id: 0 }) } From 2f4d65695231aa05ed4e14bf05b0dbcba21f8694 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 18:08:21 +0100 Subject: [PATCH 24/26] feat(http server): configurable max body limit. --- benches/benches.rs | 2 +- examples/http.rs | 10 +++- src/client/http/client.rs | 20 +------ src/client/http/mod.rs | 3 +- src/client/http/transport.rs | 22 +++---- src/http/raw/core.rs | 2 +- src/http/raw/tests.rs | 5 +- src/http/tests.rs | 3 +- src/http/transport/background.rs | 5 +- src/http/transport/mod.rs | 10 ++-- src/types/http.rs | 99 ++++++++++++++++++++++++++++++++ 11 files changed, 139 insertions(+), 42 deletions(-) create mode 100644 src/types/http.rs diff --git a/benches/benches.rs b/benches/benches.rs index 246d3d92a5..dec454093a 100644 --- a/benches/benches.rs +++ b/benches/benches.rs @@ -17,7 +17,7 @@ fn concurrent_tasks() -> Vec { } async fn http_server(tx: Sender) { - let server = HttpServer::new("127.0.0.1:0").await.unwrap(); + let server = HttpServer::new("127.0.0.1:0", HttpConfig { max_request_body_size: u32::MAX }).await.unwrap(); let mut say_hello = server.register_method("say_hello".to_string()).unwrap(); tx.send(*server.local_addr()).unwrap(); loop { diff --git a/examples/http.rs b/examples/http.rs index 4eb4b3708c..1f94360ef2 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -52,8 +52,14 @@ async fn main() -> Result<(), Box> { } async fn run_server(server_started_tx: Sender<()>, url: &str) { - let server = HttpServer::new(url).await.unwrap(); - let mut say_hello = server.register_method("say_hello".to_string()).unwrap(); + let server = HttpServer::new(url, HttpConfig::default()).await.unwrap(); + let mut say_hello = server + .register_method( + "say_ + hello" + .to_string(), + ) + .unwrap(); server_started_tx.send(()).unwrap(); loop { diff --git a/src/client/http/client.rs b/src/client/http/client.rs index 3874640b1d..584c4d94c7 100644 --- a/src/client/http/client.rs +++ b/src/client/http/client.rs @@ -1,18 +1,9 @@ use crate::client::http::transport::HttpTransportClient; use crate::types::client::{Error, Mismatch}; +use crate::types::http::HttpConfig; use crate::types::jsonrpc::{self, JsonValue}; use std::sync::atomic::{AtomicU64, Ordering}; -/// Default maximum request body size (10 MB). -const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024; - -/// HTTP configuration. -#[derive(Copy, Clone)] -pub struct HttpConfig { - /// Maximum request body size in bytes. - pub max_request_body_size: u32, -} - /// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications. /// /// WARNING: The async methods must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio). @@ -23,19 +14,12 @@ pub struct HttpClient { request_id: AtomicU64, } -impl Default for HttpConfig { - fn default() -> Self { - Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB } - } -} - impl HttpClient { /// Initializes a new HTTP client. /// /// Fails when the URL is invalid. pub fn new(target: impl AsRef, config: HttpConfig) -> Result { - let transport = HttpTransportClient::new(target, config.max_request_body_size) - .map_err(|e| Error::TransportError(Box::new(e)))?; + let transport = HttpTransportClient::new(target, config).map_err(|e| Error::TransportError(Box::new(e)))?; Ok(Self { transport, request_id: AtomicU64::new(0) }) } diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index 31a7121236..f98e1c4209 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -4,5 +4,6 @@ mod transport; #[cfg(test)] mod tests; -pub use client::{HttpClient, HttpConfig}; +pub use crate::types::http::HttpConfig; +pub use client::HttpClient; pub use transport::HttpTransportClient; diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index a018a8695a..5d011fb6a5 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -6,7 +6,7 @@ // that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset // the JSON-RPC request id to a value that might have already been used. -use crate::types::jsonrpc; +use crate::types::{http::HttpConfig, jsonrpc}; use futures::StreamExt; use thiserror::Error; @@ -20,15 +20,15 @@ pub struct HttpTransportClient { /// HTTP client, client: hyper::Client, /// Configurable max request body size - max_request_body_size: u32, + config: HttpConfig, } impl HttpTransportClient { /// Initializes a new HTTP client. - pub fn new(target: impl AsRef, max_request_body_size: u32) -> Result { + pub fn new(target: impl AsRef, config: HttpConfig) -> Result { let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e).into()))?; if target.scheme() == "http" { - Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size }) + Ok(HttpTransportClient { client: hyper::Client::new(), target, config }) } else { return Err(Error::Url("URL scheme not supported, expects 'http'".into())); } @@ -39,7 +39,7 @@ impl HttpTransportClient { log::debug!("send: {}", jsonrpc::to_string(&request).expect("request valid JSON; qed")); let body = jsonrpc::to_vec(&request).map_err(|e| Error::Serialization(e))?; - if body.len() > self.max_request_body_size as usize { + if body.len() > self.config.max_request_body_size as usize { return Err(Error::RequestTooLarge); } @@ -73,7 +73,7 @@ impl HttpTransportClient { let body_size = read_content_length(response.headers()).unwrap_or(0); let mut body_fut: hyper::Body = response.into_body(); - if body_size > self.max_request_body_size { + if body_size > self.config.max_request_body_size { return Err(Error::RequestTooLarge); } @@ -81,7 +81,7 @@ impl HttpTransportClient { while let Some(chunk) = body_fut.next().await { let chunk = chunk.map_err(|e| Error::Http(Box::new(e)))?; - if chunk.len() + body.len() > self.max_request_body_size as usize { + if chunk.len() + body.len() > self.config.max_request_body_size as usize { return Err(Error::RequestTooLarge); } body.extend_from_slice(&chunk); @@ -151,19 +151,21 @@ pub enum Error { #[cfg(test)] mod tests { use super::{read_content_length, Error, HttpTransportClient}; + use crate::types::http::HttpConfig; use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version}; #[test] fn invalid_http_url_rejected() { - let err = HttpTransportClient::new("ws://localhost:9933", 1337).unwrap_err(); + let err = HttpTransportClient::new("ws://localhost:9933", HttpConfig::default()).unwrap_err(); assert!(matches!(err, Error::Url(_))); } #[tokio::test] async fn request_limit_works() { let eighty_bytes_limit = 80; - let client = HttpTransportClient::new("http://localhost:9933", eighty_bytes_limit).unwrap(); - assert_eq!(client.max_request_body_size, eighty_bytes_limit); + let client = + HttpTransportClient::new("http://localhost:9933", HttpConfig { max_request_body_size: 80 }).unwrap(); + assert_eq!(client.config.max_request_body_size, eighty_bytes_limit); let request = Request::Single(Call::MethodCall(MethodCall { jsonrpc: Version::V2, diff --git a/src/http/raw/core.rs b/src/http/raw/core.rs index dd3b7e14fb..547d9be852 100644 --- a/src/http/raw/core.rs +++ b/src/http/raw/core.rs @@ -146,7 +146,7 @@ impl From for RawServer { impl<'a> RawServerRequest<'a> { /// Returns the id of the request. /// - /// If this request object is dropped, you can retreive it again later by calling + /// If this request object is dropped, you can retrieve it again later by calling /// [`request_by_id`](crate::raw::RawServer::request_by_id). pub fn id(&self) -> RawServerRequestId { RawServerRequestId { inner: self.inner.id() } diff --git a/src/http/raw/tests.rs b/src/http/raw/tests.rs index a33898e3ab..313e65274a 100644 --- a/src/http/raw/tests.rs +++ b/src/http/raw/tests.rs @@ -28,13 +28,14 @@ use crate::client::HttpTransportClient; use crate::http::{HttpRawServer, HttpRawServerEvent, HttpTransportServer}; +use crate::types::http::HttpConfig; use crate::types::jsonrpc::{self, Call, MethodCall, Notification, Params, Request, Version}; use serde_json::Value; async fn connection_context() -> (HttpTransportClient, HttpRawServer) { - let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap(); + let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap(), HttpConfig::default()).await.unwrap(); let uri = format!("http://{}", server.local_addr()); - let client = HttpTransportClient::new(&uri, 10 * 1024 * 1024).unwrap(); + let client = HttpTransportClient::new(&uri, HttpConfig::default()).unwrap(); (client, server.into()) } diff --git a/src/http/tests.rs b/src/http/tests.rs index 91b0d7debd..4193a32d99 100644 --- a/src/http/tests.rs +++ b/src/http/tests.rs @@ -1,6 +1,7 @@ #![cfg(test)] use crate::http::HttpServer; +use crate::types::http::HttpConfig; use crate::types::jsonrpc::JsonValue; use futures::channel::oneshot::{self, Sender}; use futures::future::FutureExt; @@ -10,7 +11,7 @@ use jsonrpsee_test_utils::types::{Id, StatusCode}; use std::net::SocketAddr; async fn server(server_started_tx: Sender) { - let server = HttpServer::new("127.0.0.1:0").await.unwrap(); + let server = HttpServer::new("127.0.0.1:0", HttpConfig::default()).await.unwrap(); let mut hello = server.register_method("say_hello".to_owned()).unwrap(); let mut add = server.register_method("add".to_owned()).unwrap(); let mut notif = server.register_notification("notif".to_owned(), false).unwrap(); diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index da33ddb4e7..d2bd5bda73 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -150,7 +150,10 @@ async fn process_request( // to prevent Cross-Origin XHRs with text/plain hyper::Method::POST if is_json(request.headers().get("content-type")) => { let json_body = match http::response_to_bytes(request, config).await { - Ok(body) => jsonrpc::from_slice(&body).unwrap(), + Ok(body) => match jsonrpc::from_slice(&body) { + Ok(response) => response, + Err(_e) => return response::parse_error(), + }, Err(e) => match (e.kind(), e.into_inner()) { (io::ErrorKind::InvalidData, _) => return response::parse_error(), (io::ErrorKind::UnexpectedEof, _) => return response::parse_error(), diff --git a/src/http/transport/mod.rs b/src/http/transport/mod.rs index 61ac1980d8..8481b5e1e9 100644 --- a/src/http/transport/mod.rs +++ b/src/http/transport/mod.rs @@ -69,7 +69,7 @@ pub struct HttpTransportServer { /// Next identifier to use when inserting an element in `requests`. next_request_id: u64, - /// The identifier is lineraly increasing and is never leaked on the wire or outside of this + /// The identifier is linearly increasing and is never leaked on the wire or outside of this /// module. Therefore there is no risk of hash collision and using a `FnvHashMap` is safe. requests: FnvHashMap>>, } @@ -154,7 +154,7 @@ impl HttpTransportServer { /// You can pass `None` in order to destroy the request object without sending back anything. /// /// The implementation blindly sends back the response and doesn't check whether there is any - /// correspondance with the request in terms of logic. For example, `respond` will accept + /// correspondence with the request in terms of logic. For example, `respond` will accept /// sending back a batch of six responses even if the original request was a single /// notification. /// @@ -202,14 +202,14 @@ impl HttpTransportServer { #[cfg(test)] mod tests { - use super::HttpTransportServer; + use super::{HttpConfig, HttpTransportServer}; #[test] fn error_if_port_occupied() { futures::executor::block_on(async move { let addr = "127.0.0.1:0".parse().unwrap(); - let server1 = HttpTransportServer::new(&addr).await.unwrap(); - assert!(HttpTransportServer::new(server1.local_addr()).await.is_err()); + let server1 = HttpTransportServer::new(&addr, HttpConfig::default()).await.unwrap(); + assert!(HttpTransportServer::new(server1.local_addr(), HttpConfig::default()).await.is_err()); }); } } diff --git a/src/types/http.rs b/src/types/http.rs new file mode 100644 index 0000000000..64e8394ede --- /dev/null +++ b/src/types/http.rs @@ -0,0 +1,99 @@ +//! Shared HTTP types + +use futures::StreamExt; +use std::io::{Error, ErrorKind}; + +/// Default maximum request body size (10 MB). +const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024; + +/// HTTP configuration. +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct HttpConfig { + /// Maximum request body size in bytes. + pub max_request_body_size: u32, +} + +impl Default for HttpConfig { + fn default() -> Self { + Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB } + } +} + +/// Read response body from a received request with configured `HTTP` settings such as `request_max_body_size`. +/// +// TODO: move somewhere else!!! +pub async fn response_to_bytes(response: hyper::Request, config: HttpConfig) -> Result, Error> { + let body_size = read_content_length(response.headers()).unwrap_or(0); + let mut body_fut: hyper::Body = response.into_body(); + + if body_size > config.max_request_body_size { + return Err(Error::new( + ErrorKind::Other, + format!("HTTP request body too large, got: {} max: {}", body_size, config.max_request_body_size), + )); + } + + let mut body = Vec::with_capacity(body_size as usize); + + while let Some(chunk) = body_fut.next().await { + let chunk = chunk.map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; + let body_length = chunk.len() + body.len(); + if body_length > config.max_request_body_size as usize { + return Err(Error::new( + ErrorKind::Other, + format!("HTTP request body too large, got: {} max: {}", body_length, config.max_request_body_size), + )); + } + body.extend_from_slice(&chunk); + } + Ok(body) +} + +// Read `content_length` from HTTP Header. +// +// Returns `Some(val)` if `content_length` contains exactly one value. +// None otherwise. +fn read_content_length(header: &hyper::header::HeaderMap) -> Option { + let values = header.get_all("content-length"); + let mut iter = values.iter(); + let content_length = iter.next()?; + if iter.next().is_some() { + return None; + } + + // HTTP Content-Length indicates number of bytes in decimal. + let length = content_length.to_str().ok()?; + u32::from_str_radix(length, 10).ok() +} + +#[cfg(test)] +mod tests { + use super::{read_content_length, response_to_bytes, HttpConfig}; + use crate::types::jsonrpc; + + #[tokio::test] + async fn body_to_request_works() { + let s = r#"[{"a":"hello"}]"#; + let expected: jsonrpc::Request = serde_json::from_str(s).unwrap(); + let body = hyper::Body::from(s.to_owned()); + let bytes = response_to_bytes(hyper::Request::new(body), HttpConfig::default()).await.unwrap(); + let req: jsonrpc::Request = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(req, expected); + } + + #[tokio::test] + async fn body_to_bytes_size_limit_works() { + let body = hyper::Body::from(vec![0; 128]); + assert!(response_to_bytes(hyper::Request::new(body), HttpConfig { max_request_body_size: 127 }).await.is_err()); + } + + #[test] + fn read_content_length_works() { + let mut header = hyper::header::HeaderMap::new(); + header.insert(hyper::header::CONTENT_LENGTH, "177".parse().unwrap()); + assert_eq!(read_content_length(&header), Some(177)); + + header.append(hyper::header::CONTENT_LENGTH, "999".parse().unwrap()); + assert_eq!(read_content_length(&header), None); + } +} From c7c83c00e3d8085b42fd4878279ac9a17c994414 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 22:48:19 +0100 Subject: [PATCH 25/26] fix(http example): formatting nit. --- examples/http.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/examples/http.rs b/examples/http.rs index 1f94360ef2..ee4938e6bb 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -53,14 +53,7 @@ async fn main() -> Result<(), Box> { async fn run_server(server_started_tx: Sender<()>, url: &str) { let server = HttpServer::new(url, HttpConfig::default()).await.unwrap(); - let mut say_hello = server - .register_method( - "say_ - hello" - .to_string(), - ) - .unwrap(); - + let mut say_hello = server.register_method("say_hello".to_string()).unwrap(); server_started_tx.send(()).unwrap(); loop { let r = say_hello.next().await; From f79e7f34e32ff811d44ac6b00894dd6349458e18 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 12 Nov 2020 22:51:35 +0100 Subject: [PATCH 26/26] refactor(http helpers): rename methods. --- src/http/transport/background.rs | 2 +- src/types/http.rs | 19 +++++++++---------- src/types/mod.rs | 1 + 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index d2bd5bda73..899af4140e 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -149,7 +149,7 @@ async fn process_request( // Validate the ContentType header // to prevent Cross-Origin XHRs with text/plain hyper::Method::POST if is_json(request.headers().get("content-type")) => { - let json_body = match http::response_to_bytes(request, config).await { + let json_body = match http::read_http_body(request, config).await { Ok(body) => match jsonrpc::from_slice(&body) { Ok(response) => response, Err(_e) => return response::parse_error(), diff --git a/src/types/http.rs b/src/types/http.rs index 64e8394ede..e9e831a76e 100644 --- a/src/types/http.rs +++ b/src/types/http.rs @@ -19,11 +19,10 @@ impl Default for HttpConfig { } } -/// Read response body from a received request with configured `HTTP` settings such as `request_max_body_size`. -/// +/// Read response body from a received request with configured `HTTP` settings. // TODO: move somewhere else!!! -pub async fn response_to_bytes(response: hyper::Request, config: HttpConfig) -> Result, Error> { - let body_size = read_content_length(response.headers()).unwrap_or(0); +pub async fn read_http_body(response: hyper::Request, config: HttpConfig) -> Result, Error> { + let body_size = read_http_content_length(response.headers()).unwrap_or(0); let mut body_fut: hyper::Body = response.into_body(); if body_size > config.max_request_body_size { @@ -53,7 +52,7 @@ pub async fn response_to_bytes(response: hyper::Request, config: Ht // // Returns `Some(val)` if `content_length` contains exactly one value. // None otherwise. -fn read_content_length(header: &hyper::header::HeaderMap) -> Option { +fn read_http_content_length(header: &hyper::header::HeaderMap) -> Option { let values = header.get_all("content-length"); let mut iter = values.iter(); let content_length = iter.next()?; @@ -68,7 +67,7 @@ fn read_content_length(header: &hyper::header::HeaderMap) -> Option { #[cfg(test)] mod tests { - use super::{read_content_length, response_to_bytes, HttpConfig}; + use super::{read_http_body, read_http_content_length, HttpConfig}; use crate::types::jsonrpc; #[tokio::test] @@ -76,7 +75,7 @@ mod tests { let s = r#"[{"a":"hello"}]"#; let expected: jsonrpc::Request = serde_json::from_str(s).unwrap(); let body = hyper::Body::from(s.to_owned()); - let bytes = response_to_bytes(hyper::Request::new(body), HttpConfig::default()).await.unwrap(); + let bytes = read_http_body(hyper::Request::new(body), HttpConfig::default()).await.unwrap(); let req: jsonrpc::Request = serde_json::from_slice(&bytes).unwrap(); assert_eq!(req, expected); } @@ -84,16 +83,16 @@ mod tests { #[tokio::test] async fn body_to_bytes_size_limit_works() { let body = hyper::Body::from(vec![0; 128]); - assert!(response_to_bytes(hyper::Request::new(body), HttpConfig { max_request_body_size: 127 }).await.is_err()); + assert!(read_http_body(hyper::Request::new(body), HttpConfig { max_request_body_size: 127 }).await.is_err()); } #[test] fn read_content_length_works() { let mut header = hyper::header::HeaderMap::new(); header.insert(hyper::header::CONTENT_LENGTH, "177".parse().unwrap()); - assert_eq!(read_content_length(&header), Some(177)); + assert_eq!(read_http_content_length(&header), Some(177)); header.append(hyper::header::CONTENT_LENGTH, "999".parse().unwrap()); - assert_eq!(read_content_length(&header), None); + assert_eq!(read_http_content_length(&header), None); } } diff --git a/src/types/mod.rs b/src/types/mod.rs index d8a3a0568a..5c523583ce 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -8,4 +8,5 @@ pub mod client; pub mod server; /// Shared types for HTTP +#[cfg(feature = "http")] pub mod http;