Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workers: implement --max-worker-threads command line option #32606

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions doc/api/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,17 @@ changes:

Specify the maximum size, in bytes, of HTTP headers. Defaults to 16KB.

### `--max-worker-threads=n`
<!-- YAML
added: REPLACEME
-->

Specify the maximum number of worker threads that should be created for
this Node.js process. If the limit is exceeded, additional Worker threads
may be created but a process warning will be emitted. When set to any negative
value, the limit is set to four times the number of CPUs available. When set
to 0, the check is disabled. Defaults to 0.

### `--napi-modules`
<!-- YAML
added: v7.10.0
Expand Down Expand Up @@ -1148,6 +1159,7 @@ Node.js options that are allowed are:
* `--inspect-publish-uid`
* `--inspect`
* `--max-http-header-size`
* `--max-worker-threads`
* `--napi-modules`
* `--no-deprecation`
* `--no-force-async-hooks-checks`
Expand Down
7 changes: 7 additions & 0 deletions doc/node.1
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ disappear in a non-semver-major release.
.It Fl -max-http-header-size Ns = Ns Ar size
Specify the maximum size of HTTP headers in bytes. Defaults to 16KB.
.
.It FL -max-worker-threads Ns = Ns Ar Size
Specify the maximum number of worker threads that should be created for
this Node.js process. If the limit is exceeded, additional Worker threads
may be created but a process warning will be emitted. When set to any negative
value, the limit is set to four times the number of CPUs available. When set to
0, the check is disabled. Defaults to 0.
.
.It Fl -napi-modules
This option is a no-op.
It is kept for compatibility.
Expand Down
16 changes: 16 additions & 0 deletions src/node_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ void PerProcessOptions::CheckOptions(std::vector<std::string>* errors) {
use_largepages != "silent") {
errors->push_back("invalid value for --use-largepages");
}

// When set to a negative, the value is auto-calculated
// based on the number of CPUs. Setting the value to 0
// disables the check.
if (max_worker_thread_count < 0) {
CPUInfo cpu_info;
max_worker_thread_count = cpu_info ?
cpu_info.count() * kMaxWorkerThreadMultiplier :
kMaxWorkerThreadMultiplier;
}

per_isolate->CheckOptions(errors);
}

Expand Down Expand Up @@ -737,6 +748,11 @@ PerProcessOptionsParser::PerProcessOptionsParser(
&PerProcessOptions::trace_sigint,
kAllowedInEnvironment);

AddOption("--max-worker-threads",
"specify max number of worker threads",
&PerProcessOptions::max_worker_thread_count,
kAllowedInEnvironment);

Insert(iop, &PerProcessOptions::get_per_isolate_options);
}

Expand Down
3 changes: 3 additions & 0 deletions src/node_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

