From dc5bba0c3c48a56dca473202e12340a9dd773f2d Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Fri, 25 May 2018 00:52:49 -0700 Subject: [PATCH 1/3] Problem: bridge panics if there's an error retrieving gas price Solution: instead, log an error and use previous price --- bridge/src/bridge/gas_price.rs | 43 ++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/bridge/src/bridge/gas_price.rs b/bridge/src/bridge/gas_price.rs index d6efc3d..3e8a2ff 100644 --- a/bridge/src/bridge/gas_price.rs +++ b/bridge/src/bridge/gas_price.rs @@ -12,7 +12,6 @@ use config::{GasPriceSpeed, Node}; use error::Error; const CACHE_TIMEOUT_DURATION: Duration = Duration::from_secs(5 * 60); -const REQUEST_TIMEOUT_DURATION: Duration = Duration::from_secs(30); enum State { Initial, @@ -27,6 +26,8 @@ pub struct GasPriceStream { speed: GasPriceSpeed, request_timer: Timer, interval: Interval, + last_price: u64, + request_timeout: Duration, } impl GasPriceStream { @@ -44,6 +45,8 @@ impl GasPriceStream { speed: node.gas_price_speed, request_timer: timer.clone(), interval: timer.interval_at(Instant::now(), CACHE_TIMEOUT_DURATION), + last_price: node.default_gas_price, + request_timeout: node.request_timeout, } } } @@ -66,7 +69,7 @@ impl Stream for GasPriceStream { ); let request_future = self.request_timer - .timeout(request, REQUEST_TIMEOUT_DURATION); + .timeout(request, self.request_timeout); State::WaitingForResponse(request_future) }, @@ -74,21 +77,37 @@ impl Stream for GasPriceStream { match request_future.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(chunk)) => { - let json_obj: HashMap = json::from_slice(&chunk)?; - - let gas_price = match json_obj.get(self.speed.as_str()) { - Some(json::Value::Number(price)) => (price.as_f64().unwrap() * 1_000_000_000.0).trunc() as u64, - _ => unreachable!(), - }; - - State::Yield(Some(gas_price)) + match json::from_slice::>(&chunk) { + Ok(json_obj) => { + match json_obj.get(self.speed.as_str()) { + Some(json::Value::Number(price)) => State::Yield(Some((price.as_f64().unwrap() * 1_000_000_000.0).trunc() as u64)), + _ => { + error!("Invalid or missing gas price ({}) in the gas price oracle response: {}", self.speed.as_str(), String::from_utf8_lossy(&*chunk)); + State::Yield(Some(self.last_price)) + }, + } + }, + Err(e) => { + error!("Error while parsing response from gas price oracle: {:?} {}", e, String::from_utf8_lossy(&*chunk)); + State::Yield(Some(self.last_price)) + } + } + }, + Err(e) => { + error!("Error while fetching gas price: {:?}", e); + State::Yield(Some(self.last_price)) }, - Err(e) => panic!(e), } }, State::Yield(ref mut opt) => match opt.take() { None => State::Initial, - price => return Ok(Async::Ready(price)), + Some(price) => { + if price != self.last_price { + info!("Gas price: {} gwei", (price as f64) / 1_000_000_000.0); + self.last_price = price; + } + return Ok(Async::Ready(Some(price))) + }, } }; From 8082daa3924d0dc9b79055f1a82d48379ff433ce Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Mon, 28 May 2018 10:00:39 -0700 Subject: [PATCH 2/3] Problem: gas price retrieving is not tested well Solution: extract price retrieving from GasPriceStream and test it. --- bridge/src/bridge/gas_price.rs | 288 ++++++++++++++++++++++++++++++--- bridge/src/bridge/mod.rs | 10 +- bridge/src/error.rs | 4 + 3 files changed, 275 insertions(+), 27 deletions(-) diff --git a/bridge/src/bridge/gas_price.rs b/bridge/src/bridge/gas_price.rs index 3e8a2ff..c67002e 100644 --- a/bridge/src/bridge/gas_price.rs +++ b/bridge/src/bridge/gas_price.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; use futures::{Async, Future, Poll, Stream}; -use hyper::{Chunk, client::HttpConnector, Client, Uri}; +use hyper::{Chunk, client::{HttpConnector, Connect}, Client, Uri, Error as HyperError}; use hyper_tls::HttpsConnector; use serde_json as json; use tokio_core::reactor::Handle; @@ -13,15 +13,36 @@ use error::Error; const CACHE_TIMEOUT_DURATION: Duration = Duration::from_secs(5 * 60); -enum State { +enum State { Initial, - WaitingForResponse(Timeout>>), + WaitingForResponse(Timeout), Yield(Option), } -pub struct GasPriceStream { - state: State, - client: Client>, +pub trait Retriever { + type Item: AsRef<[u8]>; + type Future: Future; + fn retrieve(&self, uri: &Uri) -> Self::Future; +} + +impl Retriever for Client where C: Connect, B: Stream + 'static { + type Item = Chunk; + type Future = Box>; + + fn retrieve(&self, uri: &Uri) -> Self::Future { + Box::new( + self.get(uri.clone()) + .and_then(|resp| resp.body().concat2()) + .map_err(|e| e.into()) + ) + } +} + +pub type StandardGasPriceStream = GasPriceStream>, Client>, Chunk>; + +pub struct GasPriceStream where I: AsRef<[u8]>, F: Future, R: Retriever { + state: State, + retriever: R, uri: Uri, speed: GasPriceSpeed, request_timer: Timer, @@ -30,17 +51,22 @@ pub struct GasPriceStream { request_timeout: Duration, } -impl GasPriceStream { - pub fn new(node: &Node, handle: &Handle, timer: &Timer) -> Self { - let client = Client::configure() - .connector(HttpsConnector::new(4, handle).unwrap()) - .build(handle); +impl StandardGasPriceStream { + pub fn new(node: &Node, handle: &Handle, timer: &Timer) -> Self { + let client = Client::configure() + .connector(HttpsConnector::new(4, handle).unwrap()) + .build(handle); + GasPriceStream::new_with_retriever(node, client, timer) + } +} +impl GasPriceStream where I: AsRef<[u8]>, F: Future, R: Retriever { + pub fn new_with_retriever(node: &Node, retriever: R, timer: &Timer) -> Self { let uri: Uri = node.gas_price_oracle_url.clone().unwrap().parse().unwrap(); GasPriceStream { state: State::Initial, - client, + retriever, uri, speed: node.gas_price_speed, request_timer: timer.clone(), @@ -51,7 +77,7 @@ impl GasPriceStream { } } -impl Stream for GasPriceStream { +impl Stream for GasPriceStream where I: AsRef<[u8]>, F: Future, R: Retriever { type Item = u64; type Error = Error; @@ -61,12 +87,7 @@ impl Stream for GasPriceStream { State::Initial => { let _ = try_stream!(self.interval.poll()); - let request: Box> = - Box::new( - self.client.get(self.uri.clone()) - .and_then(|resp| resp.body().concat2()) - .map_err(|e| e.into()) - ); + let request = self.retriever.retrieve(&self.uri); let request_future = self.request_timer .timeout(request, self.request_timeout); @@ -77,18 +98,18 @@ impl Stream for GasPriceStream { match request_future.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(chunk)) => { - match json::from_slice::>(&chunk) { + match json::from_slice::>(chunk.as_ref()) { Ok(json_obj) => { match json_obj.get(self.speed.as_str()) { Some(json::Value::Number(price)) => State::Yield(Some((price.as_f64().unwrap() * 1_000_000_000.0).trunc() as u64)), _ => { - error!("Invalid or missing gas price ({}) in the gas price oracle response: {}", self.speed.as_str(), String::from_utf8_lossy(&*chunk)); + error!("Invalid or missing gas price ({}) in the gas price oracle response: {}", self.speed.as_str(), String::from_utf8_lossy(&*chunk.as_ref())); State::Yield(Some(self.last_price)) }, } }, Err(e) => { - error!("Error while parsing response from gas price oracle: {:?} {}", e, String::from_utf8_lossy(&*chunk)); + error!("Error while parsing response from gas price oracle: {:?} {}", e, String::from_utf8_lossy(&*chunk.as_ref())); State::Yield(Some(self.last_price)) } } @@ -115,3 +136,226 @@ impl Stream for GasPriceStream { } } } + +#[cfg(test)] +mod tests { + + use super::*; + use error::{Error, ErrorKind}; + use futures::{Async, future::{err, ok, FutureResult}}; + use config::{Node, NodeInfo}; + use tokio_timer::Timer; + use std::time::Duration; + use std::path::PathBuf; + use web3::types::Address; + use std::str::FromStr; + + struct ErroredRequest; + + impl Retriever for ErroredRequest { + type Item = Vec; + type Future = FutureResult; + + fn retrieve(&self, _uri: &Uri) -> ::Future { + err(ErrorKind::OtherError("something went wrong".into()).into()) + } + } + + #[test] + fn errored_request() { + let node = Node { + account: Address::new(), + request_timeout: Duration::from_secs(5), + poll_interval: Duration::from_secs(1), + required_confirmations: 0, + rpc_host: "https://rpc".into(), + rpc_port: 443, + password: PathBuf::from("password"), + info: NodeInfo::default(), + gas_price_oracle_url: Some("https://gas.price".into()), + gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(), + gas_price_timeout: Duration::from_secs(5), + default_gas_price: 15_000_000_000, + }; + let timer = Timer::default(); + let mut stream = GasPriceStream::new_with_retriever(&node, ErroredRequest, &timer); + loop { + match stream.poll() { + Ok(Async::Ready(Some(v))) => { + assert_eq!(v, node.default_gas_price); + break; + }, + Err(_) => panic!("should not error out"), + _ => (), + } + } + } + + + struct BadJson; + + impl Retriever for BadJson { + type Item = String; + type Future = FutureResult; + + fn retrieve(&self, _uri: &Uri) -> ::Future { + ok("bad json".into()) + } + } + + #[test] + fn bad_json() { + let node = Node { + account: Address::new(), + request_timeout: Duration::from_secs(5), + poll_interval: Duration::from_secs(1), + required_confirmations: 0, + rpc_host: "https://rpc".into(), + rpc_port: 443, + password: PathBuf::from("password"), + info: NodeInfo::default(), + gas_price_oracle_url: Some("https://gas.price".into()), + gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(), + gas_price_timeout: Duration::from_secs(5), + default_gas_price: 15_000_000_000, + }; + let timer = Timer::default(); + let mut stream = GasPriceStream::new_with_retriever(&node, BadJson, &timer); + loop { + match stream.poll() { + Ok(Async::Ready(Some(v))) => { + assert_eq!(v, node.default_gas_price); + break; + }, + Err(_) => panic!("should not error out"), + _ => (), + } + } + } + + + struct UnexpectedJson; + + impl Retriever for UnexpectedJson { + type Item = String; + type Future = FutureResult; + + fn retrieve(&self, _uri: &Uri) -> ::Future { + ok(r#"{"cow": "moo"}"#.into()) + } + } + + #[test] + fn unexpected_json() { + let node = Node { + account: Address::new(), + request_timeout: Duration::from_secs(5), + poll_interval: Duration::from_secs(1), + required_confirmations: 0, + rpc_host: "https://rpc".into(), + rpc_port: 443, + password: PathBuf::from("password"), + info: NodeInfo::default(), + gas_price_oracle_url: Some("https://gas.price".into()), + gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(), + gas_price_timeout: Duration::from_secs(5), + default_gas_price: 15_000_000_000, + }; + let timer = Timer::default(); + let mut stream = GasPriceStream::new_with_retriever(&node, UnexpectedJson, &timer); + loop { + match stream.poll() { + Ok(Async::Ready(Some(v))) => { + assert_eq!(v, node.default_gas_price); + break; + }, + Err(_) => panic!("should not error out"), + _ => (), + } + } + } + + struct NonObjectJson; + + impl Retriever for NonObjectJson { + type Item = String; + type Future = FutureResult; + + fn retrieve(&self, _uri: &Uri) -> ::Future { + ok("3".into()) + } + } + + #[test] + fn non_object_json() { + let node = Node { + account: Address::new(), + request_timeout: Duration::from_secs(5), + poll_interval: Duration::from_secs(1), + required_confirmations: 0, + rpc_host: "https://rpc".into(), + rpc_port: 443, + password: PathBuf::from("password"), + info: NodeInfo::default(), + gas_price_oracle_url: Some("https://gas.price".into()), + gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(), + gas_price_timeout: Duration::from_secs(5), + default_gas_price: 15_000_000_000, + }; + let timer = Timer::default(); + let mut stream = GasPriceStream::new_with_retriever(&node, NonObjectJson, &timer); + loop { + match stream.poll() { + Ok(Async::Ready(Some(v))) => { + assert_eq!(v, node.default_gas_price); + break; + }, + Err(_) => panic!("should not error out"), + _ => (), + } + } + } + + struct CorrectJson; + + impl Retriever for CorrectJson { + type Item = String; + type Future = FutureResult; + + fn retrieve(&self, _uri: &Uri) -> ::Future { + ok(r#"{"fast": 12.0}"#.into()) + } + } + + #[test] + fn correct_json() { + let node = Node { + account: Address::new(), + request_timeout: Duration::from_secs(5), + poll_interval: Duration::from_secs(1), + required_confirmations: 0, + rpc_host: "https://rpc".into(), + rpc_port: 443, + password: PathBuf::from("password"), + info: NodeInfo::default(), + gas_price_oracle_url: Some("https://gas.price".into()), + gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(), + gas_price_timeout: Duration::from_secs(5), + default_gas_price: 15_000_000_000, + }; + let timer = Timer::default(); + let mut stream = GasPriceStream::new_with_retriever(&node, CorrectJson, &timer); + loop { + match stream.poll() { + Ok(Async::Ready(Some(v))) => { + assert_eq!(v, 12_000_000_000); + break; + }, + Err(_) => panic!("should not error out"), + _ => (), + } + } + } + +} + diff --git a/bridge/src/bridge/mod.rs b/bridge/src/bridge/mod.rs index eda5179..528599a 100644 --- a/bridge/src/bridge/mod.rs +++ b/bridge/src/bridge/mod.rs @@ -24,7 +24,7 @@ pub use self::chain_id::{ChainIdRetrieval, create_chain_id_retrieval}; pub use self::deposit_relay::{DepositRelay, create_deposit_relay}; pub use self::withdraw_relay::{WithdrawRelay, create_withdraw_relay}; pub use self::withdraw_confirm::{WithdrawConfirm, create_withdraw_confirm}; -pub use self::gas_price::GasPriceStream; +pub use self::gas_price::StandardGasPriceStream; /// Last block checked by the bridge components. #[derive(Clone, Copy)] @@ -89,14 +89,14 @@ pub fn create_bridge_backed_by(app: Arc< let foreign_balance = Arc::new(RwLock::new(None)); let home_gas_stream = if app.config.home.gas_price_oracle_url.is_some() { - let stream = GasPriceStream::new(&app.config.home, handle, &app.timer); + let stream = StandardGasPriceStream::new(&app.config.home, handle, &app.timer); Some(stream) } else { None }; let foreign_gas_stream = if app.config.foreign.gas_price_oracle_url.is_some() { - let stream = GasPriceStream::new(&app.config.foreign, handle, &app.timer); + let stream = StandardGasPriceStream::new(&app.config.foreign, handle, &app.timer); Some(stream) } else { None @@ -134,8 +134,8 @@ pub struct Bridge { state: BridgeStatus, backend: F, running: Arc, - home_gas_stream: Option, - foreign_gas_stream: Option, + home_gas_stream: Option, + foreign_gas_stream: Option, home_gas_price: Arc>, foreign_gas_price: Arc>, } diff --git a/bridge/src/error.rs b/bridge/src/error.rs index 6413872..63209c0 100644 --- a/bridge/src/error.rs +++ b/bridge/src/error.rs @@ -58,6 +58,10 @@ error_chain! { description("contextualized error") display("{:?} in {}", err, context) } + OtherError(error: String) { + description("other error") + display("{}", error) + } } } From 4404af9a5faaeca95476db1c40dc1a4672fadec6 Mon Sep 17 00:00:00 2001 From: Yurii Rashkovskii Date: Mon, 28 May 2018 10:06:38 -0700 Subject: [PATCH 3/3] Problem: GasPriceStream using web3 timeout However, this is different infrastructure and might have different requirements. Solution: use gas_price_timeout --- bridge/src/bridge/gas_price.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bridge/src/bridge/gas_price.rs b/bridge/src/bridge/gas_price.rs index c67002e..f5ebbe3 100644 --- a/bridge/src/bridge/gas_price.rs +++ b/bridge/src/bridge/gas_price.rs @@ -72,7 +72,7 @@ impl GasPriceStream where I: AsRef<[u8]>, F: Future