Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] [5.0.x] Replace custom hyper client with reqwest #520

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 238 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions api/src/foreign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ where
///
/// // A NodeClient must first be created to handle communication between
/// // the wallet and the node.
/// let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None);
/// let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None).unwrap();
///
/// // impls::DefaultWalletImpl is provided for convenience in instantiating the wallet
/// // It contains the LMDBBackend, DefaultLCProvider (lifecycle) and ExtKeychain used
Expand Down Expand Up @@ -494,7 +494,8 @@ macro_rules! doctest_helper_setup_doc_env_foreign {
wallet_config.data_file_dir = dir.to_owned();
let pw = ZeroingString::from("");

let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None);
let node_client =
HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None).unwrap();
let mut wallet = Box::new(
DefaultWalletImpl::<'static, HTTPNodeClient>::new(node_client.clone()).unwrap(),
)
Expand Down
5 changes: 3 additions & 2 deletions api/src/owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where
///
/// // A NodeClient must first be created to handle communication between
/// // the wallet and the node.
/// let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None);
/// let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None).unwrap();
///
/// // impls::DefaultWalletImpl is provided for convenience in instantiating the wallet
/// // It contains the LMDBBackend, DefaultLCProvider (lifecycle) and ExtKeychain used
Expand Down Expand Up @@ -2451,7 +2451,8 @@ macro_rules! doctest_helper_setup_doc_env {
wallet_config.data_file_dir = dir.to_owned();
let pw = ZeroingString::from("");

let node_client = HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None);
let node_client =
HTTPNodeClient::new(&wallet_config.check_node_api_http_addr, None).unwrap();
let mut wallet = Box::new(
DefaultWalletImpl::<'static, HTTPNodeClient>::new(node_client.clone()).unwrap(),
)
Expand Down
12 changes: 3 additions & 9 deletions impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,14 @@ serde_derive = "1"
serde_json = "1"
log = "0.4"
ring = "0.16"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-utils = "0.7"

#http client (copied from grin)
http = "0.2"
hyper-rustls = "0.20"
hyper-timeout = "0.3"
tokio = { version = "0.2", features = ["full"] }
reqwest = { version = "0.10", features = ["rustls-tls", "socks"] }
lazy_static = "1.4"

#Socks/Tor
byteorder = "1"
hyper = "0.13"
hyper-socks2-mw = "0.4"
ed25519-dalek = "1.0.0-pre.4"
x25519-dalek = "0.6"
data-encoding = "2"
Expand Down
16 changes: 10 additions & 6 deletions impls/src/adapters/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

