Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: add QuicSocketListener
Browse files Browse the repository at this point in the history
Fixes: #208
PR-URL: #207
Reviewed-By: #207
  • Loading branch information
jasnell authored and addaleax committed Dec 11, 2019
1 parent 2b32e5f commit 081a37f
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 25 deletions.
147 changes: 122 additions & 25 deletions src/node_quic_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,83 @@ inline uint32_t GenerateReservedVersion(
}
} // namespace

QuicSocketListener::~QuicSocketListener() {
if (socket_ != nullptr)
socket_->RemoveListener(this);
}

void QuicSocketListener::OnError(ssize_t code) {
if (previous_listener_ != nullptr)
previous_listener_->OnError(code);
}

void QuicSocketListener::OnError(int code) {
if (previous_listener_ != nullptr)
previous_listener_->OnError(code);
}

void QuicSocketListener::OnSessionReady(BaseObjectPtr<QuicSession> session) {
if (previous_listener_ != nullptr)
previous_listener_->OnSessionReady(session);
}

void QuicSocketListener::OnServerBusy(bool busy) {
if (previous_listener_ != nullptr)
previous_listener_->OnServerBusy(busy);
}

void QuicSocketListener::OnDone() {
if (previous_listener_ != nullptr)
previous_listener_->OnDone();
}

void QuicSocketListener::OnDestroy() {
if (previous_listener_ != nullptr)
previous_listener_->OnDestroy();
}

void JSQuicSocketListener::OnError(ssize_t code) {
Environment* env = Socket()->env();
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg = Number::New(env->isolate(), static_cast<double>(code));
Socket()->MakeCallback(env->quic_on_socket_error_function(), 1, &arg);
}

void JSQuicSocketListener::OnError(int code) {
Environment* env = Socket()->env();
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg = Integer::New(env->isolate(), code);
Socket()->MakeCallback(env->quic_on_socket_error_function(), 1, &arg);
}

void JSQuicSocketListener::OnSessionReady(BaseObjectPtr<QuicSession> session) {
Environment* env = Socket()->env();
Local<Value> arg = session->object();
Context::Scope context_scope(env->context());
Socket()->MakeCallback(env->quic_on_session_ready_function(), 1, &arg);
}

void JSQuicSocketListener::OnServerBusy(bool busy) {
Environment* env = Socket()->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg = Boolean::New(env->isolate(), busy);
Socket()->MakeCallback(env->quic_on_socket_server_busy_function(), 1, &arg);
}

void JSQuicSocketListener::OnDone() {
Environment* env = Socket()->env();
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());
Socket()->MakeCallback(env->ondone_string(), 0, nullptr);
}

void JSQuicSocketListener::OnDestroy() {
// Do nothing here.
}

