Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: cherry-pick from downstream, worker prep #15707

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@
'src/signal_wrap.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
'src/string_search.cc',
'src/stream_base.cc',
'src/stream_wrap.cc',
'src/tcp_wrap.cc',
Expand Down Expand Up @@ -678,14 +677,14 @@
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_debug_options.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)async-wrap.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)env.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)handle_wrap.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_buffer.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_i18n.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_perf.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_url.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)util.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)string_bytes.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)string_search.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)stream_base.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_constants.<(OBJ_SUFFIX)',
'<(OBJ_TRACING_PATH)<(OBJ_SEPARATOR)agent.<(OBJ_SUFFIX)',
Expand Down
18 changes: 14 additions & 4 deletions src/async-wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
const v8::Local<v8::String> symbol,
int argc,
v8::Local<v8::Value>* argv) {
v8::Local<v8::Value> cb_v = object()->Get(symbol);
CHECK(cb_v->IsFunction());
v8::Local<v8::Value> cb_v;
if (!object()->Get(object()->CreationContext(), symbol).ToLocal(&cb_v))
return v8::MaybeLocal<v8::Value>();
if (!cb_v->IsFunction()) {
env()->ThrowError("callback must be a function");
return v8::MaybeLocal<v8::Value>();
}
return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
}

Expand All @@ -60,8 +65,13 @@ inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
uint32_t index,
int argc,
v8::Local<v8::Value>* argv) {
v8::Local<v8::Value> cb_v = object()->Get(index);
CHECK(cb_v->IsFunction());
v8::Local<v8::Value> cb_v;
if (!object()->Get(object()->CreationContext(), index).ToLocal(&cb_v))
return v8::MaybeLocal<v8::Value>();
if (!cb_v->IsFunction()) {
env()->ThrowError("callback must be a function");
return v8::MaybeLocal<v8::Value>();
}
return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
}