/// HTTP Wallet 'plugin' implementation
use crate::client_utils::{Client, ClientError};
use crate::client_utils::{Client, ClientError, ClientErrorKind};
use crate::libwallet::slate_versions::{SlateVersion, VersionedSlate};
use crate::libwallet::{Error, ErrorKind, Slate};
use crate::SlateSender;
Expand Down Expand Up @@ -163,11 +163,15 @@ impl HttpSlateSender {
where
IN: Serialize,
{
let mut client = Client::new();
if self.use_socks {
client.use_socks = true;
client.socks_proxy_addr = self.socks_proxy_addr;
}
let client =
if self.use_socks {
Client::new()
} else {
Client::with_socks_proxy(self.socks_proxy_addr.ok_or_else(|| {
ClientErrorKind::Internal("No socks proxy address set".into())
})?)
}
.map_err(|_| ClientErrorKind::Internal("Unable to create http client".into()))?;
let req = client.create_post_request(url, api_secret, &input)?;
let res = client.send_request(req)?;
Ok(res)
Expand Down
2 changes: 1 addition & 1 deletion impls/src/adapters/keybase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl SlateReceiver for KeybaseAllChannels {
account: &str,
node_api_secret: Option<String>,
) -> Result<(), Error> {
let node_client = HTTPNodeClient::new(&config.check_node_api_http_addr, node_api_secret);
let node_client = HTTPNodeClient::new(&config.check_node_api_http_addr, node_api_secret)?;
let mut wallet =
Box::new(DefaultWalletImpl::<'static, HTTPNodeClient>::new(node_client).unwrap())
as Box<
Expand Down
179 changes: 80 additions & 99 deletions impls/src/client_utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,32 @@
//! High level JSON/HTTP client API

use crate::util::to_base64;
use crossbeam_utils::thread::scope;
use failure::{Backtrace, Context, Fail, ResultExt};
use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::{self, Body, Client as HyperClient, Request, Uri};
use hyper_rustls;
use hyper_timeout::TimeoutConnector;
use lazy_static::lazy_static;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use reqwest::{ClientBuilder, Method, Proxy, RequestBuilder};
use serde::{Deserialize, Serialize};
use serde_json;
use std::fmt::{self, Display};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::Builder;
use tokio::runtime::{Builder, Runtime};

// Global Tokio runtime.
// Needs a `Mutex` because `Runtime::block_on` requires mutable access.
// Tokio v0.3 requires immutable self, but we are waiting on upstream
// updates before we can upgrade.
// See: https://github.com/seanmonstar/reqwest/pull/1076
lazy_static! {
pub static ref RUNTIME: Arc<Mutex<Runtime>> = Arc::new(Mutex::new(
Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap()
));
}

/// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)]
Expand Down Expand Up @@ -85,20 +98,43 @@ impl From<Context<ErrorKind>> for Error {
}
}

#[derive(Clone)]
pub struct Client {
/// Whether to use socks proxy
pub use_socks: bool,
/// Proxy url/port
pub socks_proxy_addr: Option<SocketAddr>,
client: reqwest::Client,
}

impl Client {
/// New client
pub fn new() -> Self {
Client {
use_socks: false,
socks_proxy_addr: None,
pub fn new() -> Result<Self, Error> {
Self::build(None)
}

pub fn with_socks_proxy(socks_proxy_addr: SocketAddr) -> Result<Self, Error> {
Self::build(Some(socks_proxy_addr))
}

fn build(socks_proxy_addr: Option<SocketAddr>) -> Result<Self, Error> {
let mut headers = HeaderMap::new();
headers.insert(USER_AGENT, HeaderValue::from_static("grin-client"));
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

let mut builder = ClientBuilder::new()
.timeout(Duration::from_secs(20))
.use_rustls_tls()
.default_headers(headers);

if let Some(s) = socks_proxy_addr {
let proxy = Proxy::all(&format!("socks5://{}:{}", s.ip(), s.port()))
.map_err(|e| ErrorKind::Internal(format!("Unable to create proxy: {}", e)))?;
builder = builder.proxy(proxy);
}

let client = builder
.build()
.map_err(|e| ErrorKind::Internal(format!("Unable to build client: {}", e)))?;

Ok(Client { client })
}

/// Helper function to easily issue a HTTP GET request against a given URL that
Expand All @@ -108,7 +144,7 @@ impl Client {
where
for<'de> T: Deserialize<'de>,
{
self.handle_request(self.build_request(url, "GET", api_secret, None)?)
self.handle_request(self.build_request(url, Method::GET, api_secret, None)?)
}

/// Helper function to easily issue an async HTTP GET request against a given
Expand All @@ -122,15 +158,15 @@ impl Client {
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
self.handle_request_async(self.build_request(url, "GET", api_secret, None)?)
self.handle_request_async(self.build_request(url, Method::GET, api_secret, None)?)
.await
}

/// Helper function to easily issue a HTTP GET request
/// on a given URL that returns nothing. Handles request
/// building and response code checking.
pub fn _get_no_ret(&self, url: &str, api_secret: Option<String>) -> Result<(), Error> {
let req = self.build_request(url, "GET", api_secret, None)?;
let req = self.build_request(url, Method::GET, api_secret, None)?;
self.send_request(req)?;
Ok(())
}
Expand Down Expand Up @@ -211,50 +247,39 @@ impl Client {
fn build_request(
&self,
url: &str,
method: &str,
method: Method,
api_secret: Option<String>,
body: Option<String>,
) -> Result<Request<Body>, Error> {
let uri: Uri = url
.parse()
.map_err(|_| ErrorKind::RequestError(format!("Invalid url {}", url)))?;
let mut builder = Request::builder();
) -> Result<RequestBuilder, Error> {
let mut builder = self.client.request(method, url);
if let Some(api_secret) = api_secret {
let basic_auth = format!("Basic {}", to_base64(&format!("grin:{}", api_secret)));
builder = builder.header(AUTHORIZATION, basic_auth);
}

builder
.method(method)
.uri(uri)
.header(USER_AGENT, "grin-client")
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(match body {
None => Body::empty(),
Some(json) => json.into(),
})
.map_err(|e| {
ErrorKind::RequestError(format!("Bad request {} {}: {}", method, url, e)).into()
})
if let Some(body) = body {
builder = builder.body(body);
}

Ok(builder)
}

pub fn create_post_request<IN>(
&self,
url: &str,
api_secret: Option<String>,
input: &IN,
) -> Result<Request<Body>, Error>
) -> Result<RequestBuilder, Error>
where
IN: Serialize,
{
let json = serde_json::to_string(input).context(ErrorKind::Internal(
"Could not serialize data to JSON".to_owned(),
))?;
self.build_request(url, "POST", api_secret, Some(json))
self.build_request(url, Method::POST, api_secret, Some(json))
}

fn handle_request<T>(&self, req: Request<Body>) -> Result<T, Error>
fn handle_request<T>(&self, req: RequestBuilder) -> Result<T, Error>
where
for<'de> T: Deserialize<'de>,
{
Expand All @@ -265,7 +290,7 @@ impl Client {
})
}

async fn handle_request_async<T>(&self, req: Request<Body>) -> Result<T, Error>
async fn handle_request_async<T>(&self, req: RequestBuilder) -> Result<T, Error>
where
for<'de> T: Deserialize<'de> + Send + 'static,
{
Expand All @@ -275,66 +300,22 @@ impl Client {
Ok(ser)
}

async fn send_request_async(&self, req: Request<Body>) -> Result<String, Error> {
let resp = if !self.use_socks {
let https = hyper_rustls::HttpsConnector::new();
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = HyperClient::builder().build::<_, Body>(connector);

client.request(req).await
} else {
let addr = self.socks_proxy_addr.ok_or_else(|| {
ErrorKind::RequestError("Missing Socks proxy address".to_string())
})?;
let auth = format!("{}:{}", addr.ip(), addr.port());

let https = hyper_rustls::HttpsConnector::new();
let socks = hyper_socks2_mw::SocksConnector {
proxy_addr: hyper::Uri::builder()
.scheme("socks5")
.authority(auth.as_str())
.path_and_query("/")
.build()
.map_err(|_| {
ErrorKind::RequestError("Can't parse Socks proxy address".to_string())
})?,
auth: None,
connector: https,
};
let mut connector = TimeoutConnector::new(socks);
connector.set_connect_timeout(Some(Duration::from_secs(20)));
connector.set_read_timeout(Some(Duration::from_secs(20)));
connector.set_write_timeout(Some(Duration::from_secs(20)));
let client = HyperClient::builder().build::<_, Body>(connector);

client.request(req).await
};
let resp =
resp.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)))?;

let raw = body::to_bytes(resp)
async fn send_request_async(&self, req: RequestBuilder) -> Result<String, Error> {
let resp = req
.send()
.await
.map_err(|e| ErrorKind::RequestError(format!("Cannot read response body: {}", e)))?;

Ok(String::from_utf8_lossy(&raw).to_string())
.map_err(|e| ErrorKind::RequestError(format!("Cannot make request: {}", e)))?;
let text = resp
.text()
.await
.map_err(|e| ErrorKind::ResponseError(format!("Cannot parse response: {}", e)))?;
Ok(text)
}

pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> {
let task = self.send_request_async(req);
scope(|s| {
let handle = s.spawn(|_| {
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
rt.block_on(task)
});
handle.join().unwrap()
})
.unwrap()
pub fn send_request(&self, req: RequestBuilder) -> Result<String, Error> {
RUNTIME
.lock()
.unwrap()
.block_on(self.send_request_async(req))
}
}
Loading