Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Backport #8099 to beta #8132

Merged
merged 2 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
518 changes: 212 additions & 306 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions dapps/src/apps/fetcher/installers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::{fs, fmt};
use std::io::{self, Read, Write};
use std::path::PathBuf;
use ethereum_types::H256;
use fetch::{self, Mime};
use fetch;
use futures_cpupool::CpuPool;
use hash::keccak_buffer;
use mime_guess::Mime;

use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest};
use handlers::{ContentValidator, ValidatorResponse};
Expand Down Expand Up @@ -53,7 +54,7 @@ fn write_response_and_check_hash(

// Now write the response
let mut file = io::BufWriter::new(fs::File::create(&content_path)?);
let mut reader = io::BufReader::new(response);
let mut reader = io::BufReader::new(fetch::BodyReader::new(response));
io::copy(&mut reader, &mut file)?;
file.flush()?;

Expand Down
5 changes: 4 additions & 1 deletion dapps/src/apps/fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
URLHintResult::GithubDapp(content) => {
Expand All @@ -232,6 +233,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
URLHintResult::Content(content) => {
Expand All @@ -248,6 +250,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
};
Expand Down Expand Up @@ -280,7 +283,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
mod tests {
use std::env;
use std::sync::Arc;
use fetch::{Fetch, Client};
use fetch::Client;
use futures::{future, Future};
use hash_fetch::urlhint::{URLHint, URLHintResult};
use ethereum_types::H256;
Expand Down
2 changes: 1 addition & 1 deletion dapps/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn all_endpoints<F: Fetch>(
insert::<parity_ui::old::App>(&mut pages, "v1", Embeddable::Yes(embeddable.clone()), pool.clone());

pages.insert("proxy".into(), ProxyPac::boxed(embeddable.clone(), dapps_domain.to_owned()));
pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone()));
pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone(), pool.clone()));

(local_endpoints, pages)
}
Expand Down
14 changes: 9 additions & 5 deletions dapps/src/handlers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Instant, Duration};
use fetch::{self, Fetch};
use futures::sync::oneshot;
use futures::{self, Future};
use futures_cpupool::CpuPool;
use hyper::{self, Method, StatusCode};
use parking_lot::Mutex;

Expand All @@ -35,7 +36,7 @@ const FETCH_TIMEOUT: u64 = 300;

pub enum ValidatorResponse {
Local(local::Dapp),
Streaming(StreamingHandler<fetch::Response>),
Streaming(StreamingHandler<fetch::BodyReader>),
}

