Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test,quic: add common/udppair utility for quic #33380

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@
'src/js_native_api_v8_internals.h',
'src/js_stream.cc',
'src/json_utils.cc',
'src/js_udp_wrap.cc',
'src/module_wrap.cc',
'src/node.cc',
'src/node_api.cc',
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace node {
V(HTTPINCOMINGMESSAGE) \
V(HTTPCLIENTREQUEST) \
V(JSSTREAM) \
V(JSUDPWRAP) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
Expand Down
218 changes: 218 additions & 0 deletions src/js_udp_wrap.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
#include "udp_wrap.h"
#include "async_wrap-inl.h"
#include "node_errors.h"
#include "node_sockaddr-inl.h"

#include <algorithm>

namespace node {

using errors::TryCatchScope;
using v8::Array;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Int32;
using v8::Local;
using v8::Object;
using v8::String;
using v8::Value;

// JSUDPWrap is a testing utility used by test/common/udppair.js
// to simulate UDP traffic deterministically in Node.js tests.
class JSUDPWrap final : public UDPWrapBase, public AsyncWrap {
public:
JSUDPWrap(Environment* env, Local<Object> obj);

int RecvStart() override;
int RecvStop() override;
ssize_t Send(uv_buf_t* bufs,
size_t nbufs,
const sockaddr* addr) override;
SocketAddress GetPeerName() override;
SocketAddress GetSockName() override;
AsyncWrap* GetAsyncWrap() override { return this; }

static void New(const FunctionCallbackInfo<Value>& args);
static void EmitReceived(const FunctionCallbackInfo<Value>& args);
static void OnSendDone(const FunctionCallbackInfo<Value>& args);
static void OnAfterBind(const FunctionCallbackInfo<Value>& args);

static void Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv);
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(JSUDPWrap)
SET_SELF_SIZE(JSUDPWrap)
};

JSUDPWrap::JSUDPWrap(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, PROVIDER_JSUDPWRAP) {
MakeWeak();

obj->SetAlignedPointerInInternalField(
kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
}

int JSUDPWrap::RecvStart() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
TryCatchScope try_catch(env());
Local<Value> value;
int32_t value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(env()->isolate(), try_catch);
}
return value_int;
}

int JSUDPWrap::RecvStop() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
TryCatchScope try_catch(env());
Local<Value> value;
int32_t value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(env()->isolate(), try_catch);
}
return value_int;
}

ssize_t JSUDPWrap::Send(uv_buf_t* bufs,
size_t nbufs,
const sockaddr* addr) {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
TryCatchScope try_catch(env());
Local<Value> value;
int64_t value_int = UV_EPROTO;
size_t total_len = 0;

MaybeStackBuffer<Local<Value>, 16> buffers(nbufs);
for (size_t i = 0; i < nbufs; i++) {
buffers[i] = Buffer::Copy(env(), bufs[i].base, bufs[i].len)
.ToLocalChecked();
total_len += bufs[i].len;
}

Local<Value> args[] = {
listener()->CreateSendWrap(total_len)->object(),
Array::New(env()->isolate(), buffers.out(), nbufs),
AddressToJS(env(), addr)
};

if (!MakeCallback(env()->onwrite_string(), arraysize(args), args)
.ToLocal(&value) ||
!value->IntegerValue(env()->context()).To(&value_int)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(env()->isolate(), try_catch);
}
return value_int;
}

SocketAddress JSUDPWrap::GetPeerName() {
SocketAddress ret;
CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret));
return ret;
}

SocketAddress JSUDPWrap::GetSockName() {
SocketAddress ret;
CHECK(SocketAddress::New(AF_INET, "127.0.0.1", 1337, &ret));
return ret;
}

void JSUDPWrap::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args.IsConstructCall());
new JSUDPWrap(env, args.Holder());
}

void JSUDPWrap::EmitReceived(const FunctionCallbackInfo<Value>& args) {
JSUDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Environment* env = wrap->env();

ArrayBufferViewContents<char> buffer(args[0]);
const char* data = buffer.data();
int len = buffer.length();

CHECK(args[1]->IsInt32()); // family
CHECK(args[2]->IsString()); // address
CHECK(args[3]->IsInt32()); // port
CHECK(args[4]->IsInt32()); // flags
int family = args[1].As<Int32>()->Value() == 4 ? AF_INET : AF_INET6;
Utf8Value address(env->isolate(), args[2]);
int port = args[3].As<Int32>()->Value();
int flags = args[3].As<Int32>()->Value();

sockaddr_storage addr;
CHECK_EQ(sockaddr_for_family(family, *address, port, &addr), 0);

// Repeatedly ask the stream's owner for memory, copy the data that we
// just read from JS into those buffers and emit them as reads.
while (len != 0) {
uv_buf_t buf = wrap->listener()->OnAlloc(len);
ssize_t avail = std::min<size_t>(buf.len, len);
memcpy(buf.base, data, avail);
data += avail;
len -= avail;
wrap->listener()->OnRecv(
avail, buf, reinterpret_cast<sockaddr*>(&addr), flags);
}
}

void JSUDPWrap::OnSendDone(const FunctionCallbackInfo<Value>& args) {
JSUDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

CHECK(args[0]->IsObject());
CHECK(args[1]->IsInt32());
ReqWrap<uv_udp_send_t>* req_wrap;
ASSIGN_OR_RETURN_UNWRAP(&req_wrap, args[0].As<Object>());
int status = args[1].As<Int32>()->Value();

wrap->listener()->OnSendDone(req_wrap, status);
}

