From 081a37f3a16db2e6c4dbac8852dd8cb25a81f3cb Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 2 Dec 2019 16:20:14 -0800 Subject: [PATCH] quic: add QuicSocketListener Fixes: https://github.com/nodejs/quic/issues/208 PR-URL: https://github.com/nodejs/quic/pull/207 Reviewed-By: https://github.com/nodejs/quic/pull/207 --- src/node_quic_socket.cc | 147 +++++++++++++++++++++++++++++++++------- src/node_quic_socket.h | 41 +++++++++++ 2 files changed, 163 insertions(+), 25 deletions(-) diff --git a/src/node_quic_socket.cc b/src/node_quic_socket.cc index d4677fd9ef..10d3275ee0 100644 --- a/src/node_quic_socket.cc +++ b/src/node_quic_socket.cc @@ -66,6 +66,83 @@ inline uint32_t GenerateReservedVersion( } } // namespace +QuicSocketListener::~QuicSocketListener() { + if (socket_ != nullptr) + socket_->RemoveListener(this); +} + +void QuicSocketListener::OnError(ssize_t code) { + if (previous_listener_ != nullptr) + previous_listener_->OnError(code); +} + +void QuicSocketListener::OnError(int code) { + if (previous_listener_ != nullptr) + previous_listener_->OnError(code); +} + +void QuicSocketListener::OnSessionReady(BaseObjectPtr session) { + if (previous_listener_ != nullptr) + previous_listener_->OnSessionReady(session); +} + +void QuicSocketListener::OnServerBusy(bool busy) { + if (previous_listener_ != nullptr) + previous_listener_->OnServerBusy(busy); +} + +void QuicSocketListener::OnDone() { + if (previous_listener_ != nullptr) + previous_listener_->OnDone(); +} + +void QuicSocketListener::OnDestroy() { + if (previous_listener_ != nullptr) + previous_listener_->OnDestroy(); +} + +void JSQuicSocketListener::OnError(ssize_t code) { + Environment* env = Socket()->env(); + HandleScope scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local arg = Number::New(env->isolate(), static_cast(code)); + Socket()->MakeCallback(env->quic_on_socket_error_function(), 1, &arg); +} + +void JSQuicSocketListener::OnError(int code) { + Environment* env = Socket()->env(); + HandleScope scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local arg = Integer::New(env->isolate(), code); + Socket()->MakeCallback(env->quic_on_socket_error_function(), 1, &arg); +} + +void JSQuicSocketListener::OnSessionReady(BaseObjectPtr session) { + Environment* env = Socket()->env(); + Local arg = session->object(); + Context::Scope context_scope(env->context()); + Socket()->MakeCallback(env->quic_on_session_ready_function(), 1, &arg); +} + +void JSQuicSocketListener::OnServerBusy(bool busy) { + Environment* env = Socket()->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + Local arg = Boolean::New(env->isolate(), busy); + Socket()->MakeCallback(env->quic_on_socket_server_busy_function(), 1, &arg); +} + +void JSQuicSocketListener::OnDone() { + Environment* env = Socket()->env(); + HandleScope scope(env->isolate()); + Context::Scope context_scope(env->context()); + Socket()->MakeCallback(env->ondone_string(), 0, nullptr); +} + +void JSQuicSocketListener::OnDestroy() { + // Do nothing here. +} + QuicSocket::QuicSocket( Environment* env, Local wrap, @@ -86,6 +163,7 @@ QuicSocket::QuicSocket( sizeof(socket_stats_) / sizeof(uint64_t), reinterpret_cast(&socket_stats_)) { MakeWeak(); + PushListener(&default_listener_); udp_ = static_cast( udp_base_wrap->GetAlignedPointerFromInternalField( @@ -130,6 +208,11 @@ QuicSocket::~QuicSocket() { socket_stats_.packets_ignored, socket_stats_.server_sessions, socket_stats_.client_sessions); + QuicSocketListener* listener = listener_; + listener_->OnDestroy(); + // Remove the listener if it didn't remove itself already. + if (listener == listener_) + RemoveListener(listener_); } void QuicSocket::MemoryInfo(MemoryTracker* tracker) const { @@ -204,7 +287,7 @@ void QuicSocket::StopListening() { void QuicSocket::WaitForPendingCallbacks() { if (!HasPendingCallbacks()) { Debug(this, "No pending callbacks, calling ondone immediately"); - MakeCallback(env()->ondone_string(), 0, nullptr); + listener_->OnDone(); return; } SetFlag(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS); @@ -227,11 +310,7 @@ void QuicSocket::OnRecv( if (nread < 0) { Debug(this, "Reading data from UDP socket failed. Error %d", nread); - Environment* env = this->env(); - HandleScope scope(env->isolate()); - Context::Scope context_scope(env->context()); - Local arg = Number::New(env->isolate(), static_cast(nread)); - MakeCallback(env->quic_on_socket_error_function(), 1, &arg); + listener_->OnError(nread); return; } @@ -376,11 +455,7 @@ void QuicSocket::RemoveSession(const QuicCID& cid, const sockaddr* addr) { } void QuicSocket::ReportSendError(int error) { - HandleScope scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - Local arg = Integer::New(env()->isolate(), error); - MakeCallback(env()->quic_on_socket_error_function(), 1, &arg); - return; + listener_->OnError(error); } void QuicSocket::SendInitialConnectionClose( @@ -637,13 +712,8 @@ BaseObjectPtr QuicSocket::AcceptInitialPacket( server_options_, initial_connection_close, qlog_); - Local arg = session->object(); - MakeCallback(env()->quic_on_session_ready_function(), 1, &arg); - // The above MakeCallback will notify the JavaScript side that a new - // server QuicSession has been created in an event emitted on nextTick. - // The user may destroy() the server QuicSession in that event but that - // won't impact the code here. + listener_->OnSessionReady(session); return session; } @@ -672,11 +742,7 @@ size_t QuicSocket::GetCurrentSocketAddressCounter(const sockaddr* addr) { void QuicSocket::SetServerBusy(bool on) { Debug(this, "Turning Server Busy Response %s", on ? "on" : "off"); SetFlag(QUICSOCKET_FLAGS_SERVER_BUSY, on); - - HandleScope handle_scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - Local arg = Boolean::New(env()->isolate(), on); - MakeCallback(env()->quic_on_socket_server_busy_function(), 1, &arg); + listener_->OnServerBusy(on); } QuicSocket::SendWrap::SendWrap( @@ -830,9 +896,7 @@ void QuicSocket::OnSend( if (!HasPendingCallbacks() && IsFlagSet(QUICSOCKET_FLAGS_WAITING_FOR_CALLBACKS)) { - HandleScope handle_scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - MakeCallback(env()->ondone_string(), 0, nullptr); + listener_->OnDone(); } } @@ -869,6 +933,39 @@ void QuicSocket::DecreaseAllocatedSize(size_t size) { current_ngtcp2_memory_ -= size; } +void QuicSocket::PushListener(QuicSocketListener* listener) { + CHECK_NOT_NULL(listener); + CHECK_NULL(listener->socket_); + + listener->previous_listener_ = listener_; + listener->socket_ = this; + + listener_ = listener; +} + +void QuicSocket::RemoveListener(QuicSocketListener* listener) { + CHECK_NOT_NULL(listener); + + QuicSocketListener* previous; + QuicSocketListener* current; + + for (current = listener_, previous = nullptr; + /* No loop condition because we want a crash if listener is not found */ + ; previous = current, current = current->previous_listener_) { + CHECK_NOT_NULL(current); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->socket_ = nullptr; + listener->previous_listener_ = nullptr; +} + // JavaScript API namespace { void NewQuicSocket(const FunctionCallbackInfo& args) { diff --git a/src/node_quic_socket.h b/src/node_quic_socket.h index 3fd92cec2b..ad40174c3c 100644 --- a/src/node_quic_socket.h +++ b/src/node_quic_socket.h @@ -44,6 +44,40 @@ enum QuicSocketOptions : uint32_t { QUICSOCKET_OPTIONS_VALIDATE_ADDRESS_LRU = 0x2, }; +class QuicSocket; + +// This is the generic interface for objects that control QuicSocket +// instances. The default `JSQuicSocketListener` emits events to +// JavaScript +class QuicSocketListener { + public: + virtual ~QuicSocketListener(); + + virtual void OnError(ssize_t code); + virtual void OnError(int code); + virtual void OnSessionReady(BaseObjectPtr session); + virtual void OnServerBusy(bool busy); + virtual void OnDone(); + virtual void OnDestroy(); + + QuicSocket* Socket() { return socket_; } + + private: + QuicSocket* socket_ = nullptr; + QuicSocketListener* previous_listener_ = nullptr; + friend class QuicSocket; +}; + +class JSQuicSocketListener : public QuicSocketListener { + public: + void OnError(ssize_t code) override; + void OnError(int code) override; + void OnSessionReady(BaseObjectPtr session) override; + void OnServerBusy(bool busy) override; + void OnDone() override; + void OnDestroy() override; +}; + class QuicSocket : public AsyncWrap, public UDPListener, public mem::NgLibMemoryManager { @@ -126,6 +160,9 @@ class QuicSocket : public AsyncWrap, const QuicCID& scid, const sockaddr* addr); + void PushListener(QuicSocketListener* listener); + void RemoveListener(QuicSocketListener* listener); + private: static void OnAlloc( uv_handle_t* handle, @@ -237,6 +274,8 @@ class QuicSocket : public AsyncWrap, double rx_loss_ = 0.0; double tx_loss_ = 0.0; + QuicSocketListener* listener_; + JSQuicSocketListener default_listener_; SocketAddress local_address_; QuicSessionConfig server_session_config_; QlogMode qlog_ = QlogMode::kDisabled; @@ -346,6 +385,8 @@ class QuicSocket : public AsyncWrap, int Send(const sockaddr* addr, MallocedBuffer&& data, const char* diagnostic_label = "unspecified"); + + friend class QuicSocketListener; }; } // namespace quic