Skip to content

Commit

Permalink
bootstrap: merge main thread and worker thread initializations
Browse files Browse the repository at this point in the history
Instead of doing the initializations of worker threads using small
helper functions that are also used by the main thread initialization,
wrap everything into a common prepareExecution() function with
an isMainThread switch to turn off initializations that shouldn't
be done for worker threads, so that we don't have to replicate
all the initialization steps in the worker code, which can be
error-prone.

PR-URL: #44869
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
joyeecheung authored and danielleadams committed Dec 30, 2022
1 parent 847637a commit 5f21c90
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 106 deletions.
62 changes: 10 additions & 52 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,8 @@ const {
} = primordials;

const {
patchProcessObject,
setupCoverageHooks,
setupInspectorHooks,
setupWarningHandler,
setupFetch,
setupWebCrypto,
setupCustomEvent,
setupDebugEnv,
setupPerfHooks,
initializeDeprecations,
initializeWASI,
initializeCJSLoader,
initializeESMLoader,
initializeFrozenIntrinsics,
initializeReport,
initializeSourceMapsHandlers,
loadPreloadModules,
setupTraceCategoryState,
prepareWorkerThreadExecution,
setupUserModules,
markBootstrapComplete
} = require('internal/process/pre_execution');

Expand Down Expand Up @@ -60,28 +44,13 @@ const {
onGlobalUncaughtException
} = require('internal/process/execution');

const publicWorker = require('worker_threads');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
debug = fn;
});

const assert = require('internal/assert');

patchProcessObject();
setupInspectorHooks();
setupDebugEnv();

setupWarningHandler();
setupFetch();
setupWebCrypto();
setupCustomEvent();
initializeSourceMapsHandlers();

// Since worker threads cannot switch cwd, we do not need to
// overwrite the process.env.NODE_V8_COVERAGE variable.
if (process.env.NODE_V8_COVERAGE) {
setupCoverageHooks(process.env.NODE_V8_COVERAGE);
}
prepareWorkerThreadExecution();

debug(`[${threadId}] is setting up worker child environment`);

