From 46d4961d4e25260142d9eb830ea7c01173cff140 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 16 Oct 2020 09:35:04 +0200 Subject: [PATCH 1/2] [dependencies]: upgrade tokio to v0.3 --- Cargo.toml | 8 ++++---- src/client/http/transport.rs | 8 ++++---- src/http/transport/background.rs | 7 ++++--- test-utils/Cargo.toml | 5 +++-- test-utils/src/helpers.rs | 3 ++- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8c6bf82139..ec5e9827a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,11 @@ serde_json = "1.0.58" smallvec = { version = "1.4.2", default-features = false } thiserror = "1.0.20" -# used for tests and HTTP/hyper -# the features are used only for tests but enabled because `dev-depedencies` are leaked into dependencies. -tokio = { version = "0.2.22", features = ["stream", "rt-threaded", "macros"], optional = true } - # HTTP-related dependencies +# the features `rt-multi-thread` and `macros` are used only for tests but enabled because `dev-dependencies` are leaked into dependencies. +tokio = { version = "0.3.0", features = ["stream", "rt-multi-thread", "macros"], optional = true } +# hyper is using tokio v0.2, remove when hyper upgrades. +tokio-compat-02 = "0.1.0" hyper = { version = "0.13.8", features = ["stream"], optional = true } unicase = { version = "2.6.0", optional = true } diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index ecccf11f7a..09de445cad 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -17,9 +17,9 @@ use std::pin::Pin; use std::{fmt, io, thread}; use crate::common; - use futures::{channel::mpsc, prelude::*}; use thiserror::Error; +use tokio_compat_02::FutureExt; /// Implementation of a raw client for HTTP requests. pub struct HttpTransportClient { @@ -52,6 +52,7 @@ impl HttpTransportClient { .name("jsonrpsee-hyper-client".to_string()) .spawn(move || { let client = hyper::Client::new(); + // hyper v0.13 requires tokio v0.2 that's why compat() is used. background_thread(requests_rx, move |rq| { // cloning Hyper client = cloning references let client = client.clone(); @@ -59,7 +60,7 @@ impl HttpTransportClient { let _ = rq .send_back .unbounded_send(client.request(rq.request).await); - } + }.compat() }) }) .unwrap(); @@ -186,8 +187,7 @@ fn background_thread>( mut requests_rx: mpsc::Receiver, process_request: impl Fn(T) -> ProcessRequest, ) { - let mut runtime = match tokio::runtime::Builder::new() - .basic_scheduler() + let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() { diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index b8c0a14927..18b6cfed8d 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -31,6 +31,7 @@ use futures::{channel::mpsc, channel::oneshot, prelude::*}; use hyper::service::{make_service_fn, service_fn}; use hyper::Error; use std::{error, io, net::SocketAddr, thread}; +use tokio_compat_02::FutureExt; /// Background thread that serves HTTP requests. pub(super) struct BackgroundHttp { @@ -83,8 +84,7 @@ impl BackgroundHttp { thread::Builder::new() .name("jsonrpsee-hyper-server".to_string()) .spawn(move || { - let mut runtime = match tokio::runtime::Builder::new() - .basic_scheduler() + let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() { @@ -98,6 +98,7 @@ impl BackgroundHttp { } }; + // hyper v0.13 is using tokio v0.2 that's why compat() is used. runtime.block_on(async move { match hyper::Server::try_bind(&addr) { Ok(builder) => { @@ -112,7 +113,7 @@ impl BackgroundHttp { let _ = addr_tx.send(Err(err)); } }; - }); + }.compat()); })?; let local_addr = addr_rx.await??; diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index a9bd5de33c..32639c7951 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -13,5 +13,6 @@ hyper = "0.13.8" serde = { version = "1.0.116", default-features = false, features = ["derive"] } serde_json = "1.0.58" soketto = "0.4.2" -tokio = { version = "0.2.22", features = ["dns", "stream", "tcp", "rt-threaded", "macros"] } -tokio-util = { version = "0.3", features = ["compat"] } +tokio = { version = "0.3", features = ["net", "rt-multi-thread", "macros"] } +tokio-util = { version = "0.4", features = ["compat"] } +tokio-compat-02 = "0.1.0" diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index bb999f98e3..20ccaaa4e8 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -1,6 +1,7 @@ use crate::types::{Body, HttpResponse, Id, Uri}; use serde_json::Value; use std::net::SocketAddr; +use tokio_compat_02::FutureExt; /// Converts a sockaddress to a WebSocket URI. pub fn to_ws_uri_string(addr: SocketAddr) -> String { @@ -66,7 +67,7 @@ pub async fn http_request(body: Body, uri: Uri) -> Result ) .body(body) .expect("uri and request headers are valid; qed"); - let res = client.request(r).await.map_err(|e| format!("{:?}", e))?; + let res = client.request(r).compat().await.map_err(|e| format!("{:?}", e))?; let (parts, body) = res.into_parts(); let bytes = hyper::body::to_bytes(body).await.unwrap(); From 11ccfeae1ff9f5fd874e7491d56ec785e5d3a748 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 16 Oct 2020 09:36:52 +0200 Subject: [PATCH 2/2] fmt --- src/client/http/transport.rs | 3 ++- src/http/tests.rs | 18 ++++++++++++----- src/http/transport/background.rs | 34 +++++++++++++++++++------------- src/ws/raw/tests.rs | 8 ++++++-- test-utils/src/helpers.rs | 6 +++++- 5 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/client/http/transport.rs b/src/client/http/transport.rs index 09de445cad..95468d9690 100644 --- a/src/client/http/transport.rs +++ b/src/client/http/transport.rs @@ -60,7 +60,8 @@ impl HttpTransportClient { let _ = rq .send_back .unbounded_send(client.request(rq.request).await); - }.compat() + } + .compat() }) }) .unwrap(); diff --git a/src/http/tests.rs b/src/http/tests.rs index 006346ae3f..40acd6d218 100644 --- a/src/http/tests.rs +++ b/src/http/tests.rs @@ -5,8 +5,8 @@ use crate::http::HttpServer; use futures::channel::oneshot::{self, Sender}; use futures::future::FutureExt; use futures::{pin_mut, select}; -use jsonrpsee_test_utils::types::{Id, StatusCode}; use jsonrpsee_test_utils::helpers::*; +use jsonrpsee_test_utils::types::{Id, StatusCode}; use std::net::SocketAddr; async fn server(server_started_tx: Sender) { @@ -77,7 +77,9 @@ async fn single_method_call_with_params() { let server_addr = server_started_rx.await.unwrap(); let req = r#"{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#; - let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap(); + let response = http_request(req.into(), to_http_uri(server_addr)) + .await + .unwrap(); assert_eq!(response.status, StatusCode::OK); assert_eq!( response.body, @@ -92,7 +94,9 @@ async fn should_return_method_not_found() { let server_addr = server_started_rx.await.unwrap(); let req = r#"{"jsonrpc":"2.0","method":"bar","id":"foo"}"#; - let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap(); + let response = http_request(req.into(), to_http_uri(server_addr)) + .await + .unwrap(); assert_eq!(response.status, StatusCode::OK); assert_eq!(response.body, method_not_found(Id::Str("foo".into()))); } @@ -104,7 +108,9 @@ async fn invalid_json_id_missing_value() { let server_addr = server_started_rx.await.unwrap(); let req = r#"{"jsonrpc":"2.0","method":"say_hello","id"}"#; - let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap(); + let response = http_request(req.into(), to_http_uri(server_addr)) + .await + .unwrap(); // If there was an error in detecting the id in the Request object (e.g. Parse error/Invalid Request), it MUST be Null. assert_eq!(response.body, parse_error(Id::Null)); } @@ -116,7 +122,9 @@ async fn invalid_request_object() { let server_addr = server_started_rx.await.unwrap(); let req = r#"{"jsonrpc":"2.0","method":"bar","id":1,"is_not_request_object":1}"#; - let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap(); + let response = http_request(req.into(), to_http_uri(server_addr)) + .await + .unwrap(); assert_eq!(response.status, StatusCode::OK); assert_eq!(response.body, invalid_request(Id::Num(1))); } diff --git a/src/http/transport/background.rs b/src/http/transport/background.rs index 18b6cfed8d..9db79bdb7d 100644 --- a/src/http/transport/background.rs +++ b/src/http/transport/background.rs @@ -99,21 +99,27 @@ impl BackgroundHttp { }; // hyper v0.13 is using tokio v0.2 that's why compat() is used. - runtime.block_on(async move { - match hyper::Server::try_bind(&addr) { - Ok(builder) => { - let server = builder.serve(make_service); - let _ = addr_tx.send(Ok(server.local_addr())); - if let Err(err) = server.await { - log::error!("HTTP JSON-RPC server closed with an error: {}", err); + runtime.block_on( + async move { + match hyper::Server::try_bind(&addr) { + Ok(builder) => { + let server = builder.serve(make_service); + let _ = addr_tx.send(Ok(server.local_addr())); + if let Err(err) = server.await { + log::error!( + "HTTP JSON-RPC server closed with an error: {}", + err + ); + } } - } - Err(err) => { - log::error!("Failed to bind to address {}: {}", addr, err); - let _ = addr_tx.send(Err(err)); - } - }; - }.compat()); + Err(err) => { + log::error!("Failed to bind to address {}: {}", addr, err); + let _ = addr_tx.send(Err(err)); + } + }; + } + .compat(), + ); })?; let local_addr = addr_rx.await??; diff --git a/src/ws/raw/tests.rs b/src/ws/raw/tests.rs index 019ce60425..f24f04767e 100644 --- a/src/ws/raw/tests.rs +++ b/src/ws/raw/tests.rs @@ -47,7 +47,9 @@ async fn request_work() { let (mut server, server_addr) = raw_server().await; tokio::spawn(async move { - let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)).await.unwrap(); + let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)) + .await + .unwrap(); let call = Call::MethodCall(MethodCall { jsonrpc: Version::V2, method: "hello_world".to_owned(), @@ -75,7 +77,9 @@ async fn notification_work() { let (mut server, server_addr) = raw_server().await; tokio::spawn(async move { - let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)).await.unwrap(); + let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)) + .await + .unwrap(); let n = Notification { jsonrpc: Version::V2, method: "hello_world".to_owned(), diff --git a/test-utils/src/helpers.rs b/test-utils/src/helpers.rs index 20ccaaa4e8..65ad54ef4f 100644 --- a/test-utils/src/helpers.rs +++ b/test-utils/src/helpers.rs @@ -67,7 +67,11 @@ pub async fn http_request(body: Body, uri: Uri) -> Result ) .body(body) .expect("uri and request headers are valid; qed"); - let res = client.request(r).compat().await.map_err(|e| format!("{:?}", e))?; + let res = client + .request(r) + .compat() + .await + .map_err(|e| format!("{:?}", e))?; let (parts, body) = res.into_parts(); let bytes = hyper::body::to_bytes(body).await.unwrap();