Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[http server]: configurable max request body size #159

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d637b8d
experimental
niklasad1 Nov 5, 2020
3581cce
ci(benches): sync and concurrent roundtrips
niklasad1 Nov 6, 2020
b278010
ci(benches): sync and concurrent roundtrips
niklasad1 Nov 6, 2020
10cfe8c
fix(nits)
niklasad1 Nov 6, 2020
cf12a2f
feat(http client): limit max request body size
niklasad1 Nov 6, 2020
0b569f7
test(http transport): request limit test
niklasad1 Nov 9, 2020
3e2827d
test(http client): add tests.
niklasad1 Nov 9, 2020
a453d99
fix typo
niklasad1 Nov 9, 2020
1740d2e
fix(benches): make it compile again.
niklasad1 Nov 10, 2020
dd72e6a
fix(ws example): revert unintentional change.
niklasad1 Nov 10, 2020
a904bc2
test(http client): subscription response on call.
niklasad1 Nov 10, 2020
5d06d5d
fix(cleanup)
niklasad1 Nov 10, 2020
6a43545
fix(benches): make it compile again.
niklasad1 Nov 10, 2020
a6d9493
Update src/client/http/transport.rs
niklasad1 Nov 11, 2020
c1ebc08
fix(http client): `&str` -> `AsRef<str>`
niklasad1 Nov 11, 2020
a3ae6ea
docs(client types): better docs for Mismatch type.
niklasad1 Nov 11, 2020
be70049
style: `Default::default` -> `HttpConfig::default`
niklasad1 Nov 11, 2020
19c6c9c
fix(http client): read body size from header.
niklasad1 Nov 11, 2020
ebc6a35
rewrite
niklasad1 Nov 12, 2020
58bb1cc
test(raw http): enable tests to works again.
niklasad1 Nov 12, 2020
d4c0bcc
style: cargo fmt
niklasad1 Nov 12, 2020
77a9fe5
benches: address grumbles
niklasad1 Nov 12, 2020
8e57f23
feat(http server): configurable request body limit
niklasad1 Nov 12, 2020
03c61b3
Merge branch 'v2-http-client-syncronous-call-structure' into v2-http-…
niklasad1 Nov 12, 2020
2f4d656
feat(http server): configurable max body limit.
niklasad1 Nov 12, 2020
c7c83c0
fix(http example): formatting nit.
niklasad1 Nov 12, 2020
f79e7f3
refactor(http helpers): rename methods.
niklasad1 Nov 12, 2020
7770d98
Merge remote-tracking branch 'origin/v2' into v2-http-server-max-requ…
niklasad1 Nov 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn concurrent_tasks() -> Vec<usize> {
}

