Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to hyper v1.0 #1368

Merged
merged 23 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ publish = true

[dependencies]
async-trait = "0.1"
hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper = { version = "1.3", features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.27.1", optional = true, default-features = false, features = ["http1", "http2", "tls12", "logging"] }
hyper-util = { version = "*", features = ["client", "client-legacy"] }
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
http-body = "1"
http-body-util = "0.1.1"
jsonrpsee-types = { workspace = true }
jsonrpsee-core = { workspace = true, features = ["client", "http-helpers"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
serde_json = "1"
thiserror = "1"
tokio = { version = "1.16", features = ["time"] }
tracing = "0.1.34"
tower = { version = "0.4.13", features = ["util"] }
Expand All @@ -34,8 +37,8 @@ tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros"] }

[features]
default = ["native-tls"]
native-tls = ["hyper-rustls/native-tokio", "__tls"]
webpki-tls = ["hyper-rustls/webpki-tokio", "__tls"]
native-tls = ["hyper-rustls/native-tokio", "hyper-rustls/aws-lc-rs", "__tls"]
webpki-tls = ["hyper-rustls/webpki-tokio", "hyper-rustls/aws-lc-rs", "__tls"]

# Internal feature to indicate whether TLS is enabled.
# Does nothing on its own.
Expand Down
34 changes: 21 additions & 13 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder};
use crate::transport::{self, Error as TransportError, HttpTransportClient, HttpTransportClientBuilder};
use crate::types::{NotificationSer, RequestSer, Response};
use crate::ResponseBody;
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::http::HeaderMap;
use hyper::Body;
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, Error, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
Expand Down Expand Up @@ -189,8 +188,8 @@ impl<L> HttpClientBuilder<L> {
impl<B, S, L> HttpClientBuilder<L>
where
L: Layer<transport::HttpBackend, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Clone,
B: HttpBody + Send + 'static,
S: Service<hyper::Request<ResponseBody>, Response = hyper::Response<B>, Error = TransportError> + Clone,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand All @@ -207,6 +206,7 @@ where
max_log_length,
service_builder,
tcp_no_delay,
..
} = self;

let transport = HttpTransportClientBuilder::new()
Expand Down Expand Up @@ -254,7 +254,7 @@ impl HttpClientBuilder<Identity> {

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
#[derive(Debug, Clone)]
pub struct HttpClient<S = HttpBackend> {
pub struct HttpClient<S> {
/// HTTP transport client.
transport: HttpTransportClient<S>,
/// Request timeout. Defaults to 60sec.
Expand All @@ -265,17 +265,21 @@ pub struct HttpClient<S = HttpBackend> {

impl<S> HttpClient<S> {
/// Create a builder for the HttpClient.
pub fn builder() -> HttpClientBuilder {
pub fn builder() -> HttpClientBuilder<Identity> {
HttpClientBuilder::new()
}
}

#[async_trait]
impl<B, S> ClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<hyper::Request<ResponseBody>, Response = hyper::Response<B>, Error = TransportError>
+ Send
+ Sync
+ Clone,
<S as Service<hyper::Request<ResponseBody>>>::Future: Send,
B: http_body::Body + Send + Unpin + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Data: Send,
{
#[instrument(name = "notification", skip(self, params), level = "trace")]
Expand Down Expand Up @@ -405,10 +409,14 @@ where
#[async_trait]
impl<B, S> SubscriptionClientT for HttpClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = TransportError> + Send + Sync + Clone,
<S as Service<hyper::Request<Body>>>::Future: Send,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<hyper::Request<ResponseBody>, Response = hyper::Response<B>, Error = TransportError>
+ Send
+ Sync
+ Clone,
<S as Service<hyper::Request<ResponseBody>>>::Future: Send,
B: http_body::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Send a subscription request to the server. Not implemented for HTTP; will always return
/// [`Error::HttpNotImplemented`].
Expand Down
2 changes: 2 additions & 0 deletions client/http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ mod tests;
pub use client::{HttpClient, HttpClientBuilder};
pub use hyper::http::{HeaderMap, HeaderValue};
pub use jsonrpsee_types as types;

pub(crate) type ResponseBody = http_body_util::Full<hyper::body::Bytes>;
45 changes: 25 additions & 20 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.

use hyper::body::{Body, HttpBody};
use hyper::client::{Client, HttpConnector};
use hyper::http::{HeaderMap, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
use jsonrpsee_core::{
Expand All @@ -24,19 +25,21 @@ use tower::layer::util::Identity;
use tower::{Layer, Service, ServiceExt};
use url::Url;

use crate::ResponseBody;

const CONTENT_TYPE_JSON: &str = "application/json";

/// Wrapper over HTTP transport and connector.
#[derive(Debug)]
pub enum HttpBackend<B = Body> {
pub enum HttpBackend<B = ResponseBody> {
/// Hyper client with https connector.
#[cfg(feature = "__tls")]
Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
/// Hyper client with http connector.
Http(Client<HttpConnector, B>),
}

impl Clone for HttpBackend {
impl<B> Clone for HttpBackend<B> {
fn clone(&self) -> Self {
match self {
Self::Http(inner) => Self::Http(inner.clone()),
Expand All @@ -48,11 +51,11 @@ impl Clone for HttpBackend {

impl<B> tower::Service<hyper::Request<B>> for HttpBackend<B>
where
B: HttpBody<Error = hyper::Error> + Send + 'static,
B: http_body::Body + Send + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Response = hyper::Response<Body>;
type Response = hyper::Response<hyper::body::Incoming>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

Expand All @@ -62,7 +65,7 @@ where
#[cfg(feature = "__tls")]
Self::Https(inner) => inner.poll_ready(ctx),
}
.map_err(|e| Error::Http(e.into()))
.map_err(|e| Error::Http(HttpError::Stream(e.into())))
}

fn call(&mut self, req: hyper::Request<B>) -> Self::Future {
Expand All @@ -72,7 +75,7 @@ where
Self::Https(inner) => inner.call(req),
};

Box::pin(async move { resp.await.map_err(|e| Error::Http(e.into())) })
Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
}
}

Expand Down Expand Up @@ -177,9 +180,9 @@ impl<L> HttpTransportClientBuilder<L> {
/// Build a [`HttpTransportClient`].
pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
where
L: Layer<HttpBackend<Body>, Service = S>,
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody + Send + 'static,
L: Layer<HttpBackend, Service = S>,
S: Service<hyper::Request<ResponseBody>, Response = hyper::Response<B>, Error = Error> + Clone,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand All @@ -202,7 +205,7 @@ impl<L> HttpTransportClientBuilder<L> {
"http" => {
let mut connector = HttpConnector::new();
connector.set_nodelay(tcp_no_delay);
HttpBackend::Http(Client::builder().build(connector))
HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
}
#[cfg(feature = "__tls")]
"https" => {
Expand All @@ -214,9 +217,8 @@ impl<L> HttpTransportClientBuilder<L> {
#[cfg(feature = "native-tls")]
CertificateStore::Native => hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_all_versions()
.wrap_connector(http_conn),
.map(|c| c.https_or_http().enable_all_versions().wrap_connector(http_conn))
.map_err(|_| Error::InvalidCertficateStore)?,
#[cfg(feature = "webpki-tls")]
CertificateStore::WebPki => hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
Expand All @@ -226,7 +228,7 @@ impl<L> HttpTransportClientBuilder<L> {
_ => return Err(Error::InvalidCertficateStore),
};

HttpBackend::Https(Client::builder().build::<_, hyper::Body>(https_conn))
HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
}
_ => {
#[cfg(feature = "__tls")]
Expand Down Expand Up @@ -281,9 +283,10 @@ pub struct HttpTransportClient<S> {

impl<B, S> HttpTransportClient<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<B>, Error = Error> + Clone,
B: HttpBody<Error = hyper::Error> + Send + 'static,
S: Service<hyper::Request<ResponseBody>, Response = hyper::Response<B>, Error = Error> + Clone,
B: http_body::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
async fn inner_send(&self, body: String) -> Result<hyper::Response<B>, Error> {
if body.len() > self.max_request_size as usize {
Expand All @@ -294,7 +297,8 @@ where
if let Some(headers) = req.headers_mut() {
*headers = self.headers.clone();
}
let req = req.body(From::from(body)).expect("URI and request headers are valid; qed");

let req = req.body(body.into()).expect("Failed to create request");
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let response = self.client.clone().ready().await?.call(req).await?;

if response.status().is_success() {
Expand All @@ -310,7 +314,8 @@ where

let response = self.inner_send(body).await?;
let (parts, body) = response.into_parts();
let (body, _) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;

rx_log_from_bytes(&body, self.max_log_length);

Expand Down
6 changes: 3 additions & 3 deletions client/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tracing = "0.1.34"
thiserror = { version = "1", optional = true }
futures-channel = { version = "0.3.14", default-features = false, optional = true }
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"], optional = true }
http = { version = "0.2", optional = true }
http = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio = { version = "1.16", features = ["net", "time", "macros"], optional = true }
pin-project = { version = "1", optional = true }
Expand All @@ -30,11 +30,11 @@ url = { version = "2.4.0", optional = true }
# tls
rustls-native-certs = { version = "0.7", optional = true }
webpki-roots = { version = "0.26", optional = true }
tokio-rustls = { version = "0.25", optional = true }
tokio-rustls = { version = "0.26", optional = true }
rustls-pki-types = { version = "1", optional = true }

# ws
soketto = { version = "0.7.1", optional = true }
soketto = { version = "0.8", optional = true }

# web-sys
gloo-net = { version = "0.5.0", default-features = false, features = ["json", "websocket"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme.workspace = true
publish = true

[dependencies]
http = "0.2.0"
http = "1"
jsonrpsee-types = { workspace = true }
jsonrpsee-client-transport = { workspace = true, features = ["ws"] }
jsonrpsee-core = { workspace = true, features = ["async-client"] }
Expand Down
6 changes: 5 additions & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ fn assert_error_response(err: Error, exp: ErrorObjectOwned) {

#[tokio::test]
async fn redirections() {
init_logger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this indended?


let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
Expand All @@ -471,10 +473,12 @@ async fn redirections() {
.unwrap();

let server_url = format!("ws://{}", server.local_addr());
let redirect_url = jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url);
let redirect_url =
jsonrpsee_test_utils::mocks::ws_server_with_redirect(server_url).with_default_timeout().await.unwrap();

// The client will first connect to a server that only performs re-directions and finally
// redirect to another server to complete the handshake.
println!("Connecting to {redirect_url}");
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let client = WsClientBuilder::default().build(&redirect_url).with_default_timeout().await;
// It's an ok client
let client = match client {
Expand Down
15 changes: 4 additions & 11 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ tracing = "0.1.34"

# optional deps
futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
hyper = { version = "1.3", default-features = false, optional = true }
http-body-util = "0.1.1"
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
parking_lot = { version = "0.12", optional = true }
Expand All @@ -38,16 +39,7 @@ pin-project = { version = "1", optional = true }
[features]
default = []
http-helpers = ["hyper", "futures-util"]
server = [
"futures-util/alloc",
"rustc-hash/std",
"parking_lot",
"rand",
"tokio/rt",
"tokio/sync",
"tokio/macros",
"tokio/time",
]
server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time"]
client = ["futures-util/sink", "tokio/sync"]
async-client = [
"client",
Expand Down Expand Up @@ -75,6 +67,7 @@ async-wasm-client = [
serde_json = "1.0"
tokio = { version = "1.16", features = ["macros", "rt"] }
jsonrpsee = { path = "../jsonrpsee", features = ["server", "macros"] }
http-body-util = "0.1.1"

[package.metadata.docs.rs]
all-features = true
Expand Down
Loading
Loading