Skip to content

Commit

Permalink
upgrade to hyper v1.0 (#1368)
Browse files Browse the repository at this point in the history
* upgrade to hyper v1.0

* some progress

* fix nits

* more cleanup

* Update client/http-client/Cargo.toml

* make all examples compile

* remove nits

* unify http request

* introduce BoxError

* more boxerror

* fix tests

* remove sync requirement

* fix nits

* use ring crypto backend

* more ring

* Update client/ws-client/src/tests.rs

* Update client/http-client/src/transport.rs

* Update server/src/middleware/http/proxy_get_request.rs

* Update server/src/transport/ws.rs

* simplify code

* address grumbles

* unify http body in client/server
  • Loading branch information
niklasad1 authored May 21, 2024
1 parent a453396 commit 257513e
Show file tree
Hide file tree
Showing 34 changed files with 885 additions and 491 deletions.
14 changes: 8 additions & 6 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ publish = true

[dependencies]
async-trait = "0.1"
hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper = { version = "1.3", features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.27.1", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper-util = { version = "0.1.1", features = ["client", "client-legacy"] }
http-body = "1"
jsonrpsee-types = { workspace = true }
jsonrpsee-core = { workspace = true, features = ["client", "http-helpers"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
serde_json = "1"
thiserror = "1"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1.34"
tower = { version = "0.4.13", features = ["util"] }
Expand All @@ -34,8 +36,8 @@ tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros"] }

[features]
default = ["native-tls"]
native-tls = ["hyper-rustls/native-tokio", "__tls"]
webpki-tls = ["hyper-rustls/webpki-tokio", "__tls"]
native-tls = ["hyper-rustls/native-tokio", "hyper-rustls/ring", "__tls"]
webpki-tls = ["hyper-rustls/webpki-tokio", "hyper-rustls/ring", "__tls"]

# Internal feature to indicate whether TLS is enabled.
# Does nothing on its own.
Expand Down
34 changes: 18 additions & 16 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,23 @@
// DEALINGS IN THE SOFTWARE.

use std::borrow::Cow as StdCow;
use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder};
use crate::transport::{self, Error as TransportError, HttpTransportClient, HttpTransportClientBuilder};
use crate::types::{NotificationSer, RequestSer, Response};
use crate::{HttpRequest, HttpResponse};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use hyper::Body;
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, Error, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
Expand Down Expand Up @@ -189,10 +188,10 @@ impl<L> HttpClientBuilder<L> {
impl<B, S, L> HttpClientBuilder<L>
where
L: Layer<transport::HttpBackend, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Clone,
B: HttpBody + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient<S>, Error> {
Expand All @@ -207,6 +206,7 @@ where
max_log_length,
service_builder,
tcp_no_delay,
..
} = self;

let transport = HttpTransportClientBuilder::new()
Expand Down Expand Up @@ -254,7 +254,7 @@ impl HttpClientBuilder<Identity> {

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
#[derive(Debug, Clone)]
pub struct HttpClient<S = HttpBackend> {
pub struct HttpClient<S> {
/// HTTP transport client.
transport: HttpTransportClient<S>,
/// Request timeout. Defaults to 60sec.
Expand All @@ -265,17 +265,18 @@ pub struct HttpClient<S = HttpBackend> {

impl<S> HttpClient<S> {
/// Create a builder for the HttpClient.
pub fn builder() -> HttpClientBuilder {
pub fn builder() -> HttpClientBuilder<Identity> {
HttpClientBuilder::new()
}
}

#[async_trait]
impl<B, S> ClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Error: Into<BoxError>,
B::Data: Send,
{
#[instrument(name = "notification", skip(self, params), level = "trace")]
Expand Down Expand Up @@ -405,10 +406,11 @@ where
#[async_trait]
impl<B, S> SubscriptionClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
/// Send a subscription request to the server. Not implemented for HTTP; will always return
/// [`Error::HttpNotImplemented`].
Expand Down
7 changes: 7 additions & 0 deletions client/http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ mod tests;
pub use client::{HttpClient, HttpClientBuilder};
pub use hyper::http::{HeaderMap, HeaderValue};
pub use jsonrpsee_types as types;

/// Default HTTP body for the client.
pub type HttpBody = jsonrpsee_core::http_helpers::Body;
/// HTTP request with default body.
pub type HttpRequest<T = HttpBody> = jsonrpsee_core::http_helpers::Request<T>;
/// HTTP response with default body.
pub type HttpResponse<T = HttpBody> = jsonrpsee_core::http_helpers::Response<T>;
60 changes: 33 additions & 27 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.

use hyper::body::{Body, HttpBody};
use hyper::client::{Client, HttpConnector};
use hyper::body::Bytes;
use hyper::http::{HeaderMap, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
use jsonrpsee_core::BoxError;
use jsonrpsee_core::{
http_helpers::{self, HttpError},
TEN_MB_SIZE_BYTES,
};
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -24,19 +26,21 @@ use tower::layer::util::Identity;
use tower::{Layer, Service, ServiceExt};
use url::Url;

use crate::{HttpBody, HttpRequest, HttpResponse};

const CONTENT_TYPE_JSON: &str = "application/json";

/// Wrapper over HTTP transport and connector.
#[derive(Debug)]
pub enum HttpBackend<B = Body> {
pub enum HttpBackend<B = HttpBody> {
/// Hyper client with https connector.
#[cfg(feature = "__tls")]
Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
/// Hyper client with http connector.
Http(Client<HttpConnector, B>),
}

impl Clone for HttpBackend {
impl<B> Clone for HttpBackend<B> {
fn clone(&self) -> Self {
match self {
Self::Http(inner) => Self::Http(inner.clone()),
Expand All @@ -46,13 +50,13 @@ impl Clone for HttpBackend {
}
}

impl<B> tower::Service<hyper::Request<B>> for HttpBackend<B>
impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
where
B: HttpBody<Error = hyper::Error> + Send + 'static,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
type Response = hyper::Response<Body>;
type Response = HttpResponse<hyper::body::Incoming>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

Expand All @@ -62,17 +66,17 @@ where
#[cfg(feature = "__tls")]
Self::Https(inner) => inner.poll_ready(ctx),
}
.map_err(|e| Error::Http(e.into()))
.map_err(|e| Error::Http(HttpError::Stream(e.into())))
}

fn call(&mut self, req: hyper::Request<B>) -> Self::Future {
fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
let resp = match self {
Self::Http(inner) => inner.call(req),
#[cfg(feature = "__tls")]
Self::Https(inner) => inner.call(req),
};

Box::pin(async move { resp.await.map_err(|e| Error::Http(e.into())) })
Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
}
}

Expand Down Expand Up @@ -177,11 +181,11 @@ impl<L> HttpTransportClientBuilder<L> {
/// Build a [`HttpTransportClient`].
pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
where
L: Layer<HttpBackend<Body>, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody + Send + 'static,
L: Layer<HttpBackend, Service = S>,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
let Self {
certificate_store,
Expand All @@ -202,7 +206,7 @@ impl<L> HttpTransportClientBuilder<L> {
"http" => {
let mut connector = HttpConnector::new();
connector.set_nodelay(tcp_no_delay);
HttpBackend::Http(Client::builder().build(connector))
HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
}
#[cfg(feature = "__tls")]
"https" => {
Expand All @@ -214,9 +218,8 @@ impl<L> HttpTransportClientBuilder<L> {
#[cfg(feature = "native-tls")]
CertificateStore::Native => hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_all_versions()
.wrap_connector(http_conn),
.map(|c| c.https_or_http().enable_all_versions().wrap_connector(http_conn))
.map_err(|_| Error::InvalidCertficateStore)?,
#[cfg(feature = "webpki-tls")]
CertificateStore::WebPki => hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
Expand All @@ -226,7 +229,7 @@ impl<L> HttpTransportClientBuilder<L> {
_ => return Err(Error::InvalidCertficateStore),
};

HttpBackend::Https(Client::builder().build::<_, hyper::Body>(https_conn))
HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
}
_ => {
#[cfg(feature = "__tls")]
Expand Down Expand Up @@ -281,20 +284,22 @@ pub struct HttpTransportClient<S> {

impl<B, S> HttpTransportClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
async fn inner_send(&self, body: String) -> Result<hyper::Response<B>, Error> {
async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
if body.len() > self.max_request_size as usize {
return Err(Error::RequestTooLarge);
}

let mut req = hyper::Request::post(&self.target);
let mut req = HttpRequest::post(&self.target);
if let Some(headers) = req.headers_mut() {
*headers = self.headers.clone();
}
let req = req.body(From::from(body)).expect("URI and request headers are valid; qed");

let req = req.body(body.into()).expect("URI and request headers are valid; qed");
let response = self.client.clone().ready().await?.call(req).await?;

if response.status().is_success() {
Expand All @@ -310,7 +315,8 @@ where

let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

rx_log_from_bytes(&body, self.max_log_length);

Expand Down
6 changes: 3 additions & 3 deletions client/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tracing = "0.1.34"
thiserror = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"], optional = true }
http = { version = "0.2", optional = true }
http = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio = { version = "1.16", features = ["net", "time", "macros"], optional = true }
pin-project = { version = "1", optional = true }
Expand All @@ -30,11 +30,11 @@ url = { version = "2.4.0", optional = true }
# tls
rustls-native-certs = { version = "0.7", optional = true }
webpki-roots = { version = "0.26", optional = true }
tokio-rustls = { version = "0.25", optional = true }
tokio-rustls = { version = "0.26", default-features = false, optional = true, features = ["logging", "tls12", "ring"] }
rustls-pki-types = { version = "1", optional = true }

# ws
soketto = { version = "0.7.1", optional = true }
soketto = { version = "0.8", optional = true }

# web-sys
gloo-net = { version = "0.5.0", default-features = false, features = ["json", "websocket"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme.workspace = true
publish = true

[dependencies]
http = "0.2.0"
http = "1"
jsonrpsee-types = { workspace = true }
jsonrpsee-client-transport = { workspace = true, features = ["ws"] }
jsonrpsee-core = { workspace = true, features = ["async-client"] }
Expand Down
3 changes: 2 additions & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ async fn redirections() {
.unwrap();

let server_url = format!("ws://{}", server.local_addr());
let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url);
let redirect_url =
jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url).with_default_timeout().await.unwrap();

// The client will first connect to a server that only performs re-directions and finally
// redirect to another server to complete the handshake.
Expand Down
19 changes: 7 additions & 12 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ tracing = "0.1.34"

# optional deps
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
http = { version = "1.1", default-features = false, optional = true }
bytes = { version = "1.6", optional = true }
http-body = { version = "1", optional = true }
http-body-util = { version = "0.1.1", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
parking_lot = { version = "0.12", optional = true }
Expand All @@ -37,17 +40,8 @@ pin-project = { version = "1", optional = true }

[features]
default = []
http-helpers = ["hyper", "futures-util"]
server = [
"futures-util/alloc",
"rustc-hash/std",
"parking_lot",
"rand",
"tokio/rt",
"tokio/sync",
"tokio/macros",
"tokio/time",
]
http-helpers = ["bytes", "futures-util", "http-body", "http-body-util", "http"]
server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time"]
client = ["futures-util/sink", "tokio/sync"]
async-client = [
"client",
Expand Down Expand Up @@ -75,6 +69,7 @@ async-wasm-client = [
serde_json = "1.0"
tokio = { version = "1.16", features = ["macros", "rt"] }
jsonrpsee = { path = "../jsonrpsee", features = ["server", "macros"] }
http-body-util = "0.1.1"

[package.metadata.docs.rs]
all-features = true
Expand Down
Loading

0 comments on commit 257513e

Please sign in to comment.