Skip to content

Commit

Permalink
fix(ext/node): worker_threads doesn't exit if there are message liste…
Browse files Browse the repository at this point in the history
…ners (#22944)

Closes #22934
  • Loading branch information
bartlomieju authored Mar 15, 2024
1 parent e40f9a5 commit c342cd3
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 36 deletions.
2 changes: 0 additions & 2 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ impl CliMainWorkerFactory {
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
future: shared.enable_future_features,
close_on_idle: true,
},
extensions: custom_extensions,
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down Expand Up @@ -815,7 +814,6 @@ fn create_web_worker_callback(
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
future: false,
close_on_idle: args.close_on_idle,
},
extensions: vec![],
startup_snapshot: crate::js::deno_isolate_init(),
Expand Down
17 changes: 6 additions & 11 deletions runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ function postMessage(message, transferOrOptions = {}) {

let isClosing = false;
let globalDispatchEvent;
let closeOnIdle;

function hasMessageEventListener() {
return event.listenerCount(globalThis, "message") > 0;
}

async function pollForMessages() {
if (!globalDispatchEvent) {
Expand All @@ -289,14 +292,7 @@ async function pollForMessages() {
);
}
while (!isClosing) {
const op = op_worker_recv_message();
// In a Node.js worker, unref() the op promise to prevent it from
// keeping the event loop alive. This avoids the need to explicitly
// call self.close() or worker.terminate().
if (closeOnIdle) {
core.unrefOpPromise(op);
}
const data = await op;
const data = await op_worker_recv_message();
if (data === null) break;
const v = messagePort.deserializeJsMessageData(data);
const message = v[0];
Expand Down Expand Up @@ -813,7 +809,6 @@ function bootstrapWorkerRuntime(
7: shouldDisableDeprecatedApiWarning,
8: shouldUseVerboseDeprecatedApiWarning,
9: _future,
10: closeOnIdle_,
} = runtimeOptions;

deprecatedApiWarningDisabled = shouldDisableDeprecatedApiWarning;
Expand Down Expand Up @@ -875,8 +870,8 @@ function bootstrapWorkerRuntime(

location.setLocationHref(location_);

closeOnIdle = closeOnIdle_;
globalThis.pollForMessages = pollForMessages;
globalThis.hasMessageEventListener = hasMessageEventListener;

// TODO(bartlomieju): deprecate --unstable
if (unstableFlag) {
Expand Down
89 changes: 72 additions & 17 deletions runtime/web_worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use crate::inspector_server::InspectorServer;
use crate::ops;
use crate::ops::worker_host::WorkersTable;
use crate::permissions::PermissionsContainer;
use crate::shared::maybe_transpile_source;
use crate::shared::runtime;
Expand All @@ -13,7 +14,6 @@ use crate::BootstrapOptions;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_cache::CreateCache;
use deno_cache::SqliteBackedCache;
use deno_core::ascii_str;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
Expand Down Expand Up @@ -335,10 +335,12 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
close_on_idle: bool,
has_executed_main_module: bool,
internal_handle: WebWorkerInternalHandle,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_metadata: Option<JsMessageData>,
Expand Down Expand Up @@ -609,8 +611,10 @@ impl WebWorker {
worker_type: options.worker_type,
main_module,
poll_for_messages_fn: None,
has_message_event_listener_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
close_on_idle: options.close_on_idle,
has_executed_main_module: false,
maybe_worker_metadata: options.maybe_worker_metadata,
},
external_handle,
Expand Down Expand Up @@ -646,22 +650,32 @@ impl WebWorker {
&[args, name_str, id_str, id, worker_data],
)
.unwrap();

let context = scope.get_current_context();
let global = context.global(scope);
let poll_for_messages_str =
v8::String::new_external_onebyte_static(scope, b"pollForMessages")
.unwrap();
let poll_for_messages_fn = global
.get(scope, poll_for_messages_str.into())
.expect("get globalThis.pollForMessages");
global.delete(scope, poll_for_messages_str.into());
self.poll_for_messages_fn =
Some(v8::Global::new(scope, poll_for_messages_fn));

let has_message_event_listener_str =
v8::String::new_external_onebyte_static(
scope,
b"hasMessageEventListener",
)
.unwrap();
let has_message_event_listener_fn = global
.get(scope, has_message_event_listener_str.into())
.expect("get globalThis.hasMessageEventListener");
global.delete(scope, has_message_event_listener_str.into());
self.has_message_event_listener_fn =
Some(v8::Global::new(scope, has_message_event_listener_fn));
}
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.
// Save a reference to function that will start polling for messages
// from a worker host; it will be called after the user code is loaded.
let script = ascii_str!(
r#"
const pollForMessages = globalThis.pollForMessages;
delete globalThis.pollForMessages;
pollForMessages
"#
);
let poll_for_messages_fn = self
.js_runtime
.execute_script(located_script_name!(), script)
.expect("Failed to execute worker bootstrap script");
self.poll_for_messages_fn = Some(poll_for_messages_fn);
}

/// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)
Expand Down Expand Up @@ -730,6 +744,7 @@ impl WebWorker {

maybe_result = &mut receiver => {
debug!("received worker module evaluate {:#?}", maybe_result);
self.has_executed_main_module = true;
maybe_result
}

Expand Down Expand Up @@ -781,7 +796,22 @@ impl WebWorker {
Poll::Ready(Ok(()))
}
}
Poll::Pending => Poll::Pending,
Poll::Pending => {
// This is special code path for workers created from `node:worker_threads`
// module that have different semantics than Web workers.
// We want the worker thread to terminate automatically if we've done executing
// Top-Level await, there are no child workers spawned by that workers
// and there's no "message" event listener.
if self.close_on_idle
&& self.has_executed_main_module
&& !self.has_child_workers()
&& !self.has_message_event_listener()
{
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
}

Expand All @@ -803,6 +833,31 @@ impl WebWorker {
// This call may return `None` if worker is terminated.
fn_.call(scope, undefined.into(), &[]);
}

fn has_message_event_listener(&mut self) -> bool {
let has_message_event_listener_fn =
self.has_message_event_listener_fn.as_ref().unwrap();
let scope = &mut self.js_runtime.handle_scope();
let has_message_event_listener =
v8::Local::<v8::Value>::new(scope, has_message_event_listener_fn);
let fn_ =
v8::Local::<v8::Function>::try_from(has_message_event_listener).unwrap();
let undefined = v8::undefined(scope);
// This call may return `None` if worker is terminated.
match fn_.call(scope, undefined.into(), &[]) {
Some(result) => result.is_true(),
None => false,
}
}

fn has_child_workers(&mut self) -> bool {
!self
.js_runtime
.op_state()
.borrow()
.borrow::<WorkersTable>()
.is_empty()
}
}

fn print_worker_error(
Expand Down
5 changes: 0 additions & 5 deletions runtime/worker_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub struct BootstrapOptions {
pub disable_deprecated_api_warning: bool,
pub verbose_deprecated_api_warning: bool,
pub future: bool,
pub close_on_idle: bool,
}

impl Default for BootstrapOptions {
Expand Down Expand Up @@ -95,7 +94,6 @@ impl Default for BootstrapOptions {
disable_deprecated_api_warning: false,
verbose_deprecated_api_warning: false,
future: false,
close_on_idle: false,
}
}
}
Expand Down Expand Up @@ -131,8 +129,6 @@ struct BootstrapV8<'a>(
bool,
// future
bool,
// close_on_idle
bool,
);

impl BootstrapOptions {
Expand All @@ -155,7 +151,6 @@ impl BootstrapOptions {
self.disable_deprecated_api_warning,
self.verbose_deprecated_api_warning,
self.future,
self.close_on_idle,
);

bootstrap.serialize(ser).unwrap()
Expand Down
1 change: 1 addition & 0 deletions tests/integration/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ itest!(worker_ids_are_sequential {
});

// Test for https://github.com/denoland/deno/issues/22629
// Test for https://github.com/denoland/deno/issues/22934
itest!(node_worker_auto_exits {
args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs",
output: "workers/node_worker_auto_exits.mjs.out",
Expand Down
12 changes: 11 additions & 1 deletion tests/testdata/workers/node_worker_auto_exits.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { isMainThread, Worker } from "node:worker_threads";
import { isMainThread, parentPort, Worker } from "node:worker_threads";

function onMessageOneshot() {
console.log("Got message from main thread!");
parentPort.off("message", onMessageOneshot);
}

if (isMainThread) {
// This re-loads the current file inside a Worker instance.
const w = new Worker(import.meta.filename);

setTimeout(() => {
w.postMessage("Hello! I am from the main thread.");
}, 500);
} else {
console.log("Inside Worker!");
console.log(isMainThread); // Prints 'false'.
parentPort.on("message", onMessageOneshot);
}
1 change: 1 addition & 0 deletions tests/testdata/workers/node_worker_auto_exits.mjs.out
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Inside Worker!
false
Got message from main thread!

0 comments on commit c342cd3

Please sign in to comment.