From 45b904c3cbac0340e9373b7a0b9a49297b12fd35 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 2 Nov 2023 18:55:27 +0100 Subject: [PATCH] fix: stream body handling (#2391) --- lib/client.js | 24 +++++------------------- lib/core/request.js | 41 ++++++++++++++++++++++++++++++++++++++++- lib/core/util.js | 2 +- 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/lib/client.js b/lib/client.js index 0ac23f3ee99..968a7f92071 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1462,23 +1462,7 @@ function _resume (client, sync) { return } - if (util.isStream(request.body) && util.bodyLength(request.body) === 0) { - request.body - .on('data', /* istanbul ignore next */ function () { - /* istanbul ignore next */ - assert(false) - }) - .on('error', function (err) { - errorRequest(client, request, err) - }) - .on('end', function () { - util.destroy(this) - }) - - request.body = null - } - - if (client[kRunning] > 0 && + if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && (util.isStream(request.body) || util.isAsyncIterable(request.body))) { // Request with stream or iterator body can error while other requests // are inflight and indirectly error those as well. @@ -1532,7 +1516,9 @@ function write (client, request) { body.read(0) } - let contentLength = util.bodyLength(body) + const bodyLength = util.bodyLength(body) + + let contentLength = bodyLength if (contentLength === null) { contentLength = request.contentLength @@ -1630,7 +1616,7 @@ function write (client, request) { } /* istanbul ignore else: assertion */ - if (!body) { + if (!body || bodyLength === 0) { if (contentLength === 0) { socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') } else { diff --git a/lib/core/request.js b/lib/core/request.js index 43973884f1a..1b938bd5747 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -112,10 +112,28 @@ class Request { this.method = method + this.abort = null + if (body == null) { this.body = null } else if (util.isStream(body)) { this.body = body + + if (!this.body._readableState?.autoDestroy) { + this.endHandler = function autoDestroy () { + util.destroy(this) + } + this.body.on('end', this.endHandler) + } + + this.errorHandler = err => { + if (this.abort) { + this.abort(err) + } else { + this.error = err + } + } + this.body.on('error', this.errorHandler) } else if (util.isBuffer(body)) { this.body = body.byteLength ? body : null } else if (ArrayBuffer.isView(body)) { @@ -236,7 +254,12 @@ class Request { assert(!this.aborted) assert(!this.completed) - return this[kHandler].onConnect(abort) + if (this.error) { + abort(this.error) + } else { + this.abort = abort + return this[kHandler].onConnect(abort) + } } onHeaders (statusCode, headers, resume, statusText) { @@ -265,6 +288,8 @@ class Request { } onComplete (trailers) { + this.onFinally() + assert(!this.aborted) this.completed = true @@ -275,6 +300,8 @@ class Request { } onError (error) { + this.onFinally() + if (channels.error.hasSubscribers) { channels.error.publish({ request: this, error }) } @@ -286,6 +313,18 @@ class Request { return this[kHandler].onError(error) } + onFinally () { + if (this.errorHandler) { + this.body.off('error', this.errorHandler) + this.errorHandler = null + } + + if (this.endHandler) { + this.body.off('end', this.endHandler) + this.endHandler = null + } + } + // TODO: adjust to support H2 addHeader (key, value) { processHeader(this, key, value) diff --git a/lib/core/util.js b/lib/core/util.js index 769811f57f7..f2ead039363 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -190,7 +190,7 @@ function isReadableAborted (stream) { } function destroy (stream, err) { - if (!isStream(stream) || isDestroyed(stream)) { + if (stream == null || !isStream(stream) || isDestroyed(stream)) { return }