Skip to content

Commit

Permalink
fix: stream body handling (#2391)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Nov 2, 2023
1 parent 3a77cbb commit 45b904c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
24 changes: 5 additions & 19 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1462,23 +1462,7 @@ function _resume (client, sync) {
return
}

if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
errorRequest(client, request, err)
})
.on('end', function () {
util.destroy(this)
})

request.body = null
}

if (client[kRunning] > 0 &&
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
(util.isStream(request.body) || util.isAsyncIterable(request.body))) {
// Request with stream or iterator body can error while other requests
// are inflight and indirectly error those as well.
Expand Down Expand Up @@ -1532,7 +1516,9 @@ function write (client, request) {
body.read(0)
}

let contentLength = util.bodyLength(body)
const bodyLength = util.bodyLength(body)

let contentLength = bodyLength

if (contentLength === null) {
contentLength = request.contentLength
Expand Down Expand Up @@ -1630,7 +1616,7 @@ function write (client, request) {
}

/* istanbul ignore else: assertion */
if (!body) {
if (!body || bodyLength === 0) {
if (contentLength === 0) {
socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
} else {
Expand Down
41 changes: 40 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,28 @@ class Request {

this.method = method

this.abort = null

if (body == null) {
this.body = null
} else if (util.isStream(body)) {
this.body = body

if (!this.body._readableState?.autoDestroy) {
this.endHandler = function autoDestroy () {
util.destroy(this)
}
this.body.on('end', this.endHandler)
}

this.errorHandler = err => {
if (this.abort) {
this.abort(err)
} else {
this.error = err
}
}
this.body.on('error', this.errorHandler)
} else if (util.isBuffer(body)) {
this.body = body.byteLength ? body : null
} else if (ArrayBuffer.isView(body)) {
Expand Down Expand Up @@ -236,7 +254,12 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

return this[kHandler].onConnect(abort)
if (this.error) {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)
}
}

onHeaders (statusCode, headers, resume, statusText) {
Expand Down Expand Up @@ -265,6 +288,8 @@ class Request {
}

onComplete (trailers) {
this.onFinally()

assert(!this.aborted)

this.completed = true
Expand All @@ -275,6 +300,8 @@ class Request {
}

onError (error) {
this.onFinally()

if (channels.error.hasSubscribers) {
channels.error.publish({ request: this, error })
}
Expand All @@ -286,6 +313,18 @@ class Request {
return this[kHandler].onError(error)
}

onFinally () {
if (this.errorHandler) {
this.body.off('error', this.errorHandler)
this.errorHandler = null
}

if (this.endHandler) {
this.body.off('end', this.endHandler)
this.endHandler = null
}
}

// TODO: adjust to support H2
addHeader (key, value) {
processHeader(this, key, value)
Expand Down
2 changes: 1 addition & 1 deletion lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ function isReadableAborted (stream) {
}

function destroy (stream, err) {
if (!isStream(stream) || isDestroyed(stream)) {
if (stream == null || !isStream(stream) || isDestroyed(stream)) {
return
}

Expand Down

0 comments on commit 45b904c

Please sign in to comment.