From 7cfa0c4ebbac762b71a5861db6ae3684a7b7cc02 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Sat, 27 May 2023 07:09:57 -0700 Subject: [PATCH] quic: address feedback and fix bugs --- src/quic/application.cc | 3 +++ src/quic/defs.h | 4 +-- src/quic/packet.cc | 60 +++++++++++++++++++++++++---------------- src/quic/session.cc | 18 +++++++++++++ 4 files changed, 60 insertions(+), 25 deletions(-) diff --git a/src/quic/application.cc b/src/quic/application.cc index b949a7fa99cf12..ef40161bb7c54c 100644 --- a/src/quic/application.cc +++ b/src/quic/application.cc @@ -1,3 +1,4 @@ +#include "uv.h" #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC #include "application.h" @@ -243,6 +244,7 @@ void Session::Application::SendPendingData() { } } + packet->Done(UV_ECANCELED); session_->last_error_ = QuicError::ForNgtcp2Error(nwrite); return session_->Close(Session::CloseMethod::SILENT); } @@ -252,6 +254,7 @@ void Session::Application::SendPendingData() { // Since we are closing the session here, we don't worry about updating // the pkt tx time. The failed StreamCommit should have updated the // last_error_ appropriately. + packet->Done(UV_ECANCELED); return session_->Close(Session::CloseMethod::SILENT); } diff --git a/src/quic/defs.h b/src/quic/defs.h index f95aeb34e7a311..65ebe812efa1b7 100644 --- a/src/quic/defs.h +++ b/src/quic/defs.h @@ -9,9 +9,9 @@ namespace node { namespace quic { -#define NGTCP2_ERR(V) (V != 0) -#define NGTCP2_OK(V) (V == 0) #define NGTCP2_SUCCESS 0 +#define NGTCP2_ERR(V) (V != NGTCP2_SUCCESS) +#define NGTCP2_OK(V) (V == NGTCP2_SUCCESS) template bool SetOption(Environment* env, diff --git a/src/quic/packet.cc b/src/quic/packet.cc index 2277110aff56b8..a25bd9e78180bd 100644 --- a/src/quic/packet.cc +++ b/src/quic/packet.cc @@ -199,11 +199,12 @@ int Packet::Send(uv_udp_t* handle, BaseObjectPtr ref) { } void Packet::Done(int status) { - DCHECK_NOT_NULL(listener_); - listener_->PacketDone(status); + if (listener_ != nullptr) { + listener_->PacketDone(status); + } + listener_ = nullptr; handle_.reset(); data_.reset(); - listener_ = nullptr; Reset(); // As a performance optimization, we add this packet to a freelist @@ -261,7 +262,10 @@ BaseObjectPtr Packet::CreateRetryPacket( path_descriptor.dcid, vec.base, vec.len); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -272,13 +276,16 @@ BaseObjectPtr Packet::CreateConnectionClosePacket( const SocketAddress& destination, ngtcp2_conn* conn, const QuicError& error) { - auto packet = Packet::Create( + auto packet = Create( env, listener, destination, kDefaultMaxPacketLength, "connection close"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_conn_write_connection_close( conn, nullptr, nullptr, vec.base, vec.len, error, uv_hrtime()); - if (nwrite < 0) return BaseObjectPtr(); + if (nwrite < 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -288,11 +295,11 @@ BaseObjectPtr Packet::CreateImmediateConnectionClosePacket( Listener* listener, const PathDescriptor& path_descriptor, const QuicError& reason) { - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "immediate connection close (endpoint)"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "immediate connection close (endpoint)"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_crypto_write_connection_close( vec.base, @@ -305,7 +312,10 @@ BaseObjectPtr Packet::CreateImmediateConnectionClosePacket( // there is one in the QuicError nullptr, 0); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -329,16 +339,17 @@ BaseObjectPtr Packet::CreateStatelessResetPacket( uint8_t random[kRandlen]; CHECK(crypto::CSPRNG(random, kRandlen).is_ok()); - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "stateless reset"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "stateless reset"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_pkt_write_stateless_reset( vec.base, pktlen, token, random, kRandlen); if (nwrite <= static_cast(kMinStatelessResetLen)) { + packet->Done(UV_ECANCELED); return BaseObjectPtr(); } @@ -377,11 +388,11 @@ BaseObjectPtr Packet::CreateVersionNegotiationPacket( size_t pktlen = path_descriptor.dcid.length() + path_descriptor.scid.length() + (sizeof(sv)) + 7; - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "version negotiation"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "version negotiation"); ngtcp2_vec vec = *packet; ssize_t nwrite = @@ -394,7 +405,10 @@ BaseObjectPtr Packet::CreateVersionNegotiationPacket( path_descriptor.scid.length(), sv, arraysize(sv)); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } diff --git a/src/quic/session.cc b/src/quic/session.cc index 4754d4cc9960eb..cec2285cf522b0 100644 --- a/src/quic/session.cc +++ b/src/quic/session.cc @@ -467,6 +467,9 @@ Session::Session(BaseObjectPtr endpoint, } Session::~Session() { + if (conn_closebuf_) { + conn_closebuf_->Done(0); + } if (qlog_stream_) { env()->SetImmediate( [ptr = std::move(qlog_stream_)](Environment*) { ptr->End(); }); @@ -721,13 +724,20 @@ bool Session::Receive(Store&& store, } void Session::Send(BaseObjectPtr packet) { + // Sending a Packet is generally best effort. If we're not in a state + // where we can send a packet, it's ok to drop it on the floor. The + // packet loss mechanisms will cause the packet data to be resent later + // if appropriate (and possible). DCHECK(!is_destroyed()); DCHECK(!is_in_draining_period()); if (can_send_packets() && packet->length() > 0) { STAT_INCREMENT_N(Stats, bytes_sent, packet->length()); endpoint_->Send(std::move(packet)); + return; } + + packet->Done(packet->length() > 0 ? UV_ECANCELED : 0); } void Session::Send(BaseObjectPtr packet, const PathStorage& path) { @@ -783,12 +793,14 @@ uint64_t Session::SendDatagram(Store&& data) { uv_hrtime()); if (nwrite < 0) { + // Nothing was written to the packet. switch (nwrite) { case 0: { // We cannot send data because of congestion control or the data will // not fit. Since datagrams are best effort, we are going to abandon // the attempt and just return. CHECK_EQ(accepted, 0); + packet->Done(UV_ECANCELED); return 0; } case NGTCP2_ERR_WRITE_MORE: { @@ -798,20 +810,25 @@ uint64_t Session::SendDatagram(Store&& data) { case NGTCP2_ERR_INVALID_STATE: { // The remote endpoint does not want to accept datagrams. That's ok, // just return 0. + packet->Done(UV_ECANCELED); return 0; } case NGTCP2_ERR_INVALID_ARGUMENT: { // The datagram is too large. That should have been caught above but // that's ok. We'll just abandon the attempt and return. + packet->Done(UV_ECANCELED); return 0; } } + packet->Done(UV_ECANCELED); last_error_ = QuicError::ForNgtcp2Error(nwrite); Close(CloseMethod::SILENT); return 0; } // In this case, a complete packet was written and we need to send it along. + // Note that this doesn't mean that the packet actually contains the datagram! + // We'll check that next by checking the accepted value. packet->Truncate(nwrite); Send(std::move(packet)); ngtcp2_conn_update_pkt_tx_time(*this, uv_hrtime()); @@ -1137,6 +1154,7 @@ void Session::SendConnectionClose() { *this, &path, nullptr, vec.base, vec.len, last_error_, uv_hrtime()); if (UNLIKELY(nwrite < 0)) { + packet->Done(UV_ECANCELED); last_error_ = QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR); Close(CloseMethod::SILENT); } else {