diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 3c029dbaf55f80..5568aa1df4d423 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -154,6 +154,7 @@ const STREAM_FLAGS_CLOSED = 0x2; const STREAM_FLAGS_HEADERS_SENT = 0x4; const STREAM_FLAGS_HEAD_REQUEST = 0x8; const STREAM_FLAGS_ABORTED = 0x10; +const STREAM_FLAGS_HAS_TRAILERS = 0x20; const SESSION_FLAGS_PENDING = 0x0; const SESSION_FLAGS_READY = 0x1; @@ -278,27 +279,14 @@ function onStreamClose(code) { if (stream.destroyed) return; - const state = stream[kState]; - debug(`Http2Stream ${stream[kID]} [Http2Session ` + `${sessionName(stream[kSession][kType])}]: closed with code ${code}`); - if (!stream.closed) { - // Unenroll from timeouts - unenroll(stream); - stream.removeAllListeners('timeout'); - - // Set the state flags - state.flags |= STREAM_FLAGS_CLOSED; - state.rstCode = code; - - // Close the writable side of the stream - abort(stream); - stream.end(); - } + if (!stream.closed) + closeStream(stream, code, false); - if (state.fd !== undefined) - tryClose(state.fd); + if (this[kState].fd !== undefined) + tryClose(this[kState].fd); // Defer destroy we actually emit end. if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { @@ -454,7 +442,7 @@ function requestOnConnect(headers, options) { // At this point, the stream should have already been destroyed during // the session.destroy() method. Do nothing else. - if (session.destroyed) + if (session === undefined || session.destroyed) return; // If the session was closed while waiting for the connect, destroy @@ -1369,6 +1357,9 @@ class ClientHttp2Session extends Http2Session { if (options.endStream) stream.end(); + if (options.waitForTrailers) + stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + const onConnect = requestOnConnect.bind(stream, headersList, options); if (this.connecting) { this.on('connect', onConnect); @@ -1425,8 +1416,11 @@ function afterDoStreamWrite(status, handle, req) { } function streamOnResume() { - if (!this.destroyed && !this.pending) + if (!this.destroyed && !this.pending) { + if (!this[kState].didRead) + this[kState].didRead = true; this[kHandle].readStart(); + } } function streamOnPause() { @@ -1434,16 +1428,6 @@ function streamOnPause() { this[kHandle].readStop(); } -// If the writable side of the Http2Stream is still open, emit the -// 'aborted' event and set the aborted flag. -function abort(stream) { - if (!stream.aborted && - !(stream._writableState.ended || stream._writableState.ending)) { - stream[kState].flags |= STREAM_FLAGS_ABORTED; - stream.emit('aborted'); - } -} - function afterShutdown() { this.callback(); const stream = this.handle[kOwner]; @@ -1451,6 +1435,51 @@ function afterShutdown() { stream[kMaybeDestroy](); } +function closeStream(stream, code, shouldSubmitRstStream = true) { + const state = stream[kState]; + state.flags |= STREAM_FLAGS_CLOSED; + state.rstCode = code; + + // Clear timeout and remove timeout listeners + stream.setTimeout(0); + stream.removeAllListeners('timeout'); + + const { ending, finished } = stream._writableState; + + if (!ending) { + // If the writable side of the Http2Stream is still open, emit the + // 'aborted' event and set the aborted flag. + if (!stream.aborted) { + state.flags |= STREAM_FLAGS_ABORTED; + stream.emit('aborted'); + } + + // Close the writable side. + stream.end(); + } + + if (shouldSubmitRstStream) { + const finishFn = finishCloseStream.bind(stream, code); + if (!ending || finished || code !== NGHTTP2_NO_ERROR) + finishFn(); + else + stream.once('finish', finishFn); + } +} + +function finishCloseStream(code) { + const rstStreamFn = submitRstStream.bind(this, code); + // If the handle has not yet been assigned, queue up the request to + // ensure that the RST_STREAM frame is sent after the stream ID has + // been determined. + if (this.pending) { + this.push(null); + this.once('ready', rstStreamFn); + return; + } + rstStreamFn(); +} + // An Http2Stream is a Duplex stream that is backed by a // node::http2::Http2Stream handle implementing StreamBase. class Http2Stream extends Duplex { @@ -1468,6 +1497,7 @@ class Http2Stream extends Duplex { session[kState].pendingStreams.add(this); this[kState] = { + didRead: false, flags: STREAM_FLAGS_PENDING, rstCode: NGHTTP2_NO_ERROR, writeQueueSize: 0, @@ -1749,6 +1779,8 @@ class Http2Stream extends Duplex { throw headersList; this[kSentTrailers] = headers; + this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + const ret = this[kHandle].trailers(headersList); if (ret < 0) this.destroy(new NghttpError(ret)); @@ -1779,38 +1811,13 @@ class Http2Stream extends Duplex { if (callback !== undefined && typeof callback !== 'function') throw new errors.TypeError('ERR_INVALID_CALLBACK'); - // Unenroll the timeout. - unenroll(this); - this.removeAllListeners('timeout'); - - // Close the writable - abort(this); - this.end(); - if (this.closed) return; - const state = this[kState]; - state.flags |= STREAM_FLAGS_CLOSED; - state.rstCode = code; - - if (callback !== undefined) { + if (callback !== undefined) this.once('close', callback); - } - if (this[kHandle] === undefined) - return; - - const rstStreamFn = submitRstStream.bind(this, code); - // If the handle has not yet been assigned, queue up the request to - // ensure that the RST_STREAM frame is sent after the stream ID has - // been determined. - if (this.pending) { - this.push(null); - this.once('ready', rstStreamFn); - return; - } - rstStreamFn(); + closeStream(this, code); } // Called by this.destroy(). @@ -1825,24 +1832,19 @@ class Http2Stream extends Duplex { debug(`Http2Stream ${this[kID] || ''} [Http2Session ` + `${sessionName(session[kType])}]: destroying stream`); const state = this[kState]; - const code = state.rstCode = - err != null ? - NGHTTP2_INTERNAL_ERROR : - state.rstCode || NGHTTP2_NO_ERROR; - if (handle !== undefined) { - // If the handle exists, we need to close, then destroy the handle - this.close(code); - if (!this._readableState.ended && !this._readableState.ending) - this.push(null); + const code = err != null ? + NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR); + + const hasHandle = handle !== undefined; + + if (!this.closed) + closeStream(this, code, hasHandle); + this.push(null); + + if (hasHandle) { handle.destroy(); session[kState].streams.delete(id); } else { - unenroll(this); - this.removeAllListeners('timeout'); - state.flags |= STREAM_FLAGS_CLOSED; - abort(this); - this.end(); - this.push(null); session[kState].pendingStreams.delete(this); } @@ -1878,13 +1880,23 @@ class Http2Stream extends Duplex { } // TODO(mcollina): remove usage of _*State properties - if (this._readableState.ended && - this._writableState.ended && - this._writableState.pendingcb === 0 && - this.closed) { - this.destroy(); - // This should return, but eslint complains. - // return + if (this._writableState.ended && this._writableState.pendingcb === 0) { + if (this._readableState.ended && this.closed) { + this.destroy(); + return; + } + + // We've submitted a response from our server session, have not attempted + // to process any incoming data, and have no trailers. This means we can + // attempt to gracefully close the session. + const state = this[kState]; + if (this.headersSent && + this[kSession][kType] === NGHTTP2_SESSION_SERVER && + !(state.flags & STREAM_FLAGS_HAS_TRAILERS) && + !state.didRead && + !this._readableState.resumeScheduled) { + this.close(); + } } } } @@ -2045,7 +2057,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { } if (this.destroyed || this.closed) { tryClose(fd); - abort(this); return; } state.fd = fd; @@ -2181,8 +2192,10 @@ class ServerHttp2Stream extends Http2Stream { if (options.endStream) streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + state.flags |= STREAM_FLAGS_HAS_TRAILERS; + } headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; @@ -2248,8 +2261,10 @@ class ServerHttp2Stream extends Http2Stream { } let streamOptions = 0; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } if (typeof fd !== 'number') throw new errors.TypeError('ERR_INVALID_ARG_TYPE', @@ -2315,8 +2330,10 @@ class ServerHttp2Stream extends Http2Stream { } let streamOptions = 0; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } const session = this[kSession]; debug(`Http2Stream ${this[kID]} [Http2Session ` + diff --git a/src/node_http2.cc b/src/node_http2.cc index 163ec8ab723c00..2744c8bb43df51 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1391,16 +1391,35 @@ void Http2Session::MaybeScheduleWrite() { // storage for data and metadata that was associated with these writes. void Http2Session::ClearOutgoing(int status) { CHECK_NE(flags_ & SESSION_STATE_SENDING, 0); - flags_ &= ~SESSION_STATE_SENDING; - for (const nghttp2_stream_write& wr : outgoing_buffers_) { - WriteWrap* wrap = wr.req_wrap; - if (wrap != nullptr) - wrap->Done(status); + if (outgoing_buffers_.size() > 0) { + outgoing_storage_.clear(); + + for (const nghttp2_stream_write& wr : outgoing_buffers_) { + WriteWrap* wrap = wr.req_wrap; + if (wrap != nullptr) + wrap->Done(status); + } + + outgoing_buffers_.clear(); } - outgoing_buffers_.clear(); - outgoing_storage_.clear(); + flags_ &= ~SESSION_STATE_SENDING; + + // Now that we've finished sending queued data, if there are any pending + // RstStreams we should try sending again and then flush them one by one. + if (pending_rst_streams_.size() > 0) { + std::vector current_pending_rst_streams; + pending_rst_streams_.swap(current_pending_rst_streams); + + SendPendingData(); + + for (int32_t stream_id : current_pending_rst_streams) { + Http2Stream* stream = FindStream(stream_id); + if (stream != nullptr) + stream->FlushRstStream(); + } + } } // Queue a given block of data for sending. This always creates a copy, @@ -1424,18 +1443,19 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) { // chunk out to the i/o socket to be sent. This is a particularly hot method // that will generally be called at least twice be event loop iteration. // This is a potential performance optimization target later. -void Http2Session::SendPendingData() { +// Returns non-zero value if a write is already in progress. +uint8_t Http2Session::SendPendingData() { DEBUG_HTTP2SESSION(this, "sending pending data"); // Do not attempt to send data on the socket if the destroying flag has // been set. That means everything is shutting down and the socket // will not be usable. if (IsDestroyed()) - return; + return 0; flags_ &= ~SESSION_STATE_WRITE_SCHEDULED; // SendPendingData should not be called recursively. if (flags_ & SESSION_STATE_SENDING) - return; + return 1; // This is cleared by ClearOutgoing(). flags_ |= SESSION_STATE_SENDING; @@ -1459,15 +1479,15 @@ void Http2Session::SendPendingData() { // does take care of things like closing the individual streams after // a socket has been torn down, so we still need to call it. ClearOutgoing(UV_ECANCELED); - return; + return 0; } // Part Two: Pass Data to the underlying stream size_t count = outgoing_buffers_.size(); if (count == 0) { - flags_ &= ~SESSION_STATE_SENDING; - return; + ClearOutgoing(0); + return 0; } MaybeStackBuffer bufs; bufs.AllocateSufficientStorage(count); @@ -1497,7 +1517,7 @@ void Http2Session::SendPendingData() { if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) { // All writes finished synchronously, nothing more to do here. ClearOutgoing(0); - return; + return 0; } WriteWrap* req = AllocateSend(); @@ -1507,6 +1527,8 @@ void Http2Session::SendPendingData() { DEBUG_HTTP2SESSION2(this, "wants data in return? %d", nghttp2_session_want_read(session_)); + + return 0; } @@ -1939,12 +1961,25 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, // peer. inline void Http2Stream::SubmitRstStream(const uint32_t code) { CHECK(!this->IsDestroyed()); + code_ = code; + // If possible, force a purge of any currently pending data here to make sure + // it is sent before closing the stream. If it returns non-zero then we need + // to wait until the current write finishes and try again to avoid nghttp2 + // behaviour where it prioritizes RstStream over everything else. + if (session_->SendPendingData() != 0) { + session_->AddPendingRstStream(id_); + return; + } + + FlushRstStream(); +} + +void Http2Stream::FlushRstStream() { + if (IsDestroyed()) + return; Http2Scope h2scope(this); - // Force a purge of any currently pending data here to make sure - // it is sent before closing the stream. - session_->SendPendingData(); CHECK_EQ(nghttp2_submit_rst_stream(**session_, NGHTTP2_FLAG_NONE, - id_, code), 0); + id_, code_), 0); } diff --git a/src/node_http2.h b/src/node_http2.h index 1ea7142774d000..972c08a2cd98ba 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -592,6 +592,8 @@ class Http2Stream : public AsyncWrap, // Submits an RST_STREAM frame using the given code inline void SubmitRstStream(const uint32_t code); + void FlushRstStream(); + // Submits a PUSH_PROMISE frame with this stream as the parent. inline Http2Stream* SubmitPushPromise( nghttp2_nv* nva, @@ -815,7 +817,7 @@ class Http2Session : public AsyncWrap { bool Ping(v8::Local function); - inline void SendPendingData(); + inline uint8_t SendPendingData(); // Submits a new request. If the request is a success, assigned // will be a pointer to the Http2Stream instance assigned. @@ -869,6 +871,11 @@ class Http2Session : public AsyncWrap { return stream_buf_; } + // Schedule an RstStream for after the current write finishes. + inline void AddPendingRstStream(int32_t stream_id) { + pending_rst_streams_.emplace_back(stream_id); + } + static void OnStreamAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx); @@ -1085,6 +1092,7 @@ class Http2Session : public AsyncWrap { std::vector outgoing_buffers_; std::vector outgoing_storage_; + std::vector pending_rst_streams_; void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length); void ClearOutgoing(int status); diff --git a/test/fixtures/person-large.jpg b/test/fixtures/person-large.jpg new file mode 100644 index 00000000000000..3d0d0af42375c3 Binary files /dev/null and b/test/fixtures/person-large.jpg differ diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index eab413e2327d8f..6238363511a791 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -125,3 +125,21 @@ const Countdown = require('../common/countdown'); req.on('error', () => {}); })); } + +// test destroy before connect +{ + const server = h2.createServer(); + server.on('stream', common.mustNotCall()); + + server.listen(0, common.mustCall(() => { + const client = h2.connect(`http://localhost:${server.address().port}`); + + server.on('connection', common.mustCall(() => { + server.close(); + client.close(); + })); + + const req = client.request(); + req.destroy(); + })); +} diff --git a/test/parallel/test-http2-client-rststream-before-connect.js b/test/parallel/test-http2-client-rststream-before-connect.js index 72374611b49349..59cc7f104c17fc 100644 --- a/test/parallel/test-http2-client-rststream-before-connect.js +++ b/test/parallel/test-http2-client-rststream-before-connect.js @@ -62,8 +62,14 @@ server.listen(0, common.mustCall(() => { message: 'Stream closed with error code NGHTTP2_PROTOCOL_ERROR' })); - req.on('response', common.mustCall()); - req.resume(); + // The `response` event should not fire as the server should receive the + // RST_STREAM frame before it ever has a chance to reply. + req.on('response', common.mustNotCall()); + + // The `end` event should still fire as we close the readable stream by + // pushing a `null` chunk. req.on('end', common.mustCall()); + + req.resume(); req.end(); })); diff --git a/test/parallel/test-http2-client-upload-reject.js b/test/parallel/test-http2-client-upload-reject.js new file mode 100644 index 00000000000000..ece7cbdf233f1f --- /dev/null +++ b/test/parallel/test-http2-client-upload-reject.js @@ -0,0 +1,48 @@ +'use strict'; + +// Verifies that uploading data from a client works + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const fs = require('fs'); +const fixtures = require('../common/fixtures'); + +const loc = fixtures.path('person-large.jpg'); + +assert(fs.existsSync(loc)); + +fs.readFile(loc, common.mustCall((err, data) => { + assert.ifError(err); + + const server = http2.createServer(); + + server.on('stream', common.mustCall((stream) => { + stream.on('close', common.mustCall(() => { + assert.strictEqual(stream.rstCode, 0); + })); + + stream.respond({ ':status': 400 }); + stream.end(); + })); + + server.listen(0, common.mustCall(() => { + const client = http2.connect(`http://localhost:${server.address().port}`); + + const req = client.request({ ':method': 'POST' }); + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 400); + })); + + req.resume(); + req.on('end', common.mustCall(() => { + server.close(); + client.close(); + })); + + const str = fs.createReadStream(loc); + str.pipe(req); + })); +})); diff --git a/test/parallel/test-http2-client-upload.js b/test/parallel/test-http2-client-upload.js index 70a8ff3ced01c6..78c6d47cbb4f44 100644 --- a/test/parallel/test-http2-client-upload.js +++ b/test/parallel/test-http2-client-upload.js @@ -11,7 +11,7 @@ const fs = require('fs'); const fixtures = require('../common/fixtures'); const Countdown = require('../common/countdown'); -const loc = fixtures.path('person.jpg'); +const loc = fixtures.path('person-large.jpg'); let fileData; assert(fs.existsSync(loc)); diff --git a/test/parallel/test-http2-large-write-close.js b/test/parallel/test-http2-large-write-close.js new file mode 100644 index 00000000000000..f9dee357d6da7b --- /dev/null +++ b/test/parallel/test-http2-large-write-close.js @@ -0,0 +1,44 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const http2 = require('http2'); + +const content = Buffer.alloc(1e5, 0x44); + +const server = http2.createSecureServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}); +server.on('stream', common.mustCall((stream) => { + stream.respond({ + 'Content-Type': 'application/octet-stream', + 'Content-Length': (content.length.toString() * 2), + 'Vary': 'Accept-Encoding' + }); + + stream.write(content); + stream.write(content); + stream.end(); + stream.close(); +})); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`https://localhost:${server.address().port}`, + { rejectUnauthorized: false }); + + const req = client.request({ ':path': '/' }); + req.end(); + + let receivedBufferLength = 0; + req.on('data', common.mustCallAtLeast((buf) => { + receivedBufferLength += buf.length; + }, 1)); + req.on('close', common.mustCall(() => { + assert.strictEqual(receivedBufferLength, content.length * 2); + client.close(); + server.close(); + })); +})); diff --git a/test/parallel/test-http2-perf_hooks.js b/test/parallel/test-http2-perf_hooks.js index e30d0ac83e0d1f..b06e6efa2b6727 100644 --- a/test/parallel/test-http2-perf_hooks.js +++ b/test/parallel/test-http2-perf_hooks.js @@ -26,7 +26,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => { switch (entry.type) { case 'server': assert.strictEqual(entry.streamCount, 1); - assert.strictEqual(entry.framesReceived, 5); + assert(entry.framesReceived >= 3); break; case 'client': assert.strictEqual(entry.streamCount, 1);