void JSUDPWrap::OnAfterBind(const FunctionCallbackInfo<Value>& args) {
JSUDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

wrap->listener()->OnAfterBind();
}

void JSUDPWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);

Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
Local<String> js_udp_wrap_string =
FIXED_ONE_BYTE_STRING(env->isolate(), "JSUDPWrap");
t->SetClassName(js_udp_wrap_string);
t->InstanceTemplate()
->SetInternalFieldCount(UDPWrapBase::kUDPWrapBaseField + 1);
t->Inherit(AsyncWrap::GetConstructorTemplate(env));

UDPWrapBase::AddMethods(env, t);
env->SetProtoMethod(t, "emitReceived", EmitReceived);
env->SetProtoMethod(t, "onSendDone", OnSendDone);
env->SetProtoMethod(t, "onAfterBind", OnAfterBind);

target->Set(env->context(),
js_udp_wrap_string,
t->GetFunction(context).ToLocalChecked()).Check();
}


} // namespace node

NODE_MODULE_CONTEXT_AWARE_INTERNAL(js_udp_wrap, node::JSUDPWrap::Initialize)
1 change: 1 addition & 0 deletions src/node_binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
V(http_parser) \
V(inspector) \
V(js_stream) \
V(js_udp_wrap) \
V(messaging) \
V(module_wrap) \
V(native_module) \
Expand Down
5 changes: 5 additions & 0 deletions src/udp_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ class UDPWrap final : public HandleWrap,
v8::Local<v8::Object> current_send_req_wrap_;
};

int sockaddr_for_family(int address_family,
const char* address,
const unsigned short port,
sockaddr_storage* addr);

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
13 changes: 13 additions & 0 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,19 @@ listener to process `'beforeExit'`. If a file needs to be left open until
Node.js completes, use a child process and call `refresh()` only in the
parent.

## UDP pair helper

The `common/udppair` module exports a function `makeUDPPair` and a class
`FakeUDPWrap`.

`FakeUDPWrap` emits `'send'` events when data is to be sent on it, and provides
an `emitReceived()` API for actin as if data has been received on it.

`makeUDPPair` returns an object `{ clientSide, serverSide }` where each side
is an `FakeUDPWrap` connected to the other side.

There is no difference between cient or server side beyond their names.

## WPT Module

### `harness`
Expand Down
102 changes: 102 additions & 0 deletions test/common/udppair.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* eslint-disable node-core/require-common-first, node-core/required-modules */
'use strict';
const { internalBinding } = require('internal/test/binding');
const { JSUDPWrap } = internalBinding('js_udp_wrap');
const EventEmitter = require('events');

// FakeUDPWrap is a testing utility that emulates a UDP connection
// for the sake of making UDP tests more deterministic.
class FakeUDPWrap extends EventEmitter {
constructor() {
super();

this._handle = new JSUDPWrap();

this._handle.onreadstart = () => this._startReading();
this._handle.onreadstop = () => this._stopReading();
this._handle.onwrite =
(wrap, buffers, addr) => this._write(wrap, buffers, addr);
this._handle.getsockname = (obj) => {
Object.assign(obj, { address: '127.0.0.1', family: 'IPv4', port: 1337 });
return 0;
};

this.reading = false;
this.bufferedReceived = [];
this.emitBufferedImmediate = null;
}

_emitBuffered = () => {
if (!this.reading) return;
if (this.bufferedReceived.length > 0) {
this.emitReceived(this.bufferedReceived.shift());
this.emitBufferedImmediate = setImmediate(this._emitBuffered);
} else {
this.emit('wantRead');
}
};

_startReading() {
this.reading = true;
this.emitBufferedImmediate = setImmediate(this._emitBuffered);
}

_stopReading() {
this.reading = false;
clearImmediate(this.emitBufferedImmediate);
}

_write(wrap, buffers, addr) {
this.emit('send', { buffers, addr });
setImmediate(() => this._handle.onSendDone(wrap, 0));
}

afterBind() {
this._handle.onAfterBind();
}

emitReceived(info) {
if (!this.reading) {
this.bufferedReceived.push(info);
return;
}

const {
buffers,
addr: {
family = 4,
address = '127.0.0.1',
port = 1337,
},
flags = 0
} = info;

let familyInt;
switch (family) {
case 'IPv4': familyInt = 4; break;
case 'IPv6': familyInt = 6; break;
default: throw new Error('bad family');
}

for (const buffer of buffers) {
this._handle.emitReceived(buffer, familyInt, address, port, flags);
}
}
}

function makeUDPPair() {
const serverSide = new FakeUDPWrap();
const clientSide = new FakeUDPWrap();

serverSide.on('send',
(chk) => setImmediate(() => clientSide.emitReceived(chk)));
clientSide.on('send',
(chk) => setImmediate(() => serverSide.emitReceived(chk)));

return { serverSide, clientSide };
}

module.exports = {
FakeUDPWrap,
makeUDPPair
};
1 change: 1 addition & 0 deletions test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const { getSystemErrorName } = require('util');
delete providers.STREAMPIPE;
delete providers.MESSAGEPORT;
delete providers.WORKER;
delete providers.JSUDPWRAP;
if (!common.isMainThread)
delete providers.INSPECTORJSBINDING;
delete providers.KEYPAIRGENREQUEST;
Expand Down