diff --git a/ethcore/light/src/on_demand/mod.rs b/ethcore/light/src/on_demand/mod.rs index c04c687947a..7d1f4fabf8e 100644 --- a/ethcore/light/src/on_demand/mod.rs +++ b/ethcore/light/src/on_demand/mod.rs @@ -94,6 +94,24 @@ pub mod error { } } +/// Public interface for performing network requests `OnDemand` +pub trait OnDemandRequester: Send + Sync { + /// Submit a strongly-typed batch of requests. + /// + /// Fails if back-reference are not coherent. + fn request(&self, ctx: &BasicContext, requests: T) -> Result, basic_request::NoSuchOutput> + where + T: request::RequestAdapter; + + /// Submit a vector of requests to be processed together. + /// + /// Fails if back-references are not coherent. + /// The returned vector of responses will correspond to the requests exactly. + fn request_raw(&self, ctx: &BasicContext, requests: Vec) + -> Result, basic_request::NoSuchOutput>; +} + + // relevant peer info. #[derive(Debug, Clone, PartialEq, Eq)] struct Peer { @@ -355,71 +373,8 @@ pub struct OnDemand { request_number_of_consecutive_errors: usize } -impl OnDemand { - - /// Create a new `OnDemand` service with the given cache. - pub fn new( - cache: Arc>, - response_time_window: Duration, - request_backoff_start: Duration, - request_backoff_max: Duration, - request_backoff_rounds_max: usize, - request_number_of_consecutive_errors: usize, - ) -> Self { - - Self { - pending: RwLock::new(Vec::new()), - peers: RwLock::new(HashMap::new()), - in_transit: RwLock::new(HashMap::new()), - cache, - no_immediate_dispatch: false, - response_time_window: Self::sanitize_circuit_breaker_input(response_time_window, "Response time window"), - request_backoff_start: Self::sanitize_circuit_breaker_input(request_backoff_start, "Request initial backoff time window"), - request_backoff_max: Self::sanitize_circuit_breaker_input(request_backoff_max, "Request maximum backoff time window"), - request_backoff_rounds_max, - request_number_of_consecutive_errors, - } - } - - fn sanitize_circuit_breaker_input(dur: Duration, name: &'static str) -> Duration { - if dur.as_secs() < 1 { - warn!(target: "on_demand", - "{} is too short must be at least 1 second, configuring it to 1 second", name); - Duration::from_secs(1) - } else { - dur - } - } - - // make a test version: this doesn't dispatch pending requests - // until you trigger it manually. - #[cfg(test)] - fn new_test( - cache: Arc>, - request_ttl: Duration, - request_backoff_start: Duration, - request_backoff_max: Duration, - request_backoff_rounds_max: usize, - request_number_of_consecutive_errors: usize, - ) -> Self { - let mut me = OnDemand::new( - cache, - request_ttl, - request_backoff_start, - request_backoff_max, - request_backoff_rounds_max, - request_number_of_consecutive_errors, - ); - me.no_immediate_dispatch = true; - - me - } - - /// Submit a vector of requests to be processed together. - /// - /// Fails if back-references are not coherent. - /// The returned vector of responses will correspond to the requests exactly. - pub fn request_raw(&self, ctx: &BasicContext, requests: Vec) +impl OnDemandRequester for OnDemand { + fn request_raw(&self, ctx: &BasicContext, requests: Vec) -> Result, basic_request::NoSuchOutput> { let (sender, receiver) = oneshot::channel(); @@ -475,10 +430,7 @@ impl OnDemand { Ok(receiver) } - /// Submit a strongly-typed batch of requests. - /// - /// Fails if back-reference are not coherent. - pub fn request(&self, ctx: &BasicContext, requests: T) -> Result, basic_request::NoSuchOutput> + fn request(&self, ctx: &BasicContext, requests: T) -> Result, basic_request::NoSuchOutput> where T: request::RequestAdapter { self.request_raw(ctx, requests.make_requests()).map(|recv| OnResponses { @@ -487,6 +439,69 @@ impl OnDemand { }) } +} + +impl OnDemand { + + /// Create a new `OnDemand` service with the given cache. + pub fn new( + cache: Arc>, + response_time_window: Duration, + request_backoff_start: Duration, + request_backoff_max: Duration, + request_backoff_rounds_max: usize, + request_number_of_consecutive_errors: usize, + ) -> Self { + + Self { + pending: RwLock::new(Vec::new()), + peers: RwLock::new(HashMap::new()), + in_transit: RwLock::new(HashMap::new()), + cache, + no_immediate_dispatch: false, + response_time_window: Self::sanitize_circuit_breaker_input(response_time_window, "Response time window"), + request_backoff_start: Self::sanitize_circuit_breaker_input(request_backoff_start, "Request initial backoff time window"), + request_backoff_max: Self::sanitize_circuit_breaker_input(request_backoff_max, "Request maximum backoff time window"), + request_backoff_rounds_max, + request_number_of_consecutive_errors, + } + } + + fn sanitize_circuit_breaker_input(dur: Duration, name: &'static str) -> Duration { + if dur.as_secs() < 1 { + warn!(target: "on_demand", + "{} is too short must be at least 1 second, configuring it to 1 second", name); + Duration::from_secs(1) + } else { + dur + } + } + + // make a test version: this doesn't dispatch pending requests + // until you trigger it manually. + #[cfg(test)] + fn new_test( + cache: Arc>, + request_ttl: Duration, + request_backoff_start: Duration, + request_backoff_max: Duration, + request_backoff_rounds_max: usize, + request_number_of_consecutive_errors: usize, + ) -> Self { + let mut me = OnDemand::new( + cache, + request_ttl, + request_backoff_start, + request_backoff_max, + request_backoff_rounds_max, + request_number_of_consecutive_errors, + ); + me.no_immediate_dispatch = true; + + me + } + + // maybe dispatch pending requests. // sometimes fn attempt_dispatch(&self, ctx: &BasicContext) { diff --git a/ethcore/light/src/on_demand/tests.rs b/ethcore/light/src/on_demand/tests.rs index fd0e8b0f973..49ec35f10db 100644 --- a/ethcore/light/src/on_demand/tests.rs +++ b/ethcore/light/src/on_demand/tests.rs @@ -29,7 +29,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::thread; -use super::{request, OnDemand, Peer, HeaderRef}; +use super::{request, OnDemand, OnDemandRequester, Peer, HeaderRef}; // useful contexts to give the service. enum Context { diff --git a/parity/light_helpers/epoch_fetch.rs b/parity/light_helpers/epoch_fetch.rs index 01e74059ea5..9c7fd6a8ee3 100644 --- a/parity/light_helpers/epoch_fetch.rs +++ b/parity/light_helpers/epoch_fetch.rs @@ -27,7 +27,7 @@ use futures::{future, Future}; use futures::future::Either; use light::client::fetch::ChainDataFetcher; -use light::on_demand::{request, OnDemand}; +use light::on_demand::{request, OnDemand, OnDemandRequester}; use parking_lot::RwLock; use ethereum_types::H256; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs index ec1ca612b80..693d8f93cff 100644 --- a/parity/light_helpers/queue_cull.rs +++ b/parity/light_helpers/queue_cull.rs @@ -24,7 +24,7 @@ use sync::{LightSync, LightNetworkDispatcher}; use io::{IoContext, IoHandler, TimerToken}; use light::client::LightChainClient; -use light::on_demand::{request, OnDemand}; +use light::on_demand::{request, OnDemand, OnDemandRequester}; use light::TransactionQueue; use futures::{future, Future}; diff --git a/rpc/src/v1/helpers/dispatch/light.rs b/rpc/src/v1/helpers/dispatch/light.rs index 59c3af5232f..88f9fafcf16 100644 --- a/rpc/src/v1/helpers/dispatch/light.rs +++ b/rpc/src/v1/helpers/dispatch/light.rs @@ -20,7 +20,7 @@ use ethereum_types::{H256, Address, U256}; use light::TransactionQueue as LightTransactionQueue; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; -use light::on_demand::{request, OnDemand}; +use light::on_demand::{request, OnDemandRequester}; use parking_lot::{Mutex, RwLock}; use stats::Corpus; use sync::{LightSyncProvider, LightNetworkDispatcher, ManageNetwork}; @@ -37,13 +37,17 @@ use v1::types::{RichRawTransaction as RpcRichRawTransaction,}; use super::{Dispatcher, Accounts, SignWith, PostSign}; /// Dispatcher for light clients -- fetches default gas price, next nonce, etc. from network. -pub struct LightDispatcher { +pub struct LightDispatcher +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static +{ /// Sync service. pub sync: Arc, /// Header chain client. pub client: Arc, /// On-demand request service. - pub on_demand: Arc, + pub on_demand: Arc, /// Data cache. pub cache: Arc>, /// Transaction queue. @@ -54,9 +58,10 @@ pub struct LightDispatcher LightDispatcher +impl LightDispatcher where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { /// Create a new `LightDispatcher` from its requisite parts. /// @@ -64,7 +69,7 @@ where pub fn new( sync: Arc, client: Arc, - on_demand: Arc, + on_demand: Arc, cache: Arc>, transaction_queue: Arc>, nonces: Arc>, @@ -117,9 +122,10 @@ where } } -impl Clone for LightDispatcher +impl Clone for LightDispatcher where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { fn clone(&self) -> Self { Self { @@ -134,9 +140,10 @@ where } } -impl Dispatcher for LightDispatcher +impl Dispatcher for LightDispatcher where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { // Ignore the `force_nonce` flag in order to always query the network when fetching the nonce and // the account state. If the nonce is specified in the transaction use that nonce instead but do the @@ -239,12 +246,16 @@ where /// Get a recent gas price corpus. // TODO: this could be `impl Trait`. -pub fn fetch_gas_price_corpus( +pub fn fetch_gas_price_corpus( sync: Arc, client: Arc, - on_demand: Arc, + on_demand: Arc, cache: Arc>, -) -> BoxFuture> { +) -> BoxFuture> +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static +{ const GAS_PRICE_SAMPLE_SIZE: usize = 100; if let Some(cached) = { cache.lock().gas_price_corpus() } { diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 3d19e3fa868..f5079647349 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -35,16 +35,15 @@ use light::cache::Cache; use light::client::LightChainClient; use light::{cht, MAX_HEADERS_PER_REQUEST}; use light::on_demand::{ - request, OnDemand, HeaderRef, Request as OnDemandRequest, + request, OnDemandRequester, HeaderRef, Request as OnDemandRequest, Response as OnDemandResponse, ExecutionResult, }; use light::on_demand::error::Error as OnDemandError; use light::request::Field; - use sync::{LightNetworkDispatcher, ManageNetwork, LightSyncProvider}; -use ethereum_types::{U256, Address}; +use ethereum_types::Address; use hash::H256; use parking_lot::Mutex; use fastmap::H256FastMap; @@ -57,9 +56,10 @@ use v1::types::{BlockNumber, CallRequest, Log, Transaction}; const NO_INVALID_BACK_REFS_PROOF: &str = "Fails only on invalid back-references; back-references here known to be valid; qed"; const WRONG_RESPONSE_AMOUNT_TYPE_PROOF: &str = "responses correspond directly with requests in amount and type; qed"; -pub fn light_all_transactions(dispatch: &Arc>) -> impl Iterator +pub fn light_all_transactions(dispatch: &Arc>) -> impl Iterator where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { let txq = dispatch.transaction_queue.read(); let chain_info = dispatch.client.chain_info(); @@ -71,12 +71,15 @@ where /// Helper for fetching blockchain data either from the light client or the network /// as necessary. -pub struct LightFetch +pub struct LightFetch +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { /// The light client. pub client: Arc, /// The on-demand request service. - pub on_demand: Arc, + pub on_demand: Arc, /// Handle to the network. pub sync: Arc, /// The light data cache. @@ -85,9 +88,10 @@ pub struct LightFetch Clone for LightFetch +impl Clone for LightFetch where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { fn clone(&self) -> Self { Self { @@ -136,9 +140,10 @@ fn extract_header(res: &[OnDemandResponse], header: HeaderRef) -> Option LightFetch +impl LightFetch where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { // push the necessary requests onto the request chain to get the header by the given ID. // yield a header reference which other requests can use. @@ -277,7 +282,7 @@ where action: req.to.map_or(Action::Create, Action::Call), gas: req.gas.unwrap_or_else(|| START_GAS.into()), gas_price, - value: req.value.unwrap_or_else(U256::zero), + value: req.value.unwrap_or_default(), data: req.data.unwrap_or_default(), })) ) @@ -387,7 +392,7 @@ where block_index += 1; } } - future::ok::<_,OnDemandError>(matches) + future::ok::<_, OnDemandError>(matches) }) .map_err(errors::on_demand_error) .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) @@ -657,22 +662,24 @@ where } } -struct ExecuteParams +struct ExecuteParams where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { from: Address, tx: EthTransaction, hdr: encoded::Header, env_info: ::vm::EnvInfo, engine: Arc<::ethcore::engines::EthEngine>, - on_demand: Arc, + on_demand: Arc, sync: Arc, } -impl Clone for ExecuteParams +impl Clone for ExecuteParams where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { fn clone(&self) -> Self { Self { @@ -689,9 +696,10 @@ where // Has a peer execute the transaction with given params. If `gas_known` is false, this will set the `gas value` to the // `required gas value` unless it exceeds the block gas limit -fn execute_read_only_tx(gas_known: bool, params: ExecuteParams) -> impl Future + Send +fn execute_read_only_tx(gas_known: bool, params: ExecuteParams) -> impl Future + Send where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { if !gas_known { Box::new(future::loop_fn(params, |mut params| { diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 91f32827b15..45072815780 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -33,7 +33,7 @@ use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainRouteType, use ethereum_types::H256; use light::cache::Cache; use light::client::{LightChainClient, LightChainNotify}; -use light::on_demand::OnDemand; +use light::on_demand::OnDemandRequester; use parity_runtime::Executor; use parking_lot::{RwLock, Mutex}; @@ -89,14 +89,15 @@ impl EthPubSubClient { } } -impl EthPubSubClient> +impl EthPubSubClient> where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { /// Creates a new `EthPubSubClient` for `LightClient`. pub fn light( client: Arc, - on_demand: Arc, + on_demand: Arc, sync: Arc, cache: Arc>, executor: Executor, @@ -194,9 +195,10 @@ pub trait LightClient: Send + Sync { fn logs(&self, filter: EthFilter) -> BoxFuture>; } -impl LightClient for LightFetch +impl LightClient for LightFetch where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { fn block_header(&self, id: BlockId) -> Option { self.client.block_header(id) diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index fa972c5d609..b3e183c1c93 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -26,7 +26,7 @@ use jsonrpc_core::futures::future::Either; use light::cache::Cache as LightDataCache; use light::client::LightChainClient; use light::{cht, TransactionQueue}; -use light::on_demand::{request, OnDemand}; +use light::on_demand::{request, OnDemandRequester}; use ethereum_types::{Address, H64, H160, H256, U64, U256}; use hash::{KECCAK_NULL_RLP, KECCAK_EMPTY_LIST_RLP}; @@ -54,10 +54,10 @@ use sync::{LightSyncInfo, LightSyncProvider, LightNetworkDispatcher, ManageNetwo const NO_INVALID_BACK_REFS: &str = "Fails only on invalid back-references; back-references here known to be valid; qed"; /// Light client `ETH` (and filter) RPC. -pub struct EthClient { +pub struct EthClient { sync: Arc, client: Arc, - on_demand: Arc, + on_demand: Arc, transaction_queue: Arc>, accounts: Arc Vec
+ Send + Sync>, cache: Arc>, @@ -67,9 +67,10 @@ pub struct EthClient deprecation_notice: DeprecationNotice, } -impl Clone for EthClient +impl Clone for EthClient where - S: LightSyncProvider + LightNetworkDispatcher + 'static + S: LightSyncProvider + LightNetworkDispatcher + 'static, + OD: OnDemandRequester + 'static { fn clone(&self) -> Self { // each instance should have its own poll manager. @@ -88,17 +89,18 @@ where } } -impl EthClient +impl EthClient where C: LightChainClient + 'static, - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { /// Create a new `EthClient` with a handle to the light sync instance, client, /// and on-demand request service, which is assumed to be attached as a handler. pub fn new( sync: Arc, client: Arc, - on_demand: Arc, + on_demand: Arc, transaction_queue: Arc>, accounts: Arc Vec
+ Send + Sync>, cache: Arc>, @@ -120,7 +122,7 @@ where } /// Create a light data fetcher instance. - fn fetcher(&self) -> LightFetch + fn fetcher(&self) -> LightFetch { LightFetch { client: self.client.clone(), @@ -218,10 +220,11 @@ where } } -impl Eth for EthClient +impl Eth for EthClient where C: LightChainClient + 'static, - S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { type Metadata = Metadata; @@ -533,10 +536,11 @@ where } // This trait implementation triggers a blanked impl of `EthFilter`. -impl Filterable for EthClient +impl Filterable for EthClient where C: LightChainClient + 'static, - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number } diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index b6e378f6804..0486366de62 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -30,6 +30,7 @@ use ethcore_logger::RotatingLogger; use jsonrpc_core::{Result, BoxFuture}; use jsonrpc_core::futures::{future, Future}; +use light::on_demand::OnDemandRequester; use v1::helpers::{self, errors, ipfs, NetworkSettings, verify_signature}; use v1::helpers::external_signer::{SignerService, SigningQueue}; use v1::helpers::dispatch::LightDispatcher; @@ -48,8 +49,12 @@ use v1::types::{ use Host; /// Parity implementation for light client. -pub struct ParityClient { - light_dispatch: Arc>, +pub struct ParityClient +where + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static +{ + light_dispatch: Arc>, logger: Arc, settings: Arc, signer: Option>, @@ -57,13 +62,14 @@ pub struct ParityClient ParityClient +impl ParityClient where - S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { /// Creates new `ParityClient`. pub fn new( - light_dispatch: Arc>, + light_dispatch: Arc>, logger: Arc, settings: Arc, signer: Option>, @@ -81,7 +87,7 @@ where } /// Create a light blockchain data fetcher. - fn fetcher(&self) -> LightFetch + fn fetcher(&self) -> LightFetch { LightFetch { client: self.light_dispatch.client.clone(), @@ -93,9 +99,10 @@ where } } -impl Parity for ParityClient +impl Parity for ParityClient where - S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static + S: LightSyncInfo + LightSyncProvider + LightNetworkDispatcher + ManageNetwork + 'static, + OD: OnDemandRequester + 'static { type Metadata = Metadata;