QuicSocket::QuicSocket(
Environment* env,
Local<Object> wrap,
Expand All @@ -86,6 +163,7 @@ QuicSocket::QuicSocket(
sizeof(socket_stats_) / sizeof(uint64_t),
reinterpret_cast<uint64_t*>(&socket_stats_)) {
MakeWeak();
PushListener(&default_listener_);

udp_ = static_cast<UDPWrapBase*>(
udp_base_wrap->GetAlignedPointerFromInternalField(
Expand Down Expand Up @@ -130,6 +208,11 @@ QuicSocket::~QuicSocket() {
socket_stats_.packets_ignored,
socket_stats_.server_sessions,
socket_stats_.client_sessions);
QuicSocketListener* listener = listener_;
listener_->OnDestroy();
// Remove the listener if it didn't remove itself already.
if (listener == listener_)
RemoveListener(listener_);
}

void QuicSocket::MemoryInfo(MemoryTracker* tracker) const {
Expand Down Expand Up @@ -204,7 +287,7 @@ void QuicSocket::StopListening() {
void QuicSocket::WaitForPendingCallbacks() {
if (!HasPendingCallbacks()) {
Debug(this, "No pending callbacks, calling ondone immediately");
MakeCallback(env()->ondone_string(), 0, nullptr);
listener_->OnDone();
return;
}
SetFlag(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS);
Expand All @@ -227,11 +310,7 @@ void QuicSocket::OnRecv(

if (nread < 0) {
Debug(this, "Reading data from UDP socket failed. Error %d", nread);
Environment* env = this->env();
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg = Number::New(env->isolate(), static_cast<double>(nread));
MakeCallback(env->quic_on_socket_error_function(), 1, &arg);
listener_->OnError(nread);
return;
}

Expand Down Expand Up @@ -376,11 +455,7 @@ void QuicSocket::RemoveSession(const QuicCID& cid, const sockaddr* addr) {
}

void QuicSocket::ReportSendError(int error) {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
Local<Value> arg = Integer::New(env()->isolate(), error);
MakeCallback(env()->quic_on_socket_error_function(), 1, &arg);
return;
listener_->OnError(error);
}

void QuicSocket::SendInitialConnectionClose(
Expand Down Expand Up @@ -637,13 +712,8 @@ BaseObjectPtr<QuicSession> QuicSocket::AcceptInitialPacket(
server_options_,
initial_connection_close,
qlog_);
Local<Value> arg = session->object();
MakeCallback(env()->quic_on_session_ready_function(), 1, &arg);

// The above MakeCallback will notify the JavaScript side that a new
// server QuicSession has been created in an event emitted on nextTick.
// The user may destroy() the server QuicSession in that event but that
// won't impact the code here.
listener_->OnSessionReady(session);

return session;
}
Expand Down Expand Up @@ -672,11 +742,7 @@ size_t QuicSocket::GetCurrentSocketAddressCounter(const sockaddr* addr) {
void QuicSocket::SetServerBusy(bool on) {
Debug(this, "Turning Server Busy Response %s", on ? "on" : "off");
SetFlag(QUICSOCKET_FLAGS_SERVER_BUSY, on);

HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Local<Value> arg = Boolean::New(env()->isolate(), on);
MakeCallback(env()->quic_on_socket_server_busy_function(), 1, &arg);
listener_->OnServerBusy(on);
}

QuicSocket::SendWrap::SendWrap(
Expand Down Expand Up @@ -830,9 +896,7 @@ void QuicSocket::OnSend(

if (!HasPendingCallbacks() &&
IsFlagSet(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS)) {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
MakeCallback(env()->ondone_string(), 0, nullptr);
listener_->OnDone();
}
}

Expand Down Expand Up @@ -869,6 +933,39 @@ void QuicSocket::DecreaseAllocatedSize(size_t size) {
current_ngtcp2_memory_ -= size;
}

void QuicSocket::PushListener(QuicSocketListener* listener) {
CHECK_NOT_NULL(listener);
CHECK_NULL(listener->socket_);

listener->previous_listener_ = listener_;
listener->socket_ = this;

listener_ = listener;
}

void QuicSocket::RemoveListener(QuicSocketListener* listener) {
CHECK_NOT_NULL(listener);

QuicSocketListener* previous;
QuicSocketListener* current;

for (current = listener_, previous = nullptr;
/* No loop condition because we want a crash if listener is not found */
; previous = current, current = current->previous_listener_) {
CHECK_NOT_NULL(current);
if (current == listener) {
if (previous != nullptr)
previous->previous_listener_ = current->previous_listener_;
else
listener_ = listener->previous_listener_;
break;
}
}

listener->socket_ = nullptr;
listener->previous_listener_ = nullptr;
}

// JavaScript API
namespace {
void NewQuicSocket(const FunctionCallbackInfo<Value>& args) {
Expand Down
41 changes: 41 additions & 0 deletions src/node_quic_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,40 @@ enum QuicSocketOptions : uint32_t {
QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU = 0x2,
};

class QuicSocket;

// This is the generic interface for objects that control QuicSocket
// instances. The default `JSQuicSocketListener` emits events to
// JavaScript
class QuicSocketListener {
public:
virtual ~QuicSocketListener();

virtual void OnError(ssize_t code);
virtual void OnError(int code);
virtual void OnSessionReady(BaseObjectPtr<QuicSession> session);
virtual void OnServerBusy(bool busy);
virtual void OnDone();
virtual void OnDestroy();

QuicSocket* Socket() { return socket_; }

private:
QuicSocket* socket_ = nullptr;
QuicSocketListener* previous_listener_ = nullptr;
friend class QuicSocket;
};

class JSQuicSocketListener : public QuicSocketListener {
public:
void OnError(ssize_t code) override;
void OnError(int code) override;
void OnSessionReady(BaseObjectPtr<QuicSession> session) override;
void OnServerBusy(bool busy) override;
void OnDone() override;
void OnDestroy() override;
};

class QuicSocket : public AsyncWrap,
public UDPListener,
public mem::NgLibMemoryManager<QuicSocket, ngtcp2_mem> {
Expand Down Expand Up @@ -126,6 +160,9 @@ class QuicSocket : public AsyncWrap,
const QuicCID& scid,
const sockaddr* addr);

void PushListener(QuicSocketListener* listener);
void RemoveListener(QuicSocketListener* listener);

private:
static void OnAlloc(
uv_handle_t* handle,
Expand Down Expand Up @@ -237,6 +274,8 @@ class QuicSocket : public AsyncWrap,
double rx_loss_ = 0.0;
double tx_loss_ = 0.0;

QuicSocketListener* listener_;
JSQuicSocketListener default_listener_;
SocketAddress local_address_;
QuicSessionConfig server_session_config_;
QlogMode qlog_ = QlogMode::kDisabled;
Expand Down Expand Up @@ -346,6 +385,8 @@ class QuicSocket : public AsyncWrap,
int Send(const sockaddr* addr,
MallocedBuffer<char>&& data,
const char* diagnostic_label = "unspecified");

friend class QuicSocketListener;
};

} // namespace quic
Expand Down

0 comments on commit 081a37f

Please sign in to comment.