diff --git a/node.gyp b/node.gyp index c03623846a2b6e..1a540953500b39 100644 --- a/node.gyp +++ b/node.gyp @@ -563,6 +563,7 @@ 'src/js_native_api_v8_internals.h', 'src/js_stream.cc', 'src/json_utils.cc', + 'src/js_udp_wrap.cc', 'src/module_wrap.cc', 'src/node.cc', 'src/node_api.cc', diff --git a/src/async_wrap.h b/src/async_wrap.h index 577d11b6cc881f..ac1e9ccd04c4fa 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -51,6 +51,7 @@ namespace node { V(HTTPINCOMINGMESSAGE) \ V(HTTPCLIENTREQUEST) \ V(JSSTREAM) \ + V(JSUDPWRAP) \ V(MESSAGEPORT) \ V(PIPECONNECTWRAP) \ V(PIPESERVERWRAP) \ diff --git a/src/js_udp_wrap.cc b/src/js_udp_wrap.cc new file mode 100644 index 00000000000000..c51683141186f0 --- /dev/null +++ b/src/js_udp_wrap.cc @@ -0,0 +1,218 @@ +#include "udp_wrap.h" +#include "async_wrap-inl.h" +#include "node_errors.h" +#include "node_sockaddr-inl.h" + +#include + +namespace node { + +using errors::TryCatchScope; +using v8::Array; +using v8::Context; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Int32; +using v8::Local; +using v8::Object; +using v8::String; +using v8::Value; + +// JSUDPWrap is a testing utility used by test/common/udppair.js +// to simulate UDP traffic deterministically in Node.js tests. +class JSUDPWrap final : public UDPWrapBase, public AsyncWrap { + public: + JSUDPWrap(Environment* env, Local obj); + + int RecvStart() override; + int RecvStop() override; + ssize_t Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) override; + SocketAddress GetPeerName() override; + SocketAddress GetSockName() override; + AsyncWrap* GetAsyncWrap() override { return this; } + + static void New(const FunctionCallbackInfo& args); + static void EmitReceived(const FunctionCallbackInfo& args); + static void OnSendDone(const FunctionCallbackInfo& args); + static void OnAfterBind(const FunctionCallbackInfo& args); + + static void Initialize(Local target, + Local unused, + Local context, + void* priv); + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(JSUDPWrap) + SET_SELF_SIZE(JSUDPWrap) +}; + +JSUDPWrap::JSUDPWrap(Environment* env, Local obj) + : AsyncWrap(env, obj, PROVIDER_JSUDPWRAP) { + MakeWeak(); + + obj->SetAlignedPointerInInternalField( + kUDPWrapBaseField, static_cast(this)); +} + +int JSUDPWrap::RecvStart() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int32_t value_int = UV_EPROTO; + if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) || + !value->Int32Value(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +int JSUDPWrap::RecvStop() { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int32_t value_int = UV_EPROTO; + if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) || + !value->Int32Value(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +ssize_t JSUDPWrap::Send(uv_buf_t* bufs, + size_t nbufs, + const sockaddr* addr) { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + TryCatchScope try_catch(env()); + Local value; + int64_t value_int = UV_EPROTO; + size_t total_len = 0; + + MaybeStackBuffer, 16> buffers(nbufs); + for (size_t i = 0; i < nbufs; i++) { + buffers[i] = Buffer::Copy(env(), bufs[i].base, bufs[i].len) + .ToLocalChecked(); + total_len += bufs[i].len; + } + + Local args[] = { + listener()->CreateSendWrap(total_len)->object(), + Array::New(env()->isolate(), buffers.out(), nbufs), + AddressToJS(env(), addr) + }; + + if (!MakeCallback(env()->onwrite_string(), arraysize(args), args) + .ToLocal(&value) || + !value->IntegerValue(env()->context()).To(&value_int)) { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) + errors::TriggerUncaughtException(env()->isolate(), try_catch); + } + return value_int; +} + +SocketAddress JSUDPWrap::GetPeerName() { + SocketAddress ret; + CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret)); + return ret; +} + +SocketAddress JSUDPWrap::GetSockName() { + SocketAddress ret; + CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret)); + return ret; +} + +void JSUDPWrap::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(args.IsConstructCall()); + new JSUDPWrap(env, args.Holder()); +} + +void JSUDPWrap::EmitReceived(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + Environment* env = wrap->env(); + + ArrayBufferViewContents buffer(args[0]); + const char* data = buffer.data(); + int len = buffer.length(); + + CHECK(args[1]->IsInt32()); // family + CHECK(args[2]->IsString()); // address + CHECK(args[3]->IsInt32()); // port + CHECK(args[4]->IsInt32()); // flags + int family = args[1].As()->Value() == 4 ? AF_INET : AF_INET6; + Utf8Value address(env->isolate(), args[2]); + int port = args[3].As()->Value(); + int flags = args[3].As()->Value(); + + sockaddr_storage addr; + CHECK_EQ(sockaddr_for_family(family, *address, port, &addr), 0); + + // Repeatedly ask the stream's owner for memory, copy the data that we + // just read from JS into those buffers and emit them as reads. + while (len != 0) { + uv_buf_t buf = wrap->listener()->OnAlloc(len); + ssize_t avail = std::min(buf.len, len); + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + wrap->listener()->OnRecv( + avail, buf, reinterpret_cast(&addr), flags); + } +} + +void JSUDPWrap::OnSendDone(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + CHECK(args[0]->IsObject()); + CHECK(args[1]->IsInt32()); + ReqWrap* req_wrap; + ASSIGN_OR_RETURN_UNWRAP(&req_wrap, args[0].As()); + int status = args[1].As()->Value(); + + wrap->listener()->OnSendDone(req_wrap, status); +} + +void JSUDPWrap::OnAfterBind(const FunctionCallbackInfo& args) { + JSUDPWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + wrap->listener()->OnAfterBind(); +} + +void JSUDPWrap::Initialize(Local target, + Local unused, + Local context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + Local t = env->NewFunctionTemplate(New); + Local js_udp_wrap_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "JSUDPWrap"); + t->SetClassName(js_udp_wrap_string); + t->InstanceTemplate() + ->SetInternalFieldCount(UDPWrapBase::kUDPWrapBaseField + 1); + t->Inherit(AsyncWrap::GetConstructorTemplate(env)); + + UDPWrapBase::AddMethods(env, t); + env->SetProtoMethod(t, "emitReceived", EmitReceived); + env->SetProtoMethod(t, "onSendDone", OnSendDone); + env->SetProtoMethod(t, "onAfterBind", OnAfterBind); + + target->Set(env->context(), + js_udp_wrap_string, + t->GetFunction(context).ToLocalChecked()).Check(); +} + + +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(js_udp_wrap, node::JSUDPWrap::Initialize) diff --git a/src/node_binding.cc b/src/node_binding.cc index 6f0e80551223c0..28963c169e3990 100644 --- a/src/node_binding.cc +++ b/src/node_binding.cc @@ -52,6 +52,7 @@ V(http_parser_llhttp) \ V(inspector) \ V(js_stream) \ + V(js_udp_wrap) \ V(messaging) \ V(module_wrap) \ V(native_module) \ diff --git a/src/udp_wrap.h b/src/udp_wrap.h index 6fed1d2dfea810..75a123d8fa793e 100644 --- a/src/udp_wrap.h +++ b/src/udp_wrap.h @@ -215,6 +215,11 @@ class UDPWrap final : public HandleWrap, v8::Local current_send_req_wrap_; }; +int sockaddr_for_family(int address_family, + const char* address, + const unsigned short port, + sockaddr_storage* addr); + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/test/common/README.md b/test/common/README.md index f17024bf6030fa..49ce50cc495871 100644 --- a/test/common/README.md +++ b/test/common/README.md @@ -929,6 +929,19 @@ The realpath of the testing temporary directory. Deletes and recreates the testing temporary directory. +## UDP pair helper + +The `common/udppair` module exports a function `makeUDPPair` and a class +`FakeUDPWrap`. + +`FakeUDPWrap` emits `'send'` events when data is to be sent on it, and provides +an `emitReceived()` API for actin as if data has been received on it. + +`makeUDPPair` returns an object `{ clientSide, serverSide }` where each side +is an `FakeUDPWrap` connected to the other side. + +There is no difference between cient or server side beyond their names. + ## WPT Module ### `harness` diff --git a/test/common/udppair.js b/test/common/udppair.js new file mode 100644 index 00000000000000..a7e532a99778e9 --- /dev/null +++ b/test/common/udppair.js @@ -0,0 +1,102 @@ +/* eslint-disable node-core/require-common-first, node-core/required-modules */ +'use strict'; +const { internalBinding } = require('internal/test/binding'); +const { JSUDPWrap } = internalBinding('js_udp_wrap'); +const EventEmitter = require('events'); + +// FakeUDPWrap is a testing utility that emulates a UDP connection +// for the sake of making UDP tests more deterministic. +class FakeUDPWrap extends EventEmitter { + constructor() { + super(); + + this._handle = new JSUDPWrap(); + + this._handle.onreadstart = () => this._startReading(); + this._handle.onreadstop = () => this._stopReading(); + this._handle.onwrite = + (wrap, buffers, addr) => this._write(wrap, buffers, addr); + this._handle.getsockname = (obj) => { + Object.assign(obj, { address: '127.0.0.1', family: 'IPv4', port: 1337 }); + return 0; + }; + + this.reading = false; + this.bufferedReceived = []; + this.emitBufferedImmediate = null; + } + + _emitBuffered = () => { + if (!this.reading) return; + if (this.bufferedReceived.length > 0) { + this.emitReceived(this.bufferedReceived.shift()); + this.emitBufferedImmediate = setImmediate(this._emitBuffered); + } else { + this.emit('wantRead'); + } + }; + + _startReading() { + this.reading = true; + this.emitBufferedImmediate = setImmediate(this._emitBuffered); + } + + _stopReading() { + this.reading = false; + clearImmediate(this.emitBufferedImmediate); + } + + _write(wrap, buffers, addr) { + this.emit('send', { buffers, addr }); + setImmediate(() => this._handle.onSendDone(wrap, 0)); + } + + afterBind() { + this._handle.onAfterBind(); + } + + emitReceived(info) { + if (!this.reading) { + this.bufferedReceived.push(info); + return; + } + + const { + buffers, + addr: { + family = 4, + address = '127.0.0.1', + port = 1337, + }, + flags = 0 + } = info; + + let familyInt; + switch (family) { + case 'IPv4': familyInt = 4; break; + case 'IPv6': familyInt = 6; break; + default: throw new Error('bad family'); + } + + for (const buffer of buffers) { + this._handle.emitReceived(buffer, familyInt, address, port, flags); + } + } +} + +function makeUDPPair() { + const serverSide = new FakeUDPWrap(); + const clientSide = new FakeUDPWrap(); + + serverSide.on('send', + (chk) => setImmediate(() => clientSide.emitReceived(chk))); + clientSide.on('send', + (chk) => setImmediate(() => serverSide.emitReceived(chk))); + + return { serverSide, clientSide }; +} + +module.exports = { + FakeUDPWrap, + makeUDPPair +}; diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index ab1f2110a3debc..957c7f7440a4bc 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -45,6 +45,7 @@ const { getSystemErrorName } = require('util'); delete providers.STREAMPIPE; delete providers.MESSAGEPORT; delete providers.WORKER; + delete providers.JSUDPWRAP; if (!common.isMainThread) delete providers.INSPECTORJSBINDING; delete providers.KEYPAIRGENREQUEST;