Skip to content

Commit

Permalink
remove tokio_util::block_on (denoland#3388)
Browse files Browse the repository at this point in the history
This PR removes tokio_util::block_on - refactored compiler and file 
fetcher slightly so that we can safely block there - that's because 
only blocking path consist of only synchronous operations.

Additionally I removed excessive use of tokio_util::panic_on_error 
and tokio_util::run_in_task and moved both functions to cli/worker.rs, 
to tests module.

Closes denoland#2960
  • Loading branch information
bartlomieju committed Dec 28, 2019
1 parent 0d2dbab commit dff2b76
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 147 deletions.
9 changes: 4 additions & 5 deletions cli/compilers/ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl TsCompiler {

let source_file = self
.file_fetcher
.fetch_source_file(&module_specifier)
.fetch_cached_source_file(&module_specifier)
.expect("Source file not found");

let version_hash = source_code_version_hash(
Expand Down Expand Up @@ -581,10 +581,9 @@ impl TsCompiler {
script_name: &str,
) -> Option<SourceFile> {
if let Some(module_specifier) = self.try_to_resolve(script_name) {
return match self.file_fetcher.fetch_source_file(&module_specifier) {
Ok(out) => Some(out),
Err(_) => None,
};
return self
.file_fetcher
.fetch_cached_source_file(&module_specifier);
}

None
Expand Down
151 changes: 84 additions & 67 deletions cli/file_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::http_util;
use crate::http_util::FetchOnceResult;
use crate::msg;
use crate::progress::Progress;
use crate::tokio_util;
use deno::ErrBox;
use deno::ModuleSpecifier;
use futures::future::Either;
Expand Down Expand Up @@ -111,12 +110,25 @@ impl SourceFileFetcher {
Ok(())
}

/// Required for TS compiler.
pub fn fetch_source_file(
/// Required for TS compiler and source maps.
pub fn fetch_cached_source_file(
self: &Self,
specifier: &ModuleSpecifier,
) -> Result<SourceFile, ErrBox> {
tokio_util::block_on(self.fetch_source_file_async(specifier))
) -> Option<SourceFile> {
let maybe_source_file = self.source_file_cache.get(specifier.to_string());

if maybe_source_file.is_some() {
return maybe_source_file;
}

// If file is not in memory cache check if it can be found
// in local cache - which effectively means trying to fetch
// using "--no-fetch" flag. We can safely block on this
// future, because it doesn't do any asynchronous action
// it that path.
let fut = self.get_source_file_async(specifier.as_url(), true, true);

futures::executor::block_on(fut).ok()
}

pub fn fetch_source_file_async(
Expand Down Expand Up @@ -663,6 +675,7 @@ impl SourceCodeHeaders {
mod tests {
use super::*;
use crate::fs as deno_fs;
use crate::tokio_util;
use tempfile::TempDir;

fn setup_file_fetcher(dir_path: &Path) -> SourceFileFetcher {
Expand Down Expand Up @@ -987,45 +1000,45 @@ mod tests {
fn test_get_source_code_multiple_downloads_of_same_file() {
let http_server_guard = crate::test_util::http_server();
let (_temp_dir, fetcher) = test_setup();
// http_util::fetch_sync_string requires tokio
tokio_util::init(|| {
let specifier = ModuleSpecifier::resolve_url(
"http://localhost:4545/tests/subdir/mismatch_ext.ts",
)
.unwrap();
let headers_file_name = fetcher.deps_cache.location.join(
fetcher.deps_cache.get_cache_filename_with_extension(
specifier.as_url(),
"headers.json",
),
);
let specifier = ModuleSpecifier::resolve_url(
"http://localhost:4545/tests/subdir/mismatch_ext.ts",
)
.unwrap();
let headers_file_name = fetcher.deps_cache.location.join(
fetcher
.deps_cache
.get_cache_filename_with_extension(specifier.as_url(), "headers.json"),
);

// first download
let result = fetcher.fetch_source_file(&specifier);
assert!(result.is_ok());

let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata = headers_file.metadata().unwrap();
let headers_file_modified = headers_file_metadata.modified().unwrap();

// download file again, it should use already fetched file even though `use_disk_cache` is set to
// false, this can be verified using source header file creation timestamp (should be
// the same as after first download)
let result = fetcher.fetch_source_file(&specifier);
assert!(result.is_ok());

let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file_2 = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata_2 = headers_file_2.metadata().unwrap();
let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap();

assert_eq!(headers_file_modified, headers_file_modified_2);
});
// first download
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
futures::future::ok(())
}));

let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata = headers_file.metadata().unwrap();
let headers_file_modified = headers_file_metadata.modified().unwrap();

// download file again, it should use already fetched file even though `use_disk_cache` is set to
// false, this can be verified using source header file creation timestamp (should be
// the same as after first download)
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
futures::future::ok(())
}));

let result = fs::File::open(&headers_file_name);
assert!(result.is_ok());
let headers_file_2 = result.unwrap();
// save modified timestamp for headers file
let headers_file_metadata_2 = headers_file_2.metadata().unwrap();
let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap();

assert_eq!(headers_file_modified, headers_file_modified_2);
drop(http_server_guard);
}

Expand Down Expand Up @@ -1427,43 +1440,47 @@ mod tests {
fn test_fetch_source_file() {
let (_temp_dir, fetcher) = test_setup();

tokio_util::init(|| {
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
let r = fetcher.fetch_source_file(&specifier);
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_err());
futures::future::ok(())
}));

let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
let r = fetcher.fetch_source_file(&specifier);
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
})
futures::future::ok(())
}));
}

