Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dependencies]: upgrade tokio to v0.3 #133

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ serde_json = "1.0.58"
smallvec = { version = "1.4.2", default-features = false }
thiserror = "1.0.20"

# used for tests and HTTP/hyper
# the features are used only for tests but enabled because `dev-depedencies` are leaked into dependencies.
tokio = { version = "0.2.22", features = ["stream", "rt-threaded", "macros"], optional = true }

# HTTP-related dependencies
# the features `rt-multi-thread` and `macros` are used only for tests but enabled because `dev-dependencies` are leaked into dependencies.
tokio = { version = "0.3.0", features = ["stream", "rt-multi-thread", "macros"], optional = true }
# hyper is using tokio v0.2, remove when hyper upgrades.
tokio-compat-02 = "0.1.0"
hyper = { version = "0.13.8", features = ["stream"], optional = true }
unicase = { version = "2.6.0", optional = true }

Expand Down
6 changes: 4 additions & 2 deletions src/client/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::pin::Pin;
use std::{fmt, io, thread};

use crate::common;

use futures::{channel::mpsc, prelude::*};
use thiserror::Error;
use tokio_compat_02::FutureExt;

/// Implementation of a raw client for HTTP requests.
pub struct HttpTransportClient {
Expand Down Expand Up @@ -52,12 +52,14 @@ impl HttpTransportClient {
.name("jsonrpsee-hyper-client".to_string())
.spawn(move || {
let client = hyper::Client::new();
// hyper v0.13 requires tokio v0.2 that's why compat() is used.
background_thread(requests_rx, move |rq| {
// cloning Hyper client = cloning references
let client = client.clone();
async move {
let _ = rq.send_back.unbounded_send(client.request(rq.request).await);
}
.compat()
})
})
.unwrap();
Expand Down Expand Up @@ -173,7 +175,7 @@ fn background_thread<T, ProcessRequest: Future<Output = ()>>(
mut requests_rx: mpsc::Receiver<T>,
process_request: impl Fn(T) -> ProcessRequest,
) {
let mut runtime = match tokio::runtime::Builder::new().basic_scheduler().enable_all().build() {
let runtime = match tokio::runtime::Builder::new_current_thread().enable_all().build() {
Ok(r) => r,
Err(err) => {
// Ideally, we would try to initialize the tokio runtime in the main thread then move
Expand Down
35 changes: 20 additions & 15 deletions src/http/transport/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::{channel::mpsc, channel::oneshot, prelude::*};
use hyper::service::{make_service_fn, service_fn};
use hyper::Error;
use std::{error, io, net::SocketAddr, thread};
use tokio_compat_02::FutureExt;

/// Background thread that serves HTTP requests.
pub(super) struct BackgroundHttp {
Expand Down Expand Up @@ -79,29 +80,33 @@ impl BackgroundHttp {

// Because hyper can only be polled through tokio, we spawn it in a background thread.
thread::Builder::new().name("jsonrpsee-hyper-server".to_string()).spawn(move || {
let mut runtime = match tokio::runtime::Builder::new().basic_scheduler().enable_all().build() {
let runtime = match tokio::runtime::Builder::new_current_thread().enable_all().build() {
Ok(r) => r,
Err(err) => {
log::error!("Failed to initialize tokio runtime in HTTP JSON-RPC server: {}", err);
return;
}
};

runtime.block_on(async move {
match hyper::Server::try_bind(&addr) {
Ok(builder) => {
let server = builder.serve(make_service);
let _ = addr_tx.send(Ok(server.local_addr()));
if let Err(err) = server.await {
log::error!("HTTP JSON-RPC server closed with an error: {}", err);
// hyper v0.13 is using tokio v0.2 that's why compat() is used.
runtime.block_on(
async move {
match hyper::Server::try_bind(&addr) {
Ok(builder) => {
let server = builder.serve(make_service);
let _ = addr_tx.send(Ok(server.local_addr()));
if let Err(err) = server.await {
log::error!("HTTP JSON-RPC server closed with an error: {}", err);
}
}
}
Err(err) => {
log::error!("Failed to bind to address {}: {}", addr, err);
let _ = addr_tx.send(Err(err));
}
};
});
Err(err) => {
log::error!("Failed to bind to address {}: {}", addr, err);
let _ = addr_tx.send(Err(err));
}
};
}
.compat(),
);
})?;

let local_addr = addr_rx.await??;
Expand Down
5 changes: 3 additions & 2 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ hyper = "0.13.8"
serde = { version = "1.0.116", default-features = false, features = ["derive"] }
serde_json = "1.0.58"
soketto = "0.4.2"
tokio = { version = "0.2.22", features = ["dns", "stream", "tcp", "rt-threaded", "macros"] }
tokio-util = { version = "0.3", features = ["compat"] }
tokio = { version = "0.3", features = ["net", "rt-multi-thread", "macros"] }
tokio-util = { version = "0.4", features = ["compat"] }
tokio-compat-02 = "0.1.0"
3 changes: 2 additions & 1 deletion test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::types::{Body, HttpResponse, Id, Uri};
use serde_json::Value;
use std::net::SocketAddr;
use tokio_compat_02::FutureExt;

/// Converts a sockaddress to a WebSocket URI.
pub fn to_ws_uri_string(addr: SocketAddr) -> String {
Expand Down Expand Up @@ -54,7 +55,7 @@ pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String>
.header(hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static("application/json"))
.body(body)
.expect("uri and request headers are valid; qed");
let res = client.request(r).await.map_err(|e| format!("{:?}", e))?;
let res = client.request(r).compat().await.map_err(|e| format!("{:?}", e))?;

let (parts, body) = res.into_parts();
let bytes = hyper::body::to_bytes(body).await.unwrap();
Expand Down