namespace node {

constexpr uint64_t kMaxWorkerThreadMultiplier = 4;

class HostPort {
public:
HostPort(const std::string& host_name, int port)
Expand Down Expand Up @@ -199,6 +201,7 @@ class PerProcessOptions : public Options {
std::string trace_event_categories;
std::string trace_event_file_pattern = "node_trace.${rotation}.log";
int64_t v8_thread_pool_size = 4;
int64_t max_worker_thread_count = 0;
bool zero_fill_all_buffers = false;
bool debug_arraybuffer_allocations = false;
std::string disable_proto;
Expand Down
26 changes: 11 additions & 15 deletions src/node_os.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,26 @@ static void GetCPUInfo(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();

uv_cpu_info_t* cpu_infos;
int count;
CPUInfo cpu_infos;

int err = uv_cpu_info(&cpu_infos, &count);
if (err)
if (!cpu_infos)
return;

// It's faster to create an array packed with all the data and
// assemble them into objects in JS than to call Object::Set() repeatedly
// The array is in the format
// [model, speed, (5 entries of cpu_times), model2, speed2, ...]
std::vector<Local<Value>> result(count * 7);
for (int i = 0, j = 0; i < count; i++) {
uv_cpu_info_t* ci = cpu_infos + i;
result[j++] = OneByteString(isolate, ci->model);
result[j++] = Number::New(isolate, ci->speed);
result[j++] = Number::New(isolate, ci->cpu_times.user);
result[j++] = Number::New(isolate, ci->cpu_times.nice);
result[j++] = Number::New(isolate, ci->cpu_times.sys);
result[j++] = Number::New(isolate, ci->cpu_times.idle);
result[j++] = Number::New(isolate, ci->cpu_times.irq);
std::vector<Local<Value>> result(cpu_infos.count() * 7);
for (int i = 0, j = 0; i < cpu_infos.count(); i++) {
result[j++] = OneByteString(isolate, cpu_infos[i].model);
result[j++] = Number::New(isolate, cpu_infos[i].speed);
result[j++] = Number::New(isolate, cpu_infos[i].cpu_times.user);
result[j++] = Number::New(isolate, cpu_infos[i].cpu_times.nice);
result[j++] = Number::New(isolate, cpu_infos[i].cpu_times.sys);
result[j++] = Number::New(isolate, cpu_infos[i].cpu_times.idle);
result[j++] = Number::New(isolate, cpu_infos[i].cpu_times.irq);
}

uv_free_cpu_info(cpu_infos, count);
args.GetReturnValue().Set(Array::New(isolate, result.data(), result.size()));
}

Expand Down
9 changes: 4 additions & 5 deletions src/node_report.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ constexpr double SEC_PER_MICROS = 1e-6;
namespace report {
using node::arraysize;
using node::ConditionVariable;
using node::CPUInfo;
using node::DiagnosticFilename;
using node::Environment;
using node::JSONWriter;
Expand Down Expand Up @@ -387,11 +388,10 @@ static void PrintVersionInformation(JSONWriter* writer) {

// Report CPU info
static void PrintCpuInfo(JSONWriter* writer) {
uv_cpu_info_t* cpu_info;
int count;
if (uv_cpu_info(&cpu_info, &count) == 0) {
CPUInfo cpu_info;
if (cpu_info) {
writer->json_arraystart("cpus");
for (int i = 0; i < count; i++) {
for (int i = 0; i < cpu_info.count(); i++) {
writer->json_start();
writer->json_keyvalue("model", cpu_info[i].model);
writer->json_keyvalue("speed", cpu_info[i].speed);
Expand All @@ -403,7 +403,6 @@ static void PrintCpuInfo(JSONWriter* writer) {
writer->json_end();
}
writer->json_arrayend();
uv_free_cpu_info(cpu_info, count);
}
}

Expand Down
46 changes: 46 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#include "node_buffer.h"
#include "node_options-inl.h"
#include "node_perf.h"
#include "node_process.h"
#include "util-inl.h"
#include "async_wrap-inl.h"

#include <atomic>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -41,6 +43,31 @@ using v8::Value;
namespace node {
namespace worker {

namespace {
// The worker count is intentionally process scoped so
// we can track the total number of active workers
// across the process.
std::atomic<int64_t> active_worker_thread_count {0};
std::atomic<bool> active_worker_thread_warn {true};

void EmitWorkerThreadWarning(Environment* env, int64_t count, int64_t max) {
if (env->is_main_thread()) {
USE(ProcessEmitWarning(env,
"Too many active worker threads (%" PRId64 "). "
"Performance may be degraded. "
"(--max-worker-threads=%" PRId64 ")", count, max));
} else {
// If this is not the main thread, recursively move up
// until we get to the main thread.
env->worker_context()->env()
->SetImmediateThreadsafe([count, max](Environment* env) {
EmitWorkerThreadWarning(env, count, max);
});
}
}

} // namespace

Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
Expand All @@ -56,6 +83,23 @@ Worker::Worker(Environment* env,
Debug(this, "Creating new worker instance with thread id %llu",
thread_id_.id);

int64_t count = ++active_worker_thread_count;
int64_t max_worker_thread_count =
per_process::cli_options->max_worker_thread_count;
if (max_worker_thread_count > 0 &&
count > max_worker_thread_count &&
active_worker_thread_warn.exchange(false)) {
// Having too many active worker threads can degrade overall
// performance of the entire Node.js application by causing
// too much CPU contention. The default max-worker-threads is
// 4 times the total number of CPUs available but may be set
Comment on lines +94 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// too much CPU contention. The default max-worker-threads is
// 4 times the total number of CPUs available but may be set
// too much CPU contention. By default max-worker-threads
// check is disabled but may be set

// explicitly using the --max-worker-threads=n command line option.
// Setting --max-worker-threads=0 disables this check.
// This is tracked per process rather than per environment/isolate
// so we can account also for all Workers created within Workers.
EmitWorkerThreadWarning(env, count, max_worker_thread_count);
}

// Set up everything that needs to be set up in the parent environment.
parent_port_ = MessagePort::New(env, env->context());
if (parent_port_ == nullptr) {
Expand Down Expand Up @@ -433,6 +477,8 @@ void Worker::JoinThread() {
}

Worker::~Worker() {
active_worker_thread_count--;

Mutex::ScopedLock lock(mutex_);

CHECK(stopped_);
Expand Down
27 changes: 27 additions & 0 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#pragma GCC diagnostic pop
#endif

#include "uv.h"

#include <cassert>
#include <climits> // PATH_MAX
#include <csignal>
Expand Down Expand Up @@ -764,6 +766,31 @@ class PersistentToLocal {
}
};

class CPUInfo {
public:
CPUInfo() {
if (uv_cpu_info(&info_, &count_) != 0) {
info_ = nullptr;
count_ = 0;
}
}
~CPUInfo() {
if (info_ != nullptr)
uv_free_cpu_info(info_, count_);
}
int count() const { return count_; }
operator bool() const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we usually try to put empty lines in between methods. Could you also run make format-cpp for consistency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sigh...

C:\Users\jasne\Projects\node>vcbuild format-cpp
Error: invalid command line option `format-cpp`.

Yeah, when I update this I'll switch over to the linux box and tweak the formatting.

return info_ != nullptr;
}
const uv_cpu_info_t& operator[](int idx) const {
return info_[idx];
}

private:
jasnell marked this conversation as resolved.
Show resolved Hide resolved
uv_cpu_info_t* info_ = nullptr;
int count_ = 0;
};

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-cli-bad-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ if (process.features.inspector) {
requiresArgument('--debug-port=');
}
requiresArgument('--eval');
requiresArgument('--max-worker-threads');

function requiresArgument(option) {
const r = spawn(process.execPath, [option], { encoding: 'utf8' });
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-cli-node-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ expect('--no_warnings', 'B\n');
expect('--trace-warnings', 'B\n');
expect('--redirect-warnings=_', 'B\n');
expect('--trace-deprecation', 'B\n');
expectNoWorker('--max-worker-threads=1', 'B\n');
expect('--trace-sync-io', 'B\n');
expectNoWorker('--trace-events-enabled', 'B\n');
expect('--track-heap-objects', 'B\n');
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-worker-max-count-auto.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Flags: --expose-internals --max-worker-threads=-1
'use strict';

// Check that when --max-worker-threads is negative,
// the option value is auto-calculated based on the
// number of CPUs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: (here and below)

Suggested change
// number of CPUs
// number of CPUs.


require('../common');
const { getOptionValue } = require('internal/options');
const { cpus } = require('os');
const assert = require('assert');

// Make sure the flag is actually set
assert(process.execArgv.indexOf('--max-worker-threads=-1') > -1);

const kWorkerThreadsMultiplier = 4;
const maxWorkerThreads = getOptionValue('--max-worker-threads');

assert.strictEqual(cpus().length * kWorkerThreadsMultiplier, maxWorkerThreads);
33 changes: 33 additions & 0 deletions test/parallel/test-worker-max-count-disabled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Flags: --max-worker-threads=0
'use strict';
const common = require('../common');
const { Worker } = require('worker_threads');
const { cpus } = require('os');

const cpu_count = cpus().length;

// This may need to be updated in the future if there are
// any other warnings expected. For now, no warnings are
// emitted with this code.
process.on('warning', common.mustNotCall());

const expr = 'setInterval(() => {}, 10);';

function makeWorker(workers) {
return new Promise((res) => {
const worker = new Worker(expr, { eval: true });
worker.on('online', res);
workers.push(worker);
});
}

async function doTest() {
const workers = [];
const list = [];
for (let n = 0; n < cpu_count; n++)
list.push(makeWorker(workers));
await Promise.all(list);
workers.forEach((i) => i.terminate());
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: simplification and a bit fewer promises (and same applies to the other test if you decide to change)

function makeWorker() {
  return new Promise((res) => {
    const worker = new Worker(expr, { eval: true });
    worker.once('online', () => res(worker));
  });
}

async function doTest() {
  const promises = [];
  for (let n = 0; n < cpu_count; n++)
    promises.push(makeWorker());
  return Promise.all(promises)
                .then((workers) => workers.forEach((i) => i.terminate())
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do that, then I’d go one step further and use

async function makeWorker() {
  const worker = new Worker(expr, { eval: true });
  await once(worker, 'online');
  return worker;
}

🙂


doTest().then(common.mustCall());
38 changes: 38 additions & 0 deletions test/parallel/test-worker-max-count-nested.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { spawn } = require('child_process');
const { Worker } = require('worker_threads');

if (process.argv[2] === 'child') {

// Checks that warning is emitted in the main thread,
// even tho the offending Worker is created from within
// another worker.
common.expectWarning({
Warning: [
'Too many active worker threads (2). Performance may be degraded. ' +
'(--max-worker-threads=1)'
] });

const expr = `
setInterval(() => {}, 10);
const { Worker, parentPort } = require('worker_threads');
const expr = 'setInterval(() => {}, 10);';
const worker = new Worker(expr, { eval: true });
worker.on('online', () => {
worker.terminate();
parentPort.postMessage({});
});
`;

const worker = new Worker(expr, { eval: true });
worker.on('message', () => worker.terminate());

} else {
const child = spawn(
process.execPath,
['--max-worker-threads=1', __filename, 'child'],
{ stdio: 'inherit' });
child.on('close', common.mustCall((code) => assert(!code)));
}
Loading