pub trait ContentValidator: Sized + Send + 'static {
Expand Down Expand Up @@ -252,6 +253,7 @@ impl ContentFetcherHandler {
installer: H,
embeddable_on: Embeddable,
fetch: F,
pool: CpuPool,
) -> Self {
let fetch_control = FetchControl::default();
let errors = Errors { embeddable_on };
Expand All @@ -262,6 +264,7 @@ impl ContentFetcherHandler {
Method::Get => {
trace!(target: "dapps", "Fetching content from: {:?}", url);
FetchState::InProgress(Self::fetch_content(
pool,
fetch,
url,
fetch_control.abort.clone(),
Expand All @@ -282,6 +285,7 @@ impl ContentFetcherHandler {
}

fn fetch_content<H: ContentValidator, F: Fetch>(
pool: CpuPool,
fetch: F,
url: &str,
abort: Arc<AtomicBool>,
Expand All @@ -290,8 +294,8 @@ impl ContentFetcherHandler {
installer: H,
) -> Box<Future<Item=FetchState, Error=()> + Send> {
// Start fetching the content
let fetch2 = fetch.clone();
let future = fetch.fetch_with_abort(url, abort.into()).then(move |result| {
let pool2 = pool.clone();
let future = fetch.fetch(url, abort.into()).then(move |result| {
trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result);
Ok(match result {
Ok(response) => match installer.validate_and_install(response) {
Expand All @@ -303,7 +307,7 @@ impl ContentFetcherHandler {
Ok(ValidatorResponse::Streaming(stream)) => {
trace!(target: "dapps", "Validation OK. Streaming response.");
let (reading, response) = stream.into_response();
fetch2.process_and_forget(reading);
pool.spawn(reading).forget();
FetchState::Streaming(response)
},
Err(e) => {
Expand All @@ -319,7 +323,7 @@ impl ContentFetcherHandler {
});

// make sure to run within fetch thread pool.
fetch.process(future)
Box::new(pool2.spawn(future))
}
}

Expand Down
27 changes: 6 additions & 21 deletions dapps/src/tests/helpers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::{io, thread, time};
use std::{thread, time};
use std::sync::{atomic, mpsc, Arc};
use parking_lot::Mutex;
use hyper;

use futures::{self, Future};
use fetch::{self, Fetch};
use fetch::{self, Fetch, Url};

pub struct FetchControl {
sender: mpsc::Sender<()>,
Expand Down Expand Up @@ -96,11 +97,8 @@ impl FakeFetch {
impl Fetch for FakeFetch {
type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send>;

fn new() -> Result<Self, fetch::Error> where Self: Sized {
Ok(FakeFetch::default())
}

fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
let u = Url::parse(url).unwrap();
self.requested.lock().push(url.into());
let manual = self.manual.clone();
let response = self.response.clone();
Expand All @@ -111,23 +109,10 @@ impl Fetch for FakeFetch {
// wait for manual resume
let _ = rx.recv();
}

let data = response.lock().take().unwrap_or(b"Some content");
let cursor = io::Cursor::new(data);
tx.send(fetch::Response::from_reader(cursor)).unwrap();
tx.send(fetch::Response::new(u, hyper::Response::new().with_body(data), abort)).unwrap();
});

Box::new(rx.map_err(|_| fetch::Error::Aborted))
}

fn process_and_forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
// Spawn the task in a separate thread.
thread::spawn(|| {
let _ = f.wait();
});
}
}
20 changes: 6 additions & 14 deletions dapps/src/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn init_server<F, B>(process: F, io: IoHandler) -> (Server, Arc<FakeRegistra
let mut dapps_path = env::temp_dir();
dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading");

let mut builder = ServerBuilder::new(&dapps_path, registrar.clone());
let mut builder = ServerBuilder::new(FetchClient::new().unwrap(), &dapps_path, registrar.clone());
builder.signer_address = Some(("127.0.0.1".into(), SIGNER_PORT));
let server = process(builder).start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), io).unwrap();
(
Expand Down Expand Up @@ -149,21 +149,21 @@ pub struct ServerBuilder<T: Fetch = FetchClient> {
web_proxy_tokens: Arc<WebProxyTokens>,
signer_address: Option<(String, u16)>,
allowed_hosts: DomainsValidation<Host>,
fetch: Option<T>,
fetch: T,
serve_ui: bool,
}

impl ServerBuilder {
/// Construct new dapps server
pub fn new<P: AsRef<Path>>(dapps_path: P, registrar: Arc<ContractClient>) -> Self {
pub fn new<P: AsRef<Path>>(fetch: FetchClient, dapps_path: P, registrar: Arc<ContractClient>) -> Self {
ServerBuilder {
dapps_path: dapps_path.as_ref().to_owned(),
registrar: registrar,
sync_status: Arc::new(FakeSync(false)),
web_proxy_tokens: Arc::new(|_| None),
signer_address: None,
allowed_hosts: DomainsValidation::Disabled,
fetch: None,
fetch: fetch,
serve_ui: false,
}
}
Expand All @@ -179,15 +179,14 @@ impl<T: Fetch> ServerBuilder<T> {
web_proxy_tokens: self.web_proxy_tokens,
signer_address: self.signer_address,
allowed_hosts: self.allowed_hosts,
fetch: Some(fetch),
fetch: fetch,
serve_ui: self.serve_ui,
}
}

/// Asynchronously start server with no authentication,
/// returns result with `Server` handle on success or an error.
pub fn start_unsecured_http(self, addr: &SocketAddr, io: IoHandler) -> io::Result<Server> {
let fetch = self.fetch_client();
Server::start_http(
addr,
io,
Expand All @@ -199,17 +198,10 @@ impl<T: Fetch> ServerBuilder<T> {
self.sync_status,
self.web_proxy_tokens,
Remote::new_sync(),
fetch,
self.fetch,
self.serve_ui,
)
}

fn fetch_client(&self) -> T {
match self.fetch.clone() {
Some(fetch) => fetch,
None => T::new().unwrap(),
}
}
}

const DAPPS_DOMAIN: &'static str = "web3.site";
Expand Down
7 changes: 6 additions & 1 deletion dapps/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use hyper::{mime, StatusCode};
use apps;
use endpoint::{Endpoint, EndpointPath, Request, Response};
use futures::future;
use futures_cpupool::CpuPool;
use handlers::{
ContentFetcherHandler, ContentHandler, ContentValidator, ValidatorResponse,
StreamingHandler,
Expand All @@ -35,18 +36,21 @@ pub struct Web<F> {
embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F,
pool: CpuPool,
}

impl<F: Fetch> Web<F> {
pub fn boxed(
embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F,
pool: CpuPool,
) -> Box<Endpoint> {
Box::new(Web {
embeddable_on,
web_proxy_tokens,
fetch,
pool,
})
}

Expand Down Expand Up @@ -129,6 +133,7 @@ impl<F: Fetch> Endpoint for Web<F> {
},
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
))
}
}
Expand All @@ -146,7 +151,7 @@ impl ContentValidator for WebInstaller {
let is_html = response.is_html();
let mime = response.content_type().unwrap_or(mime::TEXT_HTML);
let mut handler = StreamingHandler::new(
response,
fetch::BodyReader::new(response),
status,
mime,
self.embeddable_on,
Expand Down
1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ethjson = { path = "../json" }
ethkey = { path = "../ethkey" }
ethstore = { path = "../ethstore" }
evm = { path = "evm" }
futures-cpupool = "0.1"
hardware-wallet = { path = "../hw" }
heapsize = "0.4"
itertools = "0.5"
Expand Down
1 change: 1 addition & 0 deletions ethcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ extern crate ethcore_transaction as transaction;
extern crate ethereum_types;
extern crate ethjson;
extern crate ethkey;
extern crate futures_cpupool;
extern crate hardware_wallet;
extern crate hashdb;
extern crate itertools;
Expand Down
5 changes: 3 additions & 2 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use ethcore_miner::transaction_queue::{
AccountDetails,
TransactionOrigin,
};
use futures_cpupool::CpuPool;
use ethcore_miner::work_notify::{WorkPoster, NotifyWork};
use ethcore_miner::service_transaction_checker::ServiceTransactionChecker;
use miner::{MinerService, MinerStatus};
Expand Down Expand Up @@ -216,11 +217,11 @@ pub enum GasPricer {

impl GasPricer {
/// Create a new Calibrated `GasPricer`.
pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient) -> GasPricer {
pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient, p: CpuPool) -> GasPricer {
GasPricer::Calibrated(GasPriceCalibrator {
options: options,
next_calibration: Instant::now(),
price_info: PriceInfoClient::new(fetch),
price_info: PriceInfoClient::new(fetch, p),
})
}

Expand Down
2 changes: 2 additions & 0 deletions hash-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = ["Parity Technologies <admin@parity.io>"]

[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
log = "0.3"
mime = "0.3"
mime_guess = "2.0.0-alpha.2"
Expand All @@ -24,4 +25,5 @@ ethabi-derive = "5.0"
ethabi-contract = "5.0"

[dev-dependencies]
hyper = "0.11"
parking_lot = "0.5"
Loading