Skip to content

Commit

Permalink
http2: fix responses to long payload reqs
Browse files Browse the repository at this point in the history
When a request with a long payload is received, http2 does
not allow a response that does not process all the incoming
payload. Add a conditional Http2Stream.close call that runs
only if the user hasn't attempted to read the stream.

PR-URL: nodejs#20084
Fixes: nodejs#20060
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
apapirovski authored and kjin committed Aug 23, 2018
1 parent 0d4ec20 commit eb18c26
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 105 deletions.
181 changes: 99 additions & 82 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ const STREAM_FLAGS_CLOSED = 0x2;
const STREAM_FLAGS_HEADERS_SENT = 0x4;
const STREAM_FLAGS_HEAD_REQUEST = 0x8;
const STREAM_FLAGS_ABORTED = 0x10;
const STREAM_FLAGS_HAS_TRAILERS = 0x20;

const SESSION_FLAGS_PENDING = 0x0;
const SESSION_FLAGS_READY = 0x1;
Expand Down Expand Up @@ -278,27 +279,14 @@ function onStreamClose(code) {
if (stream.destroyed)
return;

const state = stream[kState];

debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);

if (!stream.closed) {
// Unenroll from timeouts
unenroll(stream);
stream.removeAllListeners('timeout');

// Set the state flags
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

// Close the writable side of the stream
abort(stream);
stream.end();
}
if (!stream.closed)
closeStream(stream, code, false);

if (state.fd !== undefined)
tryClose(state.fd);
if (this[kState].fd !== undefined)
tryClose(this[kState].fd);

// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
Expand Down Expand Up @@ -454,7 +442,7 @@ function requestOnConnect(headers, options) {

// At this point, the stream should have already been destroyed during
// the session.destroy() method. Do nothing else.
if (session.destroyed)
if (session === undefined || session.destroyed)
return;

// If the session was closed while waiting for the connect, destroy
Expand Down Expand Up @@ -1369,6 +1357,9 @@ class ClientHttp2Session extends Http2Session {
if (options.endStream)
stream.end();

if (options.waitForTrailers)
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;

const onConnect = requestOnConnect.bind(stream, headersList, options);
if (this.connecting) {
this.on('connect', onConnect);
Expand Down Expand Up @@ -1425,32 +1416,70 @@ function afterDoStreamWrite(status, handle, req) {
}

function streamOnResume() {
if (!this.destroyed && !this.pending)
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
this[kHandle].readStart();
}
}

function streamOnPause() {
if (!this.destroyed && !this.pending)
this[kHandle].readStop();
}

// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
function abort(stream) {
if (!stream.aborted &&
!(stream._writableState.ended || stream._writableState.ending)) {
stream[kState].flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}
}

function afterShutdown() {
this.callback();
const stream = this.handle[kOwner];
if (stream)
stream[kMaybeDestroy]();
}

function closeStream(stream, code, shouldSubmitRstStream = true) {
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');

const { ending, finished } = stream._writableState;

if (!ending) {
// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
if (!stream.aborted) {
state.flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}

// Close the writable side.
stream.end();
}

if (shouldSubmitRstStream) {
const finishFn = finishCloseStream.bind(stream, code);
if (!ending || finished || code !== NGHTTP2_NO_ERROR)
finishFn();
else
stream.once('finish', finishFn);
}
}

function finishCloseStream(code) {
const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
}

// An Http2Stream is a Duplex stream that is backed by a
// node::http2::Http2Stream handle implementing StreamBase.
class Http2Stream extends Duplex {
Expand All @@ -1468,6 +1497,7 @@ class Http2Stream extends Duplex {
session[kState].pendingStreams.add(this);

this[kState] = {
didRead: false,
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0,
Expand Down Expand Up @@ -1749,6 +1779,8 @@ class Http2Stream extends Duplex {
throw headersList;
this[kSentTrailers] = headers;

this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;

const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
Expand Down Expand Up @@ -1779,38 +1811,13 @@ class Http2Stream extends Duplex {
if (callback !== undefined && typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');

// Unenroll the timeout.
unenroll(this);
this.removeAllListeners('timeout');

// Close the writable
abort(this);
this.end();

if (this.closed)
return;

const state = this[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;

if (callback !== undefined) {
if (callback !== undefined)
this.once('close', callback);
}

if (this[kHandle] === undefined)
return;

const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
closeStream(this, code);
}

// Called by this.destroy().
Expand All @@ -1825,24 +1832,19 @@ class Http2Stream extends Duplex {
debug(`Http2Stream ${this[kID] || '<pending>'} [Http2Session ` +
`${sessionName(session[kType])}]: destroying stream`);
const state = this[kState];
const code = state.rstCode =
err != null ?
NGHTTP2_INTERNAL_ERROR :
state.rstCode || NGHTTP2_NO_ERROR;
if (handle !== undefined) {
// If the handle exists, we need to close, then destroy the handle
this.close(code);
if (!this._readableState.ended && !this._readableState.ending)
this.push(null);
const code = err != null ?
NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR);

const hasHandle = handle !== undefined;

if (!this.closed)
closeStream(this, code, hasHandle);
this.push(null);

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
} else {
unenroll(this);
this.removeAllListeners('timeout');
state.flags |= STREAM_FLAGS_CLOSED;
abort(this);
this.end();
this.push(null);
session[kState].pendingStreams.delete(this);
}

Expand Down Expand Up @@ -1878,13 +1880,23 @@ class Http2Stream extends Duplex {
}

// TODO(mcollina): remove usage of _*State properties
if (this._readableState.ended &&
this._writableState.ended &&
this._writableState.pendingcb === 0 &&
this.closed) {
this.destroy();
// This should return, but eslint complains.
// return
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
this.destroy();
return;
}

// We've submitted a response from our server session, have not attempted
// to process any incoming data, and have no trailers. This means we can
// attempt to gracefully close the session.
const state = this[kState];
if (this.headersSent &&
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.close();
}
}
}
}
Expand Down Expand Up @@ -2045,7 +2057,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
}
if (this.destroyed || this.closed) {
tryClose(fd);
abort(this);
return;
}
state.fd = fd;
Expand Down Expand Up @@ -2181,8 +2192,10 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;

if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.flags |= STREAM_FLAGS_HAS_TRAILERS;
}

headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
Expand Down Expand Up @@ -2248,8 +2261,10 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}

if (typeof fd !== 'number')
throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
Expand Down Expand Up @@ -2315,8 +2330,10 @@ class ServerHttp2Stream extends Http2Stream {
}

let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}

const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +
Expand Down
Loading

0 comments on commit eb18c26

Please sign in to comment.