From 71fc6e721e567f2f0870c4a0f4d46ad9ca44bf85 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Fri, 17 Jan 2020 01:43:06 +0900 Subject: [PATCH 1/7] feat: optional async ops --- cli/ops/compiler.rs | 32 +++++++++++++--------- cli/ops/dispatch_json.rs | 11 ++++---- cli/ops/dispatch_minimal.rs | 2 +- cli/ops/fetch.rs | 2 +- cli/ops/files.rs | 4 +-- cli/ops/net.rs | 4 +-- cli/ops/process.rs | 2 +- cli/ops/timers.rs | 2 +- cli/ops/tls.rs | 4 +-- cli/ops/workers.rs | 8 +++--- cli/state.rs | 4 +-- core/es_isolate.rs | 2 +- core/examples/http_bench.rs | 2 +- core/isolate.rs | 53 ++++++++++++++++++++++++++++++++----- core/ops.rs | 17 ++++++++++-- test_plugin/src/lib.rs | 2 +- 16 files changed, 105 insertions(+), 46 deletions(-) diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 5d6875fb0ed52e..bc7fcd990fa138 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -148,7 +148,7 @@ fn op_fetch_source_files( Ok(v.into()) }); - Ok(JsonOp::Async(future)) + Ok(JsonOp::Async(future, true)) } #[derive(Deserialize, Debug)] @@ -166,13 +166,16 @@ fn op_compile( _zero_copy: Option, ) -> Result { let args: CompileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_compile_async( - state.global_state.clone(), - &args.root_name, - &args.sources, - args.bundle, - &args.options, - ))) + Ok(JsonOp::Async( + runtime_compile_async( + state.global_state.clone(), + &args.root_name, + &args.sources, + args.bundle, + &args.options, + ), + true, + )) } #[derive(Deserialize, Debug)] @@ -187,9 +190,12 @@ fn op_transpile( _zero_copy: Option, ) -> Result { let args: TranspileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_transpile_async( - state.global_state.clone(), - &args.sources, - &args.options, - ))) + Ok(JsonOp::Async( + runtime_transpile_async( + state.global_state.clone(), + &args.sources, + &args.options, + ), + true, + )) } diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 7f53a3d8004ff4..011385ed08e383 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -13,7 +13,8 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), - Async(AsyncJsonOp), + /** The 2nd element is true when the op blocks exiting, false otherwise. */ + Async(AsyncJsonOp, bool), } fn json_err(err: ErrBox) -> Value { @@ -70,19 +71,19 @@ where assert!(promise_id.is_none()); CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) } - Ok(JsonOp::Async(fut)) => { + Ok(JsonOp::Async(fut, blocks_exit)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::Async(fut2.boxed()) + CoreOp::Async(fut2.boxed(), blocks_exit) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(futures::future::ok(buf).boxed()) + CoreOp::Async(futures::future::ok(buf).boxed(), true) } } } @@ -101,6 +102,6 @@ where let handle = pool .spawn_with_handle(futures::future::lazy(move |_cx| f())) .unwrap(); - Ok(JsonOp::Async(handle.boxed())) + Ok(JsonOp::Async(handle.boxed(), true)) } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 2d5618d65c2bb7..9f2206fd11d023 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -164,7 +164,7 @@ where // works since they're simple polling futures. Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) } } } diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index e084fdeffb13f8..a8496a969fdf33 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -81,5 +81,5 @@ pub fn op_fetch( Ok(json_res) }; - Ok(JsonOp::Async(future.boxed())) + Ok(JsonOp::Async(future.boxed(), true)) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 8bb3c8acb905e2..2fc24d9b37bb82 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -100,7 +100,7 @@ fn op_open( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed(), true)) } } @@ -172,6 +172,6 @@ fn op_seek( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed(), true)) } } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 836ec2e8d39153..457f1d376188ec 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -130,7 +130,7 @@ fn op_accept( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] @@ -173,7 +173,7 @@ fn op_connect( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 30de5a7356079e..6a0e48e27b986b 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -244,7 +244,7 @@ fn op_run_status( let pool = futures::executor::ThreadPool::new().unwrap(); let handle = pool.spawn_with_handle(future).unwrap(); - Ok(JsonOp::Async(handle.boxed())) + Ok(JsonOp::Async(handle.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 8bec10f70d1f82..c07e44256786f9 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -51,7 +51,7 @@ fn op_global_timer( .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); - Ok(JsonOp::Async(f.boxed())) + Ok(JsonOp::Async(f.boxed(), true)) } // Returns a milliseconds and nanoseconds subsec diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 87a067a9e1d271..ca211b9722f1b3 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -116,7 +116,7 @@ pub fn op_connect_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn load_certs(path: &str) -> Result, ErrBox> { @@ -397,5 +397,5 @@ fn op_accept_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index eeffb39305c2d2..dab8022b728945 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -97,7 +97,7 @@ fn op_worker_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } /// Post message to host as guest worker @@ -258,7 +258,7 @@ fn op_host_get_worker_loaded( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn op_host_poll_worker( @@ -286,7 +286,7 @@ fn op_host_poll_worker( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn op_host_close_worker( @@ -348,7 +348,7 @@ fn op_host_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/state.rs b/cli/state.rs index acd661f251a795..f7fb2aabeeb3c3 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -103,13 +103,13 @@ impl ThreadSafeState { state.metrics_op_completed(buf.len()); Op::Sync(buf) } - Op::Async(fut) => { + Op::Async(fut, blocks_exit) => { let state = state.clone(); let result_fut = fut.map_ok(move |buf: Buf| { state.metrics_op_completed(buf.len()); buf }); - Op::Async(result_fut.boxed()) + Op::Async(result_fut.boxed(), blocks_exit) } } } diff --git a/core/es_isolate.rs b/core/es_isolate.rs index a3231a90c21717..1063a8f7f145e1 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -650,7 +650,7 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) }; isolate.register_op("test", dispatcher); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 5a5b43c5128270..a3e591d0b628ab 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -133,7 +133,7 @@ fn http_op( if is_sync { Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) } } } diff --git a/core/isolate.rs b/core/isolate.rs index 5617caa8609339..14e9f29bedba94 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -511,9 +511,11 @@ impl Isolate { let op_id = 0; Some((op_id, buf)) } - Op::Async(fut) => { + Op::Async(fut, blocks_exit) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed()); + self + .pending_ops + .push(Pin::new(Box::new(PendingOp(fut2.boxed(), blocks_exit)))); self.have_unpolled_ops = true; None } @@ -742,8 +744,8 @@ impl Future for Isolate { inner.check_promise_errors(); inner.check_last_exception()?; - // We're idle if pending_ops is empty. - if inner.pending_ops.is_empty() { + // We're idle if all pending_ops have blocks_exit flag false. + if inner.pending_ops.iter().all(|op| !op.1) { Poll::Ready(Ok(())) } else { if inner.have_unpolled_ops { @@ -814,6 +816,7 @@ pub mod tests { pub enum Mode { Async, + AsyncOptional, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -834,7 +837,18 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) + } + Mode::AsyncOptional => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let fut = async { + // This future never finish. + futures::future::pending::<()>().await; + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + Ok(buf) + }; + Op::Async(fut.boxed(), false) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -853,7 +867,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -862,7 +876,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) } } }; @@ -953,6 +967,31 @@ pub mod tests { }); } + #[test] + fn test_poll_async_optional_ops() { + run_in_task(|cx| { + let (mut isolate, dispatch_count) = setup(Mode::AsyncOptional); + js_check(isolate.execute( + "check1.js", + r#" + Deno.core.setAsyncHandler(1, (buf) => { + // This handler will never be called + assert(false); + }); + let control = new Uint8Array([42]); + Deno.core.send(1, control); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + // The above op never finish, but isolate can finish + // because the op is an optional async op (blocks_exit flag == false). + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); + }) + } + #[test] fn terminate_execution() { let (tx, rx) = std::sync::mpsc::channel::(); diff --git a/core/ops.rs b/core/ops.rs index 7ed14268280a7a..83fca539603ae4 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; +use std::task::{Context, Poll}; pub type OpId = u32; @@ -13,14 +14,26 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Pin> + Send>>; -pub(crate) type PendingOpFuture = +pub(crate) type PendingOpInnerFuture = Pin> + Send>>; +pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool); + +impl Future for PendingOp { + type Output = Result<(OpId, Buf), CoreError>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.as_mut().0.as_mut().poll(cx) + } +} + +pub(crate) type PendingOpFuture = Pin>; + pub type OpResult = Result, E>; pub enum Op { Sync(Buf), - Async(OpAsyncFuture), + /** The 2nd element is true when the op blocks exiting, false otherwise. */ + Async(OpAsyncFuture, bool), } pub type CoreError = (); diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs index 95cd6e9ca4e82f..8cc39148cced33 100644 --- a/test_plugin/src/lib.rs +++ b/test_plugin/src/lib.rs @@ -49,5 +49,5 @@ pub fn op_test_async(data: &[u8], zero_copy: Option) -> CoreOp { Ok(result_box) }; - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) } From d1933d5c3f29ed2d4d0ee5741b0d8565dcb64981 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Sun, 19 Jan 2020 18:52:24 +0900 Subject: [PATCH 2/7] docs: add a todo comment --- core/isolate.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/isolate.rs b/core/isolate.rs index 14e9f29bedba94..e5558b1317e93a 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -745,6 +745,9 @@ impl Future for Isolate { inner.check_last_exception()?; // We're idle if all pending_ops have blocks_exit flag false. + // TODO(kt3k): This might affect the performance of the event loop when + // the user created thousands of optional ops. See the discussion at + // https://github.com/denoland/deno/pull/3715/files#r368270169 if inner.pending_ops.iter().all(|op| !op.1) { Poll::Ready(Ok(())) } else { From 35bbdee5971c126a0a3cb40243ba836d01d47b0c Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Tue, 21 Jan 2020 02:01:52 +0900 Subject: [PATCH 3/7] refactor: use AsyncOptional enum variant --- cli/ops/compiler.rs | 32 +++++++++++++------------------- cli/ops/dispatch_json.rs | 21 +++++++++++++++------ cli/ops/dispatch_minimal.rs | 2 +- cli/ops/fetch.rs | 2 +- cli/ops/files.rs | 4 ++-- cli/ops/net.rs | 4 ++-- cli/ops/process.rs | 2 +- cli/ops/timers.rs | 2 +- cli/ops/tls.rs | 4 ++-- cli/ops/workers.rs | 8 ++++---- cli/state.rs | 12 ++++++++++-- core/es_isolate.rs | 2 +- core/examples/http_bench.rs | 2 +- core/isolate.rs | 29 ++++++++++++++++------------- core/ops.rs | 20 +++++--------------- test_plugin/src/lib.rs | 2 +- 16 files changed, 76 insertions(+), 72 deletions(-) diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index bc7fcd990fa138..5d6875fb0ed52e 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -148,7 +148,7 @@ fn op_fetch_source_files( Ok(v.into()) }); - Ok(JsonOp::Async(future, true)) + Ok(JsonOp::Async(future)) } #[derive(Deserialize, Debug)] @@ -166,16 +166,13 @@ fn op_compile( _zero_copy: Option, ) -> Result { let args: CompileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async( - runtime_compile_async( - state.global_state.clone(), - &args.root_name, - &args.sources, - args.bundle, - &args.options, - ), - true, - )) + Ok(JsonOp::Async(runtime_compile_async( + state.global_state.clone(), + &args.root_name, + &args.sources, + args.bundle, + &args.options, + ))) } #[derive(Deserialize, Debug)] @@ -190,12 +187,9 @@ fn op_transpile( _zero_copy: Option, ) -> Result { let args: TranspileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async( - runtime_transpile_async( - state.global_state.clone(), - &args.sources, - &args.options, - ), - true, - )) + Ok(JsonOp::Async(runtime_transpile_async( + state.global_state.clone(), + &args.sources, + &args.options, + ))) } diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 011385ed08e383..057de5ed4e2aea 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -13,8 +13,10 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), - /** The 2nd element is true when the op blocks exiting, false otherwise. */ - Async(AsyncJsonOp, bool), + Async(AsyncJsonOp), + // AsyncOptional is the variation of Async, which + // doesn't block the program exiting. + AsyncOptional(AsyncJsonOp), } fn json_err(err: ErrBox) -> Value { @@ -71,19 +73,26 @@ where assert!(promise_id.is_none()); CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) } - Ok(JsonOp::Async(fut, blocks_exit)) => { + Ok(JsonOp::Async(fut)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::Async(fut2.boxed(), blocks_exit) + CoreOp::Async(fut2.boxed()) + } + Ok(JsonOp::AsyncOptional(fut)) => { + assert!(promise_id.is_some()); + let fut2 = fut.then(move |result| { + futures::future::ok(serialize_result(promise_id, result)) + }); + CoreOp::AsyncOptional(fut2.boxed()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(futures::future::ok(buf).boxed(), true) + CoreOp::Async(futures::future::ok(buf).boxed()) } } } @@ -102,6 +111,6 @@ where let handle = pool .spawn_with_handle(futures::future::lazy(move |_cx| f())) .unwrap(); - Ok(JsonOp::Async(handle.boxed(), true)) + Ok(JsonOp::Async(handle.boxed())) } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 9f2206fd11d023..2d5618d65c2bb7 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -164,7 +164,7 @@ where // works since they're simple polling futures. Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed(), true) + Op::Async(fut.boxed()) } } } diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index a8496a969fdf33..e084fdeffb13f8 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -81,5 +81,5 @@ pub fn op_fetch( Ok(json_res) }; - Ok(JsonOp::Async(future.boxed(), true)) + Ok(JsonOp::Async(future.boxed())) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 2fc24d9b37bb82..8bb3c8acb905e2 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -100,7 +100,7 @@ fn op_open( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed(), true)) + Ok(JsonOp::Async(fut.boxed())) } } @@ -172,6 +172,6 @@ fn op_seek( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed(), true)) + Ok(JsonOp::Async(fut.boxed())) } } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 457f1d376188ec..836ec2e8d39153 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -130,7 +130,7 @@ fn op_accept( })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -173,7 +173,7 @@ fn op_connect( })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 6a0e48e27b986b..30de5a7356079e 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -244,7 +244,7 @@ fn op_run_status( let pool = futures::executor::ThreadPool::new().unwrap(); let handle = pool.spawn_with_handle(future).unwrap(); - Ok(JsonOp::Async(handle.boxed(), true)) + Ok(JsonOp::Async(handle.boxed())) } #[derive(Deserialize)] diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index c07e44256786f9..8bec10f70d1f82 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -51,7 +51,7 @@ fn op_global_timer( .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); - Ok(JsonOp::Async(f.boxed(), true)) + Ok(JsonOp::Async(f.boxed())) } // Returns a milliseconds and nanoseconds subsec diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index ca211b9722f1b3..87a067a9e1d271 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -116,7 +116,7 @@ pub fn op_connect_tls( })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } fn load_certs(path: &str) -> Result, ErrBox> { @@ -397,5 +397,5 @@ fn op_accept_tls( })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index dab8022b728945..eeffb39305c2d2 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -97,7 +97,7 @@ fn op_worker_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } /// Post message to host as guest worker @@ -258,7 +258,7 @@ fn op_host_get_worker_loaded( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } fn op_host_poll_worker( @@ -286,7 +286,7 @@ fn op_host_poll_worker( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } fn op_host_close_worker( @@ -348,7 +348,7 @@ fn op_host_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed(), true)) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] diff --git a/cli/state.rs b/cli/state.rs index f7fb2aabeeb3c3..7ff5c1276017cb 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -103,13 +103,21 @@ impl ThreadSafeState { state.metrics_op_completed(buf.len()); Op::Sync(buf) } - Op::Async(fut, blocks_exit) => { + Op::Async(fut) => { let state = state.clone(); let result_fut = fut.map_ok(move |buf: Buf| { state.metrics_op_completed(buf.len()); buf }); - Op::Async(result_fut.boxed(), blocks_exit) + Op::Async(result_fut.boxed()) + } + Op::AsyncOptional(fut) => { + let state = state.clone(); + let result_fut = fut.map_ok(move |buf: Buf| { + state.metrics_op_completed(buf.len()); + buf + }); + Op::AsyncOptional(result_fut.boxed()) } } } diff --git a/core/es_isolate.rs b/core/es_isolate.rs index 1063a8f7f145e1..a3231a90c21717 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -650,7 +650,7 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed(), true) + Op::Async(futures::future::ok(buf).boxed()) }; isolate.register_op("test", dispatcher); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index a3e591d0b628ab..5a5b43c5128270 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -133,7 +133,7 @@ fn http_op( if is_sync { Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed(), true) + Op::Async(fut.boxed()) } } } diff --git a/core/isolate.rs b/core/isolate.rs index e5558b1317e93a..88891276fef5da 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -178,6 +178,7 @@ pub struct Isolate { needs_init: bool, pub(crate) shared: SharedQueue, pending_ops: FuturesUnordered, + pending_optional_ops: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, pub op_registry: Arc, @@ -340,6 +341,7 @@ impl Isolate { shared, needs_init, pending_ops: FuturesUnordered::new(), + pending_optional_ops: FuturesUnordered::new(), have_unpolled_ops: false, startup_script, op_registry: Arc::new(OpRegistry::new()), @@ -511,11 +513,15 @@ impl Isolate { let op_id = 0; Some((op_id, buf)) } - Op::Async(fut, blocks_exit) => { + Op::Async(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self - .pending_ops - .push(Pin::new(Box::new(PendingOp(fut2.boxed(), blocks_exit)))); + self.pending_ops.push(fut2.boxed()); + self.have_unpolled_ops = true; + None + } + Op::AsyncOptional(fut) => { + let fut2 = fut.map_ok(move |buf| (op_id, buf)); + self.pending_optional_ops.push(fut2.boxed()); self.have_unpolled_ops = true; None } @@ -744,11 +750,8 @@ impl Future for Isolate { inner.check_promise_errors(); inner.check_last_exception()?; - // We're idle if all pending_ops have blocks_exit flag false. - // TODO(kt3k): This might affect the performance of the event loop when - // the user created thousands of optional ops. See the discussion at - // https://github.com/denoland/deno/pull/3715/files#r368270169 - if inner.pending_ops.iter().all(|op| !op.1) { + // We're idle if pending_ops is empty. + if inner.pending_ops.is_empty() { Poll::Ready(Ok(())) } else { if inner.have_unpolled_ops { @@ -840,7 +843,7 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed(), true) + Op::Async(futures::future::ok(buf).boxed()) } Mode::AsyncOptional => { assert_eq!(control.len(), 1); @@ -851,7 +854,7 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Ok(buf) }; - Op::Async(fut.boxed(), false) + Op::AsyncOptional(fut.boxed()) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -870,7 +873,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed(), true) + Op::Async(futures::future::ok(buf).boxed()) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -879,7 +882,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed(), true) + Op::Async(futures::future::ok(buf).boxed()) } } }; diff --git a/core/ops.rs b/core/ops.rs index 83fca539603ae4..e4f2121df979cc 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; -use std::task::{Context, Poll}; pub type OpId = u32; @@ -14,26 +13,17 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Pin> + Send>>; -pub(crate) type PendingOpInnerFuture = +pub(crate) type PendingOpFuture = Pin> + Send>>; -pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool); - -impl Future for PendingOp { - type Output = Result<(OpId, Buf), CoreError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.as_mut().0.as_mut().poll(cx) - } -} - -pub(crate) type PendingOpFuture = Pin>; - pub type OpResult = Result, E>; pub enum Op { Sync(Buf), - /** The 2nd element is true when the op blocks exiting, false otherwise. */ - Async(OpAsyncFuture, bool), + Async(OpAsyncFuture), + // AsyncOptional is the variation of Async, which + // doesn't block the program exiting. + AsyncOptional(OpAsyncFuture), } pub type CoreError = (); diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs index 8cc39148cced33..95cd6e9ca4e82f 100644 --- a/test_plugin/src/lib.rs +++ b/test_plugin/src/lib.rs @@ -49,5 +49,5 @@ pub fn op_test_async(data: &[u8], zero_copy: Option) -> CoreOp { Ok(result_box) }; - Op::Async(fut.boxed(), true) + Op::Async(fut.boxed()) } From 985bc40d974e5fb1385851b5451d8c1ada26c0fc Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Tue, 21 Jan 2020 02:25:17 +0900 Subject: [PATCH 4/7] refactor: restore previous version of pending_ops --- core/isolate.rs | 15 ++++++++++----- core/ops.rs | 14 +++++++++++++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/isolate.rs b/core/isolate.rs index 88891276fef5da..204ec8f00009a6 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -178,7 +178,6 @@ pub struct Isolate { needs_init: bool, pub(crate) shared: SharedQueue, pending_ops: FuturesUnordered, - pending_optional_ops: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, pub op_registry: Arc, @@ -341,7 +340,6 @@ impl Isolate { shared, needs_init, pending_ops: FuturesUnordered::new(), - pending_optional_ops: FuturesUnordered::new(), have_unpolled_ops: false, startup_script, op_registry: Arc::new(OpRegistry::new()), @@ -515,13 +513,17 @@ impl Isolate { } Op::Async(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed()); + self + .pending_ops + .push(Pin::new(Box::new(PendingOp(fut2.boxed(), true)))); self.have_unpolled_ops = true; None } Op::AsyncOptional(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_optional_ops.push(fut2.boxed()); + self + .pending_ops + .push(Pin::new(Box::new(PendingOp(fut2.boxed(), false)))); self.have_unpolled_ops = true; None } @@ -751,7 +753,10 @@ impl Future for Isolate { inner.check_last_exception()?; // We're idle if pending_ops is empty. - if inner.pending_ops.is_empty() { + // TODO(kt3k): This might affect the performance of the event loop when + // the user created many optional ops. See the discussion at + // https://github.com/denoland/deno/pull/3715/files#r368270169 + if inner.pending_ops.iter().all(|op| !op.1) { Poll::Ready(Ok(())) } else { if inner.have_unpolled_ops { diff --git a/core/ops.rs b/core/ops.rs index e4f2121df979cc..56901fcfaf4a11 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; +use std::task::{Context, Poll}; pub type OpId = u32; @@ -13,9 +14,20 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Pin> + Send>>; -pub(crate) type PendingOpFuture = +pub(crate) type PendingOpInnerFuture = Pin> + Send>>; +pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool); + +impl Future for PendingOp { + type Output = Result<(OpId, Buf), CoreError>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.as_mut().0.as_mut().poll(cx) + } +} + +pub(crate) type PendingOpFuture = Pin>; + pub type OpResult = Result, E>; pub enum Op { From ee6cd48364ac22c608cd9e22cb924959277b921b Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Wed, 22 Jan 2020 00:22:01 +0900 Subject: [PATCH 5/7] refactor: address review feedbacks - rename AsyncOptional -> AsyncUnref - create Isolate.pending_unref_ops and use select --- cli/ops/dispatch_json.rs | 12 +++++++----- cli/state.rs | 4 ++-- core/isolate.rs | 32 +++++++++++++++----------------- core/ops.rs | 22 ++++++---------------- 4 files changed, 30 insertions(+), 40 deletions(-) diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 057de5ed4e2aea..b0138f4f014f2c 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -14,9 +14,11 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), Async(AsyncJsonOp), - // AsyncOptional is the variation of Async, which - // doesn't block the program exiting. - AsyncOptional(AsyncJsonOp), + /** + * AsyncUnref is the variation of Async, which + * doesn't block the program exiting. + */ + AsyncUnref(AsyncJsonOp), } fn json_err(err: ErrBox) -> Value { @@ -80,12 +82,12 @@ where }); CoreOp::Async(fut2.boxed()) } - Ok(JsonOp::AsyncOptional(fut)) => { + Ok(JsonOp::AsyncUnref(fut)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::AsyncOptional(fut2.boxed()) + CoreOp::AsyncUnref(fut2.boxed()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); diff --git a/cli/state.rs b/cli/state.rs index 7ff5c1276017cb..60c240bea686ae 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -111,13 +111,13 @@ impl ThreadSafeState { }); Op::Async(result_fut.boxed()) } - Op::AsyncOptional(fut) => { + Op::AsyncUnref(fut) => { let state = state.clone(); let result_fut = fut.map_ok(move |buf: Buf| { state.metrics_op_completed(buf.len()); buf }); - Op::AsyncOptional(result_fut.boxed()) + Op::AsyncUnref(result_fut.boxed()) } } } diff --git a/core/isolate.rs b/core/isolate.rs index 204ec8f00009a6..c60b978c87fc92 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::stream::select; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; @@ -178,6 +179,7 @@ pub struct Isolate { needs_init: bool, pub(crate) shared: SharedQueue, pending_ops: FuturesUnordered, + pending_unref_ops: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, pub op_registry: Arc, @@ -340,6 +342,7 @@ impl Isolate { shared, needs_init, pending_ops: FuturesUnordered::new(), + pending_unref_ops: FuturesUnordered::new(), have_unpolled_ops: false, startup_script, op_registry: Arc::new(OpRegistry::new()), @@ -513,17 +516,13 @@ impl Isolate { } Op::Async(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self - .pending_ops - .push(Pin::new(Box::new(PendingOp(fut2.boxed(), true)))); + self.pending_ops.push(fut2.boxed()); self.have_unpolled_ops = true; None } - Op::AsyncOptional(fut) => { + Op::AsyncUnref(fut) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self - .pending_ops - .push(Pin::new(Box::new(PendingOp(fut2.boxed(), false)))); + self.pending_unref_ops.push(fut2.boxed()); self.have_unpolled_ops = true; None } @@ -721,7 +720,9 @@ impl Future for Isolate { // Now handle actual ops. inner.have_unpolled_ops = false; #[allow(clippy::match_wild_err_arm)] - match inner.pending_ops.poll_next_unpin(cx) { + match select(&mut inner.pending_ops, &mut inner.pending_unref_ops) + .poll_next_unpin(cx) + { Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), Poll::Ready(None) => break, Poll::Pending => break, @@ -753,10 +754,7 @@ impl Future for Isolate { inner.check_last_exception()?; // We're idle if pending_ops is empty. - // TODO(kt3k): This might affect the performance of the event loop when - // the user created many optional ops. See the discussion at - // https://github.com/denoland/deno/pull/3715/files#r368270169 - if inner.pending_ops.iter().all(|op| !op.1) { + if inner.pending_ops.is_empty() { Poll::Ready(Ok(())) } else { if inner.have_unpolled_ops { @@ -827,7 +825,7 @@ pub mod tests { pub enum Mode { Async, - AsyncOptional, + AsyncUnref, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -850,7 +848,7 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(futures::future::ok(buf).boxed()) } - Mode::AsyncOptional => { + Mode::AsyncUnref => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let fut = async { @@ -859,7 +857,7 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Ok(buf) }; - Op::AsyncOptional(fut.boxed()) + Op::AsyncUnref(fut.boxed()) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -981,7 +979,7 @@ pub mod tests { #[test] fn test_poll_async_optional_ops() { run_in_task(|cx| { - let (mut isolate, dispatch_count) = setup(Mode::AsyncOptional); + let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref); js_check(isolate.execute( "check1.js", r#" @@ -995,7 +993,7 @@ pub mod tests { )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); // The above op never finish, but isolate can finish - // because the op is an optional async op (blocks_exit flag == false). + // because the op is an unreffed async op. assert!(match isolate.poll_unpin(cx) { Poll::Ready(Ok(_)) => true, _ => false, diff --git a/core/ops.rs b/core/ops.rs index 56901fcfaf4a11..0ef0b46d162623 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; -use std::task::{Context, Poll}; pub type OpId = u32; @@ -14,28 +13,19 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Pin> + Send>>; -pub(crate) type PendingOpInnerFuture = +pub(crate) type PendingOpFuture = Pin> + Send>>; -pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool); - -impl Future for PendingOp { - type Output = Result<(OpId, Buf), CoreError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.as_mut().0.as_mut().poll(cx) - } -} - -pub(crate) type PendingOpFuture = Pin>; - pub type OpResult = Result, E>; pub enum Op { Sync(Buf), Async(OpAsyncFuture), - // AsyncOptional is the variation of Async, which - // doesn't block the program exiting. - AsyncOptional(OpAsyncFuture), + /** + * AsyncUnref is the variation of Async, which + * doesn't block the program exiting. + */ + AsyncUnref(OpAsyncFuture), } pub type CoreError = (); From dacba75ecb945c8e1cf7ab3f6c91eb759eb93c72 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 21 Jan 2020 11:04:22 -0500 Subject: [PATCH 6/7] fix comment style --- cli/ops/dispatch_json.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index b0138f4f014f2c..dc3a3a51009fab 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -14,10 +14,8 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), Async(AsyncJsonOp), - /** - * AsyncUnref is the variation of Async, which - * doesn't block the program exiting. - */ + /// AsyncUnref is the variation of Async, which doesn't block the program + /// exiting. AsyncUnref(AsyncJsonOp), } From 71f28b81629865b1a709d3b63af3fe1bbefe5dd7 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 21 Jan 2020 11:05:17 -0500 Subject: [PATCH 7/7] comment --- core/ops.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/ops.rs b/core/ops.rs index 0ef0b46d162623..e0bdb0184c794e 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -21,10 +21,8 @@ pub type OpResult = Result, E>; pub enum Op { Sync(Buf), Async(OpAsyncFuture), - /** - * AsyncUnref is the variation of Async, which - * doesn't block the program exiting. - */ + /// AsyncUnref is the variation of Async, which doesn't block the program + /// exiting. AsyncUnref(OpAsyncFuture), }