Expand Down
4 changes: 4 additions & 0 deletions src/async-wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ static void DestroyAsyncIdsCallback(uv_timer_t* handle) {
do {
std::vector<double> destroy_async_id_list;
destroy_async_id_list.swap(*env->destroy_async_id_list());
if (!env->can_call_into_js()) return;
for (auto async_id : destroy_async_id_list) {
// Want each callback to be cleaned up after itself, instead of cleaning
// them all up after the while() loop completes.
Expand All @@ -174,6 +175,9 @@ static void PushBackDestroyAsyncId(Environment* env, double id) {
if (env->async_hooks()->fields()[AsyncHooks::kDestroy] == 0)
return;

if (!env->can_call_into_js())
return;

if (env->destroy_async_id_list()->empty())
uv_timer_start(env->destroy_async_ids_timer_handle(),
DestroyAsyncIdsCallback, 0, 0);
Expand Down
45 changes: 20 additions & 25 deletions src/cares_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ using v8::Value;

namespace {

Mutex ares_library_mutex;

inline uint16_t cares_get_16bit(const unsigned char* p) {
return static_cast<uint32_t>(p[0] << 8U) | (static_cast<uint32_t>(p[1]));
}
Expand Down Expand Up @@ -271,9 +273,8 @@ void ares_poll_cb(uv_poll_t* watcher, int status, int events) {
}


void ares_poll_close_cb(uv_handle_t* watcher) {
node_ares_task* task = ContainerOf(&node_ares_task::poll_watcher,
reinterpret_cast<uv_poll_t*>(watcher));
void ares_poll_close_cb(uv_poll_t* watcher) {
node_ares_task* task = ContainerOf(&node_ares_task::poll_watcher, watcher);
free(task);
}

Expand Down Expand Up @@ -351,8 +352,7 @@ void ares_sockstate_cb(void* data,
"When an ares socket is closed we should have a handle for it");

channel->task_list()->erase(it);
uv_close(reinterpret_cast<uv_handle_t*>(&task->poll_watcher),
ares_poll_close_cb);
channel->env()->CloseHandle(&task->poll_watcher, ares_poll_close_cb);

if (channel->task_list()->empty()) {
uv_timer_stop(channel->timer_handle());
Expand Down Expand Up @@ -496,6 +496,7 @@ void ChannelWrap::Setup() {

int r;
if (!library_inited_) {
Mutex::ScopedLock lock(ares_library_mutex);
// Multiple calls to ares_library_init() increase a reference counter,
// so this is a no-op except for the first call to it.
r = ares_library_init(ARES_LIB_INIT_ALL);
Expand All @@ -509,6 +510,7 @@ void ChannelWrap::Setup() {
ARES_OPT_FLAGS | ARES_OPT_SOCK_STATE_CB);

if (r != ARES_SUCCESS) {
Mutex::ScopedLock lock(ares_library_mutex);
ares_library_cleanup();
return env()->ThrowError(ToErrorCodeString(r));
}
Expand All @@ -526,6 +528,7 @@ void ChannelWrap::Setup() {

ChannelWrap::~ChannelWrap() {
if (library_inited_) {
Mutex::ScopedLock lock(ares_library_mutex);
// This decreases the reference counter increased by ares_library_init().
ares_library_cleanup();
}
Expand All @@ -538,10 +541,7 @@ ChannelWrap::~ChannelWrap() {
void ChannelWrap::CleanupTimer() {
if (timer_handle_ == nullptr) return;

uv_close(reinterpret_cast<uv_handle_t*>(timer_handle_),
[](uv_handle_t* handle) {
delete reinterpret_cast<uv_timer_t*>(handle);
});
env()->CloseHandle(timer_handle_, [](uv_timer_t* handle){ delete handle; });
timer_handle_ = nullptr;
}

Expand Down Expand Up @@ -636,8 +636,7 @@ class QueryWrap : public AsyncWrap {
static_cast<void*>(this));
}

static void CaresAsyncClose(uv_handle_t* handle) {
uv_async_t* async = reinterpret_cast<uv_async_t*>(handle);
static void CaresAsyncClose(uv_async_t* async) {
auto data = static_cast<struct CaresAsyncData*>(async->data);
delete data->wrap;
delete data;
Expand All @@ -662,7 +661,7 @@ class QueryWrap : public AsyncWrap {
free(host);
}

uv_close(reinterpret_cast<uv_handle_t*>(handle), CaresAsyncClose);
wrap->env()->CloseHandle(handle, CaresAsyncClose);
}

static void Callback(void *arg, int status, int timeouts,
Expand Down Expand Up @@ -1970,13 +1969,11 @@ void GetAddrInfo(const FunctionCallbackInfo<Value>& args) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = flags;

int err = uv_getaddrinfo(env->event_loop(),
req_wrap->req(),
AfterGetAddrInfo,
*hostname,
nullptr,
&hints);
req_wrap->Dispatched();
int err = req_wrap->Dispatch(uv_getaddrinfo,
AfterGetAddrInfo,
*hostname,
nullptr,
&hints);
if (err)
delete req_wrap;

Expand All @@ -2000,12 +1997,10 @@ void GetNameInfo(const FunctionCallbackInfo<Value>& args) {

GetNameInfoReqWrap* req_wrap = new GetNameInfoReqWrap(env, req_wrap_obj);

int err = uv_getnameinfo(env->event_loop(),
req_wrap->req(),
AfterGetNameInfo,
(struct sockaddr*)&addr,
NI_NAMEREQD);
req_wrap->Dispatched();
int err = req_wrap->Dispatch(uv_getnameinfo,
AfterGetNameInfo,
(struct sockaddr*)&addr,
NI_NAMEREQD);
if (err)
delete req_wrap;

Expand Down
2 changes: 1 addition & 1 deletion src/connection_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ConnectionWrap : public StreamWrap {
ConnectionWrap(Environment* env,
v8::Local<v8::Object> object,
ProviderType provider);
~ConnectionWrap() {
virtual ~ConnectionWrap() {
}

UVType handle_;
Expand Down
62 changes: 59 additions & 3 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ inline Environment::Environment(IsolateData* isolate_data,
#if HAVE_INSPECTOR
inspector_agent_(this),
#endif
handle_cleanup_waiting_(0),
http_parser_buffer_(nullptr),
fs_stats_field_array_(nullptr),
context_(context->GetIsolate(), context) {
Expand Down Expand Up @@ -334,6 +333,7 @@ inline Environment::~Environment() {
delete[] heap_space_statistics_buffer_;
delete[] http_parser_buffer_;
delete http2_state_;
delete[] fs_stats_field_array_;
free(performance_state_);
}

Expand Down Expand Up @@ -375,8 +375,36 @@ inline void Environment::RegisterHandleCleanup(uv_handle_t* handle,
handle_cleanup_queue_.PushBack(new HandleCleanup(handle, cb, arg));
}

inline void Environment::FinishHandleCleanup(uv_handle_t* handle) {
handle_cleanup_waiting_--;
template <typename T, typename OnCloseCallback>
inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) {
handle_cleanup_waiting_++;
static_assert(sizeof(T) >= sizeof(uv_handle_t), "T is a libuv handle");
static_assert(offsetof(T, data) == offsetof(uv_handle_t, data),
"T is a libuv handle");
static_assert(offsetof(T, close_cb) == offsetof(uv_handle_t, close_cb),
"T is a libuv handle");
struct CloseData {
Environment* env;
OnCloseCallback callback;
void* original_data;
};
handle->data = new CloseData { this, callback, handle->data };
uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
CloseData* data = static_cast<CloseData*>(handle->data);
data->env->handle_cleanup_waiting_--;
handle->data = data->original_data;
data->callback(reinterpret_cast<T*>(handle));
delete data;
});
}

void Environment::IncreaseWaitingRequestCounter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this is supposed to proxy the status of the libuv. If I'm correct IMHO it should be a libuv API.
libuv/libuv#1528 seems like a good start.

request_waiting_++;
}

void Environment::DecreaseWaitingRequestCounter() {
request_waiting_--;
CHECK_GE(request_waiting_, 0);
}

inline uv_loop_t* Environment::event_loop() const {
Expand Down Expand Up @@ -505,6 +533,14 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
fs_stats_field_array_ = fields;
}

inline bool Environment::can_call_into_js() const {
return can_call_into_js_;
}

inline void Environment::set_can_call_into_js(bool can_call_into_js) {
can_call_into_js_ = can_call_into_js;
}

inline performance::performance_state* Environment::performance_state() {
return performance_state_;
}
Expand Down Expand Up @@ -598,6 +634,26 @@ inline void Environment::SetTemplateMethod(v8::Local<v8::FunctionTemplate> that,
t->SetClassName(name_string); // NODE_SET_METHOD() compatibility.
}

void Environment::AddCleanupHook(void (*fn)(void*), void* arg) {
cleanup_hooks_[arg].push_back(
CleanupHookCallback { fn, arg, cleanup_hook_counter_++ });
}

void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
auto map_it = cleanup_hooks_.find(arg);
if (map_it == cleanup_hooks_.end())
return;

for (auto it = map_it->second.begin(); it != map_it->second.end(); ++it) {
if (it->fun_ == fn && it->arg_ == arg) {
map_it->second.erase(it);
if (map_it->second.empty())
cleanup_hooks_.erase(arg);
return;
}
}
}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
#define V(TypeName, PropertyName) \
Expand Down
42 changes: 35 additions & 7 deletions src/env.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "node_internals.h"
#include "async-wrap.h"
#include "v8-profiler.h"
#include "req-wrap-inl.h"

#if defined(_MSC_VER)
#define getpid GetCurrentProcessId
Expand Down Expand Up @@ -51,11 +52,7 @@ void Environment::Start(int argc,
uv_timer_init(event_loop(), destroy_async_ids_timer_handle());

auto close_and_finish = [](Environment* env, uv_handle_t* handle, void* arg) {
handle->data = env;

uv_close(handle, [](uv_handle_t* handle) {
static_cast<Environment*>(handle->data)->FinishHandleCleanup(handle);
});
env->CloseHandle(handle, [](uv_handle_t* handle) {});
};

RegisterHandleCleanup(
Expand Down Expand Up @@ -95,14 +92,22 @@ void Environment::Start(int argc,
}

void Environment::CleanupHandles() {
for (auto r : req_wrap_queue_)
r->Cancel();

for (auto w : handle_wrap_queue_)
w->Close();

while (HandleCleanup* hc = handle_cleanup_queue_.PopFront()) {
handle_cleanup_waiting_++;
hc->cb_(this, hc->handle_, hc->arg_);
delete hc;
}

while (handle_cleanup_waiting_ != 0)
while (handle_cleanup_waiting_ != 0 ||
request_waiting_ != 0 ||
!handle_wrap_queue_.IsEmpty()) {
uv_run(event_loop(), UV_RUN_ONCE);
}
}

void Environment::StartProfilerIdleNotifier() {
Expand Down Expand Up @@ -166,6 +171,29 @@ void Environment::PrintSyncTrace() const {
fflush(stderr);
}

void Environment::RunCleanup() {
CleanupHandles();

while (!cleanup_hooks_.empty()) {
std::vector<CleanupHookCallback> callbacks;
// Concatenate all vectors in cleanup_hooks_
for (const auto& pair : cleanup_hooks_)
callbacks.insert(callbacks.end(), pair.second.begin(), pair.second.end());
cleanup_hooks_.clear();
std::sort(callbacks.begin(), callbacks.end(),
[](const CleanupHookCallback& a, const CleanupHookCallback& b) {
// Sort in descending order so that the last-inserted callbacks get run
// first.
return a.insertion_order_counter_ > b.insertion_order_counter_;
});

for (const CleanupHookCallback& cb : callbacks) {
cb.fun_(cb.arg_);
CleanupHandles();
}
}
}

void Environment::RunAtExitCallbacks() {
for (AtExitCallback at_exit : at_exit_functions_) {
at_exit.cb_(at_exit.arg_);
Expand Down
Loading