Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: add option to track unmanaged file descriptors #34303

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ if (isMainThread) {
<!-- YAML
added: v10.5.0
changes:
- version:
- REPLACEME
pr-url: ???
description: The `trackUnmanagedFds` option was introduced.
- version:
- v13.13.0
- v12.17.0
Expand Down Expand Up @@ -679,6 +683,12 @@ changes:
occur as described in the [HTML structured clone algorithm][], and an error
will be thrown if the object cannot be cloned (e.g. because it contains
`function`s).
* `trackUnmanagedFds` {boolean} If this is set to `true`, then the Worker will
track raw file descriptors managed through [`fs.open()`][] and
[`fs.close()`][], and close them when the Worker exits, similar to other
resources like network sockets or file descriptors managed through
the [`FileHandle`][] API. This option is automatically inherited by all
nested `Worker`s. **Default**: `false`.
* `transferList` {Object[]} If one or more `MessagePort`-like objects
are passed in `workerData`, a `transferList` is required for those
items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] will be thrown.
Expand Down Expand Up @@ -900,6 +910,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`fs.open()`]: fs.html#fs_fs_open_path_flags_mode_callback
[`fs.close()`]: fs.html#fs_fs_close_fd_callback
[`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ class Worker extends EventEmitter {
this[kHandle] = new WorkerImpl(url,
env === process.env ? null : env,
options.execArgv,
parseResourceLimits(options.resourceLimits));
parseResourceLimits(options.resourceLimits),
!!options.trackUnmanagedFds);
if (this[kHandle].invalidExecArgv) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
}
Expand Down
4 changes: 4 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,10 @@ inline bool Environment::owns_inspector() const {
return flags_ & EnvironmentFlags::kOwnsInspector;
}

inline bool Environment::tracks_unmanaged_fds() const {
return flags_ & EnvironmentFlags::kTrackUnmanagedFds;
}

bool Environment::filehandle_close_warning() const {
return emit_filehandle_warning_;
}
Expand Down
24 changes: 24 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,12 @@ void Environment::RunCleanup() {
}
CleanupHandles();
}

for (const int fd : unmanaged_fds_) {
uv_fs_t close_req;
uv_fs_close(nullptr, &close_req, fd, nullptr);
uv_fs_req_cleanup(&close_req);
}
}

void Environment::RunAtExitCallbacks() {
Expand Down Expand Up @@ -981,6 +987,24 @@ Environment* Environment::worker_parent_env() const {
return worker_context()->env();
}

void Environment::AddUnmanagedFd(int fd) {
if (!tracks_unmanaged_fds()) return;
auto result = unmanaged_fds_.insert(fd);
if (!result.second) {
ProcessEmitWarning(
this, "File descriptor %d opened in unmanaged mode twice", fd);
}
}

void Environment::RemoveUnmanagedFd(int fd) {
if (!tracks_unmanaged_fds()) return;
size_t removed_count = unmanaged_fds_.erase(fd);
if (removed_count == 0) {
ProcessEmitWarning(
this, "File descriptor %d closed but not opened in unmanaged mode", fd);
}
}

void Environment::BuildEmbedderGraph(Isolate* isolate,
EmbedderGraph* graph,
void* data) {
Expand Down
6 changes: 6 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ class Environment : public MemoryRetainer {
inline bool should_not_register_esm_loader() const;
inline bool owns_process_state() const;
inline bool owns_inspector() const;
inline bool tracks_unmanaged_fds() const;
inline uint64_t thread_id() const;
inline worker::Worker* worker_context() const;
Environment* worker_parent_env() const;
Expand Down Expand Up @@ -1266,6 +1267,9 @@ class Environment : public MemoryRetainer {
inline std::unordered_map<char*, std::unique_ptr<v8::BackingStore>>*
released_allocated_buffers();

void AddUnmanagedFd(int fd);
void RemoveUnmanagedFd(int fd);

private:
inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>),
const char* errmsg);
Expand Down Expand Up @@ -1406,6 +1410,8 @@ class Environment : public MemoryRetainer {
int64_t initial_base_object_count_ = 0;
std::atomic_bool is_stopping_ { false };

std::unordered_set<int> unmanaged_fds_;

std::function<void(Environment*, int)> process_exit_handler_ {
DefaultProcessExitHandler };

Expand Down
5 changes: 4 additions & 1 deletion src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,10 @@ enum Flags : uint64_t {
// Set if Node.js should not run its own esm loader. This is needed by some
// embedders, because it's possible for the Node.js esm loader to conflict
// with another one in an embedder environment, e.g. Blink's in Chromium.
kNoRegisterESMLoader = 1 << 3
kNoRegisterESMLoader = 1 << 3,
// Set this flag to make Node.js track "raw" file descriptors, i.e. managed
// by fs.open() and fs.close(), and close them during FreeEnvironment().
kTrackUnmanagedFds = 1 << 4
};
} // namespace EnvironmentFlags

Expand Down
6 changes: 6 additions & 0 deletions src/node_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,9 @@ void AfterInteger(uv_fs_t* req) {
FSReqBase* req_wrap = FSReqBase::from_req(req);
FSReqAfterScope after(req_wrap, req);

if (req->result >= 0 && req_wrap->is_plain_open())
req_wrap->env()->AddUnmanagedFd(req->result);

if (after.Proceed())
req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}
Expand Down Expand Up @@ -862,6 +865,7 @@ void Close(const FunctionCallbackInfo<Value>& args) {

CHECK(args[0]->IsInt32());
int fd = args[0].As<Int32>()->Value();
env->RemoveUnmanagedFd(fd);

FSReqBase* req_wrap_async = GetReqWrap(args, 1);
if (req_wrap_async != nullptr) { // close(fd, req)
Expand Down Expand Up @@ -1706,6 +1710,7 @@ static void Open(const FunctionCallbackInfo<Value>& args) {

FSReqBase* req_wrap_async = GetReqWrap(args, 3);
if (req_wrap_async != nullptr) { // open(path, flags, mode, req)
req_wrap_async->set_is_plain_open(true);
AsyncCall(env, req_wrap_async, args, "open", UTF8, AfterInteger,
uv_fs_open, *path, flags, mode);
} else { // open(path, flags, mode, undefined, ctx)
Expand All @@ -1715,6 +1720,7 @@ static void Open(const FunctionCallbackInfo<Value>& args) {
int result = SyncCall(env, args[4], &req_wrap_sync, "open",
uv_fs_open, *path, flags, mode);
FS_SYNC_TRACE_END(open);
if (result >= 0) env->AddUnmanagedFd(result);
args.GetReturnValue().Set(result);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/node_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class FSReqBase : public ReqWrap<uv_fs_t> {
const char* data() const { return has_data_ ? *buffer_ : nullptr; }
enum encoding encoding() const { return encoding_; }
bool use_bigint() const { return use_bigint_; }
bool is_plain_open() const { return is_plain_open_; }

void set_is_plain_open(bool value) { is_plain_open_ = value; }

FSContinuationData* continuation_data() const {
return continuation_data_.get();
Expand All @@ -113,6 +116,7 @@ class FSReqBase : public ReqWrap<uv_fs_t> {
enum encoding encoding_ = UTF8;
bool has_data_ = false;
bool use_bigint_ = false;
bool is_plain_open_ = false;
const char* syscall_ = nullptr;

BaseObjectPtr<BindingData> binding_data_;
Expand Down
7 changes: 5 additions & 2 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void Worker::Run() {
context,
std::move(argv_),
std::move(exec_argv_),
EnvironmentFlags::kNoFlags,
static_cast<EnvironmentFlags::Flags>(environment_flags_),
thread_id_,
std::move(inspector_parent_handle_)));
if (is_stopped()) return;
Expand Down Expand Up @@ -452,7 +452,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {

std::vector<std::string> exec_argv_out;

CHECK_EQ(args.Length(), 4);
// Argument might be a string or URL
if (!args[0]->IsNullOrUndefined()) {
Utf8Value value(
Expand Down Expand Up @@ -578,6 +577,10 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
limit_info->CopyContents(worker->resource_limits_,
sizeof(worker->resource_limits_));

CHECK(args[4]->IsBoolean());
if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
}

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Expand Down
1 change: 1 addition & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class Worker : public AsyncWrap {
bool stopped_ = true;

bool has_ref_ = true;
uint64_t environment_flags_ = EnvironmentFlags::kNoFlags;

// The real Environment of the worker object. It has a lesser
// lifespan than the worker object itself - comes to life
Expand Down
69 changes: 69 additions & 0 deletions test/parallel/test-worker-track-unmanaged-fds.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
const { once } = require('events');
const fs = require('fs');

// All the tests here are run sequentially, to avoid accidentally opening an fd
// which another part of the test expects to be closed.

const preamble = `
const fs = require("fs");
const { parentPort } = require('worker_threads');
const __filename = ${JSON.stringify(__filename)};
process.on('warning', (warning) => parentPort.postMessage({ warning }));
`;

(async () => {
// Consistency check: Without trackUnmanagedFds, FDs are *not* closed.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
`, { eval: true, trackUnmanagedFds: false });
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
assert(fd > 2);
fs.fstatSync(fd); // Does not throw.
fs.closeSync(fd);
}

// With trackUnmanagedFds, FDs are closed automatically.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
`, { eval: true, trackUnmanagedFds: true });
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
assert(fd > 2);
assert.throws(() => fs.fstatSync(fd), { code: 'EBADF' });
}

// There is a warning when an fd is unexpectedly opened twice.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
parentPort.once('message', () => {
const reopened = fs.openSync(__filename);
fs.closeSync(reopened);
});
`, { eval: true, trackUnmanagedFds: true });
const [ fd ] = await once(w, 'message');
fs.closeSync(fd);
w.postMessage('');
const [ { warning } ] = await once(w, 'message');
assert.match(warning.message,
/File descriptor \d+ opened in unmanaged mode twice/);
}

// There is a warning when an fd is unexpectedly closed.
{
const w = new Worker(`${preamble}
parentPort.once('message', (fd) => {
fs.closeSync(fd);
});
`, { eval: true, trackUnmanagedFds: true });
w.postMessage(fs.openSync(__filename));
const [ { warning } ] = await once(w, 'message');
assert.match(warning.message,
/File descriptor \d+ closed but not opened in unmanaged mode/);
}
})().then(common.mustCall());