Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to hyper v1.0 #1368

Merged
merged 23 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ publish = true

[dependencies]
async-trait = "0.1"
hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper = { version = "1.3", features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.27.1", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper-util = { version = "0.1.1", features = ["client", "client-legacy"] }
http-body = "1"
http-body-util = "0.1.1"
jsonrpsee-types = { workspace = true }
jsonrpsee-core = { workspace = true, features = ["client", "http-helpers"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
serde_json = "1"
thiserror = "1"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1.34"
tower = { version = "0.4.13", features = ["util"] }
Expand All @@ -34,8 +37,8 @@ tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros"] }

[features]
default = ["native-tls"]
native-tls = ["hyper-rustls/native-tokio", "__tls"]
webpki-tls = ["hyper-rustls/webpki-tokio", "__tls"]
native-tls = ["hyper-rustls/native-tokio", "hyper-rustls/ring", "__tls"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss whether we should support both crypto backends.

Rustls now supports both ring and aws-lc-rs but migrating to aws-lc-rs requires more dependencies such as cmake, NASM etc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with not exposing it initially, and then as a non-breaking change we can always expose it later or whatever if there's demand :)

webpki-tls = ["hyper-rustls/webpki-tokio", "hyper-rustls/ring", "__tls"]

# Internal feature to indicate whether TLS is enabled.
# Does nothing on its own.
Expand Down
34 changes: 18 additions & 16 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,23 @@
// DEALINGS IN THE SOFTWARE.

use std::borrow::Cow as StdCow;
use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder};
use crate::transport::{self, Error as TransportError, HttpTransportClient, HttpTransportClientBuilder};
use crate::types::{NotificationSer, RequestSer, Response};
use crate::{HttpRequest, HttpResponse};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use hyper::Body;
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, Error, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
Expand Down Expand Up @@ -189,10 +188,10 @@ impl<L> HttpClientBuilder<L> {
impl<B, S, L> HttpClientBuilder<L>
where
L: Layer<transport::HttpBackend, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Clone,
B: HttpBody + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
/// Build the HTTP client with target to connect to.
pub fn build(self, target: impl AsRef<str>) -> Result<HttpClient<S>, Error> {
Expand All @@ -207,6 +206,7 @@ where
max_log_length,
service_builder,
tcp_no_delay,
..
} = self;

let transport = HttpTransportClientBuilder::new()
Expand Down Expand Up @@ -254,7 +254,7 @@ impl HttpClientBuilder<Identity> {

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
#[derive(Debug, Clone)]
pub struct HttpClient<S = HttpBackend> {
pub struct HttpClient<S> {
/// HTTP transport client.
transport: HttpTransportClient<S>,
/// Request timeout. Defaults to 60sec.
Expand All @@ -265,17 +265,18 @@ pub struct HttpClient<S = HttpBackend> {

impl<S> HttpClient<S> {
/// Create a builder for the HttpClient.
pub fn builder() -> HttpClientBuilder {
pub fn builder() -> HttpClientBuilder<Identity> {
HttpClientBuilder::new()
}
}

#[async_trait]
impl<B, S> ClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Error: Into<BoxError>,
B::Data: Send,
{
#[instrument(name = "notification", skip(self, params), level = "trace")]
Expand Down Expand Up @@ -405,10 +406,11 @@ where
#[async_trait]
impl<B, S> SubscriptionClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<HttpRequest>>::Future: Send,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
/// Send a subscription request to the server. Not implemented for HTTP; will always return
/// [`Error::HttpNotImplemented`].
Expand Down
7 changes: 7 additions & 0 deletions client/http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ mod tests;
pub use client::{HttpClient, HttpClientBuilder};
pub use hyper::http::{HeaderMap, HeaderValue};
pub use jsonrpsee_types as types;

/// Default HTTP body for the client.
pub type HttpBody = http_body_util::Full<hyper::body::Bytes>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this also be jsonrpsee_core::http_helpers::ResponseBody, since the below two things reference that?

(maybe rename RepsonseBody to HttpBody though?)

/// HTTP request with default body.
pub type HttpRequest<T = HttpBody> = jsonrpsee_core::http_helpers::Request<T>;
/// HTTP response with default body.
pub type HttpResponse<T = HttpBody> = jsonrpsee_core::http_helpers::Response<T>;
60 changes: 33 additions & 27 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.

use hyper::body::{Body, HttpBody};
use hyper::client::{Client, HttpConnector};
use hyper::body::Bytes;
use hyper::http::{HeaderMap, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
use jsonrpsee_core::BoxError;
use jsonrpsee_core::{
http_helpers::{self, HttpError},
TEN_MB_SIZE_BYTES,
};
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -24,19 +26,21 @@ use tower::layer::util::Identity;
use tower::{Layer, Service, ServiceExt};
use url::Url;

use crate::{HttpBody, HttpRequest, HttpResponse};

const CONTENT_TYPE_JSON: &str = "application/json";

/// Wrapper over HTTP transport and connector.
#[derive(Debug)]
pub enum HttpBackend<B = Body> {
pub enum HttpBackend<B = HttpBody> {
/// Hyper client with https connector.
#[cfg(feature = "__tls")]
Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
/// Hyper client with http connector.
Http(Client<HttpConnector, B>),
}

impl Clone for HttpBackend {
impl<B> Clone for HttpBackend<B> {
fn clone(&self) -> Self {
match self {
Self::Http(inner) => Self::Http(inner.clone()),
Expand All @@ -46,13 +50,13 @@ impl Clone for HttpBackend {
}
}

impl<B> tower::Service<hyper::Request<B>> for HttpBackend<B>
impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
where
B: HttpBody<Error = hyper::Error> + Send + 'static,
B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
type Response = hyper::Response<Body>;
type Response = HttpResponse<hyper::body::Incoming>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

Expand All @@ -62,17 +66,17 @@ where
#[cfg(feature = "__tls")]
Self::Https(inner) => inner.poll_ready(ctx),
}
.map_err(|e| Error::Http(e.into()))
.map_err(|e| Error::Http(HttpError::Stream(e.into())))
}

fn call(&mut self, req: hyper::Request<B>) -> Self::Future {
fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
let resp = match self {
Self::Http(inner) => inner.call(req),
#[cfg(feature = "__tls")]
Self::Https(inner) => inner.call(req),
};

Box::pin(async move { resp.await.map_err(|e| Error::Http(e.into())) })
Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
}
}

Expand Down Expand Up @@ -177,11 +181,11 @@ impl<L> HttpTransportClientBuilder<L> {
/// Build a [`HttpTransportClient`].
pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
where
L: Layer<HttpBackend<Body>, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody + Send + 'static,
L: Layer<HttpBackend, Service = S>,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<BoxError>,
{
let Self {
certificate_store,
Expand All @@ -202,7 +206,7 @@ impl<L> HttpTransportClientBuilder<L> {
"http" => {
let mut connector = HttpConnector::new();
connector.set_nodelay(tcp_no_delay);
HttpBackend::Http(Client::builder().build(connector))
HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
}
#[cfg(feature = "__tls")]
"https" => {
Expand All @@ -214,9 +218,8 @@ impl<L> HttpTransportClientBuilder<L> {
#[cfg(feature = "native-tls")]
CertificateStore::Native => hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_all_versions()
.wrap_connector(http_conn),
.map(|c| c.https_or_http().enable_all_versions().wrap_connector(http_conn))
.map_err(|_| Error::InvalidCertficateStore)?,
#[cfg(feature = "webpki-tls")]
CertificateStore::WebPki => hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
Expand All @@ -226,7 +229,7 @@ impl<L> HttpTransportClientBuilder<L> {
_ => return Err(Error::InvalidCertficateStore),
};

HttpBackend::Https(Client::builder().build::<_, hyper::Body>(https_conn))
HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
}
_ => {
#[cfg(feature = "__tls")]
Expand Down Expand Up @@ -281,20 +284,22 @@ pub struct HttpTransportClient<S> {

impl<B, S> HttpTransportClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
async fn inner_send(&self, body: String) -> Result<hyper::Response<B>, Error> {
async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
if body.len() > self.max_request_size as usize {
return Err(Error::RequestTooLarge);
}

let mut req = hyper::Request::post(&self.target);
let mut req = HttpRequest::post(&self.target);
if let Some(headers) = req.headers_mut() {
*headers = self.headers.clone();
}
let req = req.body(From::from(body)).expect("URI and request headers are valid; qed");

let req = req.body(body.into()).expect("URI and request headers are valid; qed");
let response = self.client.clone().ready().await?.call(req).await?;

if response.status().is_success() {
Expand All @@ -310,7 +315,8 @@ where

let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

rx_log_from_bytes(&body, self.max_log_length);

Expand Down
6 changes: 3 additions & 3 deletions client/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tracing = "0.1.34"
thiserror = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"], optional = true }
http = { version = "0.2", optional = true }
http = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio = { version = "1.16", features = ["net", "time", "macros"], optional = true }
pin-project = { version = "1", optional = true }
Expand All @@ -30,11 +30,11 @@ url = { version = "2.4.0", optional = true }
# tls
rustls-native-certs = { version = "0.7", optional = true }
webpki-roots = { version = "0.26", optional = true }
tokio-rustls = { version = "0.25", optional = true }
tokio-rustls = { version = "0.26", default-features = false, optional = true, features = ["logging", "tls12", "ring"] }
rustls-pki-types = { version = "1", optional = true }

# ws
soketto = { version = "0.7.1", optional = true }
soketto = { version = "0.8", optional = true }

# web-sys
gloo-net = { version = "0.5.0", default-features = false, features = ["json", "websocket"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme.workspace = true
publish = true

[dependencies]
http = "0.2.0"
http = "1"
jsonrpsee-types = { workspace = true }
jsonrpsee-client-transport = { workspace = true, features = ["ws"] }
jsonrpsee-core = { workspace = true, features = ["async-client"] }
Expand Down
5 changes: 4 additions & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ fn assert_error_response(err: Error, exp: ErrorObjectOwned) {

#[tokio::test]
async fn redirections() {
init_logger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this indended?


let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
Expand All @@ -471,7 +473,8 @@ async fn redirections() {
.unwrap();

let server_url = format!("ws://{}", server.local_addr());
let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url);
let redirect_url =
jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url).with_default_timeout().await.unwrap();

// The client will first connect to a server that only performs re-directions and finally
// redirect to another server to complete the handshake.
Expand Down
Loading
Loading