#[test]
fn test_fetch_source_file_1() {
/*recompile ts file*/
let (_temp_dir, fetcher) = test_setup();

tokio_util::init(|| {
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
let r = fetcher.fetch_source_file(&specifier);
// Test failure case.
let specifier =
ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_err());
futures::future::ok(())
}));

let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
let r = fetcher.fetch_source_file(&specifier);
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("js/main.ts")
.to_owned();
let specifier =
ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap();
tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| {
assert!(r.is_ok());
})
futures::future::ok(())
}));
}

#[test]
Expand Down
9 changes: 5 additions & 4 deletions cli/global_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! only need to be able to start and cancel a single timer (or Delay, as Tokio
//! calls it) for an entire Isolate. This is what is implemented here.
use crate::tokio_util::panic_on_error;
use crate::futures::TryFutureExt;
use futures::channel::oneshot;
use futures::future::FutureExt;
use std::future::Future;
Expand Down Expand Up @@ -43,9 +43,10 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);

let delay =
panic_on_error(futures::compat::Compat01As03::new(Delay::new(deadline)));
let rx = panic_on_error(rx);
let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
.map_err(|err| panic!("Unexpected error in timeout {:?}", err));
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));

futures::future::select(delay, rx).then(|_| futures::future::ok(()))
}
Expand Down
64 changes: 0 additions & 64 deletions cli/tokio_util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use deno::ErrBox;
use futures;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
Expand Down Expand Up @@ -29,66 +28,3 @@ where
{
tokio::runtime::current_thread::run(future.boxed().compat());
}

/// THIS IS A HACK AND SHOULD BE AVOIDED.
///
/// This spawns a new thread and creates a single-threaded tokio runtime on that thread,
/// to execute the given future.
///
/// This is useful when we want to block the main runtime to
/// resolve a future without worrying that we'll use up all the threads in the
/// main runtime.
pub fn block_on<F, R>(future: F) -> Result<R, ErrBox>
where
F: Send + 'static + Future<Output = Result<R, ErrBox>> + Unpin,
R: Send + 'static,
{
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Create a new runtime to evaluate the future asynchronously.
thread::spawn(move || {
let r = tokio::runtime::current_thread::block_on_all(future.compat());
sender
.send(r)
.expect("Unable to send blocking future result")
});
receiver
.recv()
.expect("Unable to receive blocking future result")
}

// Set the default executor so we can use tokio::spawn(). It's difficult to
// pass around mut references to the runtime, so using with_default is
// preferable. Ideally Tokio would provide this function.
#[cfg(test)]
pub fn init<F>(f: F)
where
F: FnOnce(),
{
let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime");
let mut executor = rt.executor();
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}

pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>>
where
F: Future<Output = Result<I, E>>,
E: std::fmt::Debug,
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}

#[cfg(test)]
pub fn run_in_task<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let fut = futures::future::lazy(move |_cx| {
f();
Ok(())
});

run(fut)
}
Loading

0 comments on commit dff2b76

Please sign in to comment.