Expand Down Expand Up @@ -126,23 +95,11 @@ port.on('message', (message) => {
hasStdin
} = message;

setupTraceCategoryState();
setupPerfHooks();
initializeReport();
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
initializeDeprecations();
initializeWASI();

require('internal/dns/utils').initializeDns();

initializeCJSLoader();
initializeESMLoader();

if (argv !== undefined) {
ArrayPrototypePushApply(process.argv, argv);
}

const publicWorker = require('worker_threads');
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

Expand All @@ -164,10 +121,10 @@ port.on('message', (message) => {
};
workerIo.sharedCwdCounter = cwdCounter;

const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
initializeFrozenIntrinsics();
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
setupUserModules();

if (!hasStdin)
process.stdin.push(null);
Expand Down Expand Up @@ -198,6 +155,7 @@ port.on('message', (message) => {
// runMain here might be monkey-patched by users in --require.
// XXX: the monkey-patchability here should probably be deprecated.
ArrayPrototypeSplice(process.argv, 1, 0, filename);
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
}
} else if (message.type === STDIO_PAYLOAD) {
Expand Down
115 changes: 61 additions & 54 deletions lib/internal/process/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,26 @@ const {
isBuildingSnapshot,
} = require('v8').startupSnapshot;

function prepareMainThreadExecution(expandArgv1 = false,
initialzeModules = true) {
refreshRuntimeOptions();
function prepareMainThreadExecution(expandArgv1 = false, initialzeModules = true) {
prepareExecution({
expandArgv1,
initialzeModules,
isMainThread: true
});
}

function prepareWorkerThreadExecution() {
prepareExecution({
expandArgv1: false,
initialzeModules: false, // Will need to initialize it after policy setup
isMainThread: false
});
}

// TODO(joyeecheung): this is also necessary for workers when they deserialize
// this toggle from the snapshot.
function prepareExecution(options) {
const { expandArgv1, initialzeModules, isMainThread } = options;

refreshRuntimeOptions();
reconnectZeroFillToggle();

// Patch the process object with legacy properties and normalizations
Expand All @@ -60,48 +74,57 @@ function prepareMainThreadExecution(expandArgv1 = false,
}

setupDebugEnv();

// Print stack trace on `SIGINT` if option `--trace-sigint` presents.
setupStacktracePrinterOnSigint();

// Process initial diagnostic reporting configuration, if present.
initializeReport();
initializeReportSignalHandlers(); // Main-thread-only.

initializeHeapSnapshotSignalHandlers();

// If the process is spawned with env NODE_CHANNEL_FD, it's probably
// spawned by our child_process module, then initialize IPC.
// This attaches some internal event listeners and creates:
// process.send(), process.channel, process.connected,
// process.disconnect().
setupChildProcessIpcChannel();

// Load policy from disk and parse it.
initializePolicy();

// If this is a worker in cluster mode, start up the communication
// channel. This needs to be done before any user code gets executed
// (including preload modules).
initializeClusterIPC();

initializeSourceMapsHandlers();
initializeDeprecations();
initializeWASI();

require('internal/dns/utils').initializeDns();

require('internal/v8/startup_snapshot').runDeserializeCallbacks();
if (isMainThread) {
assert(internalBinding('worker').isMainThread);
// Worker threads will get the manifest in the message handler.
const policy = readPolicyFromDisk();
if (policy) {
require('internal/process/policy')
.setup(policy.manifestSrc, policy.manifestURL);
}

if (!initialzeModules) {
return;
// Print stack trace on `SIGINT` if option `--trace-sigint` presents.
setupStacktracePrinterOnSigint();
initializeReportSignalHandlers(); // Main-thread-only.
initializeHeapSnapshotSignalHandlers();
// If the process is spawned with env NODE_CHANNEL_FD, it's probably
// spawned by our child_process module, then initialize IPC.
// This attaches some internal event listeners and creates:
// process.send(), process.channel, process.connected,
// process.disconnect().
setupChildProcessIpcChannel();
// If this is a worker in cluster mode, start up the communication
// channel. This needs to be done before any user code gets executed
// (including preload modules).
initializeClusterIPC();

// TODO(joyeecheung): do this for worker threads as well.
require('internal/v8/startup_snapshot').runDeserializeCallbacks();
} else {
assert(!internalBinding('worker').isMainThread);
// The setup should be called in LOAD_SCRIPT message handler.
assert(!initialzeModules);
}

if (initialzeModules) {
setupUserModules();
}
}

function setupUserModules() {
initializeCJSLoader();
initializeESMLoader();
const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
// Need to be done after --require setup.
initializeFrozenIntrinsics();
}

Expand Down Expand Up @@ -476,7 +499,7 @@ function initializeClusterIPC() {
}
}

function initializePolicy() {
function readPolicyFromDisk() {
const experimentalPolicy = getOptionValue('--experimental-policy');
if (experimentalPolicy) {
process.emitWarning('Policies are experimental.',
Expand Down Expand Up @@ -520,8 +543,9 @@ function initializePolicy() {
throw new ERR_MANIFEST_ASSERT_INTEGRITY(manifestURL, realIntegrities);
}
}
require('internal/process/policy')
.setup(src, manifestURL.href);
return {
manifestSrc: src, manifestURL: manifestURL.href
};
}
}

Expand Down Expand Up @@ -603,25 +627,8 @@ function markBootstrapComplete() {
}

module.exports = {
refreshRuntimeOptions,
patchProcessObject,
setupCoverageHooks,
setupWarningHandler,
setupFetch,
setupWebCrypto,
setupCustomEvent,
setupDebugEnv,
setupPerfHooks,
setupUserModules,
prepareMainThreadExecution,
initializeDeprecations,
initializeESMLoader,
initializeFrozenIntrinsics,
initializeSourceMapsHandlers,
loadPreloadModules,
setupTraceCategoryState,
setupInspectorHooks,
initializeReport,
initializeCJSLoader,
initializeWASI,
prepareWorkerThreadExecution,
markBootstrapComplete
};

0 comments on commit 5f21c90

Please sign in to comment.