async fn http_server(tx: Sender<SocketAddr>) {
let server = HttpServer::new("127.0.0.1:0").await.unwrap();
let server = HttpServer::new("127.0.0.1:0", HttpConfig { max_request_body_size: u32::MAX }).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();
tx.send(*server.local_addr()).unwrap();
loop {
Expand Down
3 changes: 1 addition & 2 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

async fn run_server(server_started_tx: Sender<()>, url: &str) {
let server = HttpServer::new(url).await.unwrap();
let server = HttpServer::new(url, HttpConfig::default()).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();

server_started_tx.send(()).unwrap();
loop {
let r = say_hello.next().await;
Expand Down
20 changes: 2 additions & 18 deletions src/client/http/client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
use crate::client::http::transport::HttpTransportClient;
use crate::types::client::{Error, Mismatch};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, JsonValue};
use std::sync::atomic::{AtomicU64, Ordering};

/// Default maximum request body size (10 MB).
const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024;

/// HTTP configuration.
#[derive(Copy, Clone)]
pub struct HttpConfig {
/// Maximum request body size in bytes.
pub max_request_body_size: u32,
}

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
///
/// WARNING: The async methods must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio).
Expand All @@ -23,19 +14,12 @@ pub struct HttpClient {
request_id: AtomicU64,
}

impl Default for HttpConfig {
fn default() -> Self {
Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB }
}
}

impl HttpClient {
/// Initializes a new HTTP client.
///
/// Fails when the URL is invalid.
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let transport = HttpTransportClient::new(target, config.max_request_body_size)
.map_err(|e| Error::TransportError(Box::new(e)))?;
let transport = HttpTransportClient::new(target, config).map_err(|e| Error::TransportError(Box::new(e)))?;
Ok(Self { transport, request_id: AtomicU64::new(0) })
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ mod transport;
#[cfg(test)]
mod tests;

pub use client::{HttpClient, HttpConfig};
pub use crate::types::http::HttpConfig;
pub use client::HttpClient;
pub use transport::HttpTransportClient;
25 changes: 13 additions & 12 deletions src/client/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.

use crate::types::jsonrpc;
use crate::types::{http::HttpConfig, jsonrpc};
use futures::StreamExt;
use thiserror::Error;

const CONTENT_TYPE_JSON: &str = "application/json";

Expand All @@ -20,15 +19,15 @@ pub struct HttpTransportClient {
/// HTTP client,
client: hyper::Client<hyper::client::HttpConnector>,
/// Configurable max request body size
max_request_body_size: u32,
config: HttpConfig,
}

impl HttpTransportClient {
/// Initializes a new HTTP client.
pub fn new(target: impl AsRef<str>, max_request_body_size: u32) -> Result<Self, Error> {
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.scheme() == "http" {
Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size })
Ok(HttpTransportClient { client: hyper::Client::new(), target, config })
} else {
Err(Error::Url("URL scheme not supported, expects 'http'".into()))
}
Expand All @@ -39,7 +38,7 @@ impl HttpTransportClient {
let body = jsonrpc::to_vec(&request).map_err(Error::Serialization)?;
log::debug!("send: {}", request);

if body.len() > self.max_request_body_size as usize {
if body.len() > self.config.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
}

Expand Down Expand Up @@ -73,15 +72,15 @@ impl HttpTransportClient {
let body_size = read_content_length(response.headers()).unwrap_or(0);
let mut body_fut: hyper::Body = response.into_body();

if body_size > self.max_request_body_size {
if body_size > self.config.max_request_body_size {
return Err(Error::RequestTooLarge);
}

let mut body = Vec::with_capacity(body_size as usize);

while let Some(chunk) = body_fut.next().await {
let chunk = chunk.map_err(|e| Error::Http(Box::new(e)))?;
if chunk.len() + body.len() > self.max_request_body_size as usize {
if chunk.len() + body.len() > self.config.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
}
body.extend_from_slice(&chunk);
Expand Down Expand Up @@ -113,7 +112,7 @@ fn read_content_length(header: &hyper::header::HeaderMap) -> Option<u32> {
}

/// Error that can happen during a request.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Invalid URL.
#[error("Invalid Url: {0}")]
Expand Down Expand Up @@ -151,19 +150,21 @@ pub enum Error {
#[cfg(test)]
mod tests {
use super::{read_content_length, Error, HttpTransportClient};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version};

#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", 1337).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", HttpConfig::default()).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client = HttpTransportClient::new("http://localhost:9933", eighty_bytes_limit).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);
let client =
HttpTransportClient::new("http://localhost:9933", HttpConfig { max_request_body_size: 80 }).unwrap();
assert_eq!(client.config.max_request_body_size, eighty_bytes_limit);

let request = Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
Expand Down
2 changes: 1 addition & 1 deletion src/http/raw/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl From<HttpTransportServer> for RawServer {
impl<'a> RawServerRequest<'a> {
/// Returns the id of the request.
///
/// If this request object is dropped, you can retreive it again later by calling
/// If this request object is dropped, you can retrieve it again later by calling
/// [`request_by_id`](crate::raw::RawServer::request_by_id).
pub fn id(&self) -> RawServerRequestId {
RawServerRequestId { inner: self.inner.id() }
Expand Down
5 changes: 3 additions & 2 deletions src/http/raw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@

use crate::client::HttpTransportClient;
use crate::http::{HttpRawServer, HttpRawServerEvent, HttpTransportServer};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, Call, MethodCall, Notification, Params, Request, Version};
use serde_json::Value;

async fn connection_context() -> (HttpTransportClient, HttpRawServer) {
let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap();
let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap(), HttpConfig::default()).await.unwrap();
let uri = format!("http://{}", server.local_addr());
let client = HttpTransportClient::new(&uri, 10 * 1024 * 1024).unwrap();
let client = HttpTransportClient::new(&uri, HttpConfig::default()).unwrap();
(client, server.into())
}

Expand Down
5 changes: 3 additions & 2 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

use crate::http::raw::{RawServer, RawServerEvent, RawServerRequestId};
use crate::http::transport::HttpTransportServer;
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, JsonValue};
use crate::types::server::Error;

Expand Down Expand Up @@ -111,9 +112,9 @@ enum FrontToBack {

impl Server {
/// Initializes a new server based upon this raw server.
pub async fn new(url: &str) -> Result<Self, Box<dyn error::Error + Send + Sync>> {
pub async fn new(url: &str, config: HttpConfig) -> Result<Self, Box<dyn error::Error + Send + Sync>> {
let sockaddr = url.parse()?;
let transport_server = HttpTransportServer::new(&sockaddr).await?;
let transport_server = HttpTransportServer::new(&sockaddr, config).await?;
let local_addr = *transport_server.local_addr();

// We use an unbounded channel because the only exchanged messages concern registering
Expand Down
3 changes: 2 additions & 1 deletion src/http/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(test)]

use crate::http::HttpServer;
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::JsonValue;
use futures::channel::oneshot::{self, Sender};
use futures::future::FutureExt;
Expand All @@ -10,7 +11,7 @@ use jsonrpsee_test_utils::types::{Id, StatusCode};
use std::net::SocketAddr;

async fn server(server_started_tx: Sender<SocketAddr>) {
let server = HttpServer::new("127.0.0.1:0").await.unwrap();
let server = HttpServer::new("127.0.0.1:0", HttpConfig::default()).await.unwrap();
let mut hello = server.register_method("say_hello".to_owned()).unwrap();
let mut add = server.register_method("add".to_owned()).unwrap();
let mut notif = server.register_notification("notif".to_owned(), false).unwrap();
Expand Down
85 changes: 18 additions & 67 deletions src/http/transport/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

use crate::http::server_utils::access_control::AccessControl;
use crate::http::transport::response;
use crate::types::jsonrpc;
use crate::types::{
http::{self, HttpConfig},
jsonrpc,
};
use futures::{channel::mpsc, channel::oneshot, prelude::*};
use hyper::service::{make_service_fn, service_fn};
use hyper::Error;
Expand All @@ -52,13 +55,17 @@ impl BackgroundHttp {
///
/// In addition to `Self`, also returns the local address the server ends up listening on,
/// which might be different than the one passed as parameter.
pub async fn bind(addr: &SocketAddr) -> Result<(BackgroundHttp, SocketAddr), Box<dyn error::Error + Send + Sync>> {
Self::bind_with_acl(addr, AccessControl::default()).await
pub async fn bind(
addr: &SocketAddr,
config: HttpConfig,
) -> Result<(BackgroundHttp, SocketAddr), Box<dyn error::Error + Send + Sync>> {
Self::bind_with_acl(addr, AccessControl::default(), config).await
}

pub async fn bind_with_acl(
addr: &SocketAddr,
access_control: AccessControl,
config: HttpConfig,
) -> Result<(BackgroundHttp, SocketAddr), Box<dyn error::Error + Send + Sync>> {
let (tx, rx) = mpsc::channel(4);

Expand All @@ -69,7 +76,7 @@ impl BackgroundHttp {
Ok::<_, Error>(service_fn(move |req| {
let mut tx = tx.clone();
let access_control = access_control.clone();
async move { Ok::<_, Error>(process_request(req, &mut tx, &access_control).await) }
async move { Ok::<_, Error>(process_request(req, &mut tx, &access_control, config).await) }
}))
}
});
Expand Down Expand Up @@ -124,6 +131,7 @@ async fn process_request(
request: hyper::Request<hyper::Body>,
fg_process_tx: &mut mpsc::Sender<Request>,
access_control: &AccessControl,
config: HttpConfig,
) -> hyper::Response<hyper::Body> {
// Process access control
if access_control.deny_host(&request) {
Expand All @@ -136,18 +144,16 @@ async fn process_request(
return response::invalid_allow_headers();
}

/*
// Read metadata
let metadata = self.jsonrpc_handler.extractor.read_metadata(&request);
*/

// Proceed
match *request.method() {
// Validate the ContentType header
// to prevent Cross-Origin XHRs with text/plain
hyper::Method::POST if is_json(request.headers().get("content-type")) => {
let request = match body_to_request(request.into_body()).await {
Ok(b) => b,
let request = match http::read_http_body(request, config).await {
Ok(body) => match jsonrpc::from_slice(&body) {
Ok(response) => response,
Err(_e) => return response::parse_error(),
},
Err(e) => match (e.kind(), e.into_inner()) {
(io::ErrorKind::InvalidData, _) => return response::parse_error(),
(io::ErrorKind::UnexpectedEof, _) => return response::parse_error(),
Expand All @@ -170,6 +176,7 @@ async fn process_request(
Err(_) => return response::internal_error("JSON request send back channel has shut down"),
}
}
// Disallow other methods.
_ => response::method_not_allowed(),
}
}
Expand All @@ -187,59 +194,3 @@ fn is_json(content_type: Option<&hyper::header::HeaderValue>) -> bool {
_ => false,
}
}

/// Converts a `hyper` body into a structured JSON object.
///
/// Enforces a size limit on the body.
async fn body_to_request(mut body: hyper::Body) -> Result<jsonrpc::Request, io::Error> {
let mut json_body = Vec::new();
while let Some(chunk) = body.next().await {
let chunk = match chunk {
Ok(c) => c,
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string())), // TODO:
};
json_body.extend_from_slice(&chunk);
if json_body.len() >= 16384 {
// TODO: some limit
return Err(io::Error::new(io::ErrorKind::Other, "request too large"));
}
}

Ok(serde_json::from_slice(&json_body)?)
}

#[cfg(test)]
mod tests {
use super::body_to_request;

#[test]
fn body_to_request_works() {
let s = r#"[{"a":"hello"}]"#;
let expected: super::jsonrpc::Request = serde_json::from_str(s).unwrap();
let req = futures::executor::block_on(async move {
let body = hyper::Body::from(s);
body_to_request(body).await.unwrap()
});
assert_eq!(req, expected);
}

#[test]
fn body_to_request_size_limit_json() {
let huge_body =
serde_json::to_vec(&(0..32768).map(|_| serde_json::Value::from("test")).collect::<Vec<_>>()).unwrap();

futures::executor::block_on(async move {
let body = hyper::Body::from(huge_body);
assert!(body_to_request(body).await.is_err());
});
}

#[test]
fn body_to_request_size_limit_garbage() {
let huge_body = (0..100_000).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
futures::executor::block_on(async move {
let body = hyper::Body::from(huge_body);
assert!(body_to_request(body).await.is_err());
});
}
}
Loading