Skip to content

Commit

Permalink
refactor: unify error body handling (#2060)
Browse files Browse the repository at this point in the history
* refactor: unify error body handling

* fixup

* fixup

* fixup
  • Loading branch information
ronag committed Apr 14, 2023
1 parent 3f1eaf7 commit 98fa282
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 101 deletions.
58 changes: 15 additions & 43 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
const Readable = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError,
ResponseStatusCodeError
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -78,40 +78,39 @@ class RequestHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context, highWaterMark } = this
const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (statusCode < 200) {
if (this.onInfo) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
this.onInfo({ statusCode, headers })
}
return
}

const parsedHeaders = util.parseHeaders(rawHeaders)
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
const body = new Readable({ resume, abort, contentType, highWaterMark })

this.callback = null
this.res = body
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (callback !== null) {
if (this.throwOnError && statusCode >= 400) {
this.runInAsyncScope(getResolveErrorBodyCallback, null,
{ callback, body, contentType, statusCode, statusMessage, headers }
)
return
} else {
this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
trailers: this.trailers,
opaque,
body,
context
})
}

this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
trailers: this.trailers,
opaque,
body,
context
})
}
}

Expand Down Expand Up @@ -158,33 +157,6 @@ class RequestHandler extends AsyncResource {
}
}

async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) {
if (statusCode === 204 || !contentType) {
body.dump()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
return
}

try {
if (contentType.startsWith('application/json')) {
const payload = await body.json()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}

if (contentType.startsWith('text/')) {
const payload = await body.text()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}
} catch (err) {
// Process in a fallback if error
}

body.dump()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
}

function request (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
Expand Down
103 changes: 46 additions & 57 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ const { finished, PassThrough } = require('stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError,
ResponseStatusCodeError
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -79,77 +79,66 @@ class StreamHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { factory, opaque, context, callback } = this
const { factory, opaque, context, callback, responseHeaders } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (statusCode < 200) {
if (this.onInfo) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
this.onInfo({ statusCode, headers })
}
return
}

this.factory = null
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
const res = this.runInAsyncScope(factory, null, {
statusCode,
headers,
opaque,
context
})

if (this.throwOnError && statusCode >= 400) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
const chunks = []
const pt = new PassThrough()
pt
.on('data', (chunk) => chunks.push(chunk))
.on('end', () => {
const payload = Buffer.concat(chunks).toString('utf8')
this.runInAsyncScope(
callback,
null,
new ResponseStatusCodeError(
`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`,
statusCode,
headers,
payload
)
)
})
.on('error', (err) => {
this.onError(err)
})
this.res = pt
return
}
let res

if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}
if (this.throwOnError && statusCode >= 400) {
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
res = new PassThrough()

res.on('drain', resume)
// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this
this.callback = null
this.runInAsyncScope(getResolveErrorBodyCallback, null,
{ callback, body: res, contentType, statusCode, statusMessage, headers }
)
} else {
res = this.runInAsyncScope(factory, null, {
statusCode,
headers,
opaque,
context
})

this.res = null
if (err || !res.readable) {
util.destroy(res, err)
if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this

if (err) {
abort()
}
})
this.res = null
if (err || !res.readable) {
util.destroy(res, err)
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
abort()
}
})
}

res.on('drain', resume)

this.res = res

Expand Down
46 changes: 46 additions & 0 deletions lib/api/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const assert = require('assert')
const {
ResponseStatusCodeError
} = require('../core/errors')
const { toUSVString } = require('../core/util')

async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) {
assert(body)

let chunks = []
let limit = 0

for await (const chunk of body) {
chunks.push(chunk)
limit += chunk.length
if (limit > 128 * 1024) {
chunks = null
break
}
}

if (statusCode === 204 || !contentType || !chunks) {
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
return
}

try {
if (contentType.startsWith('application/json')) {
const payload = JSON.parse(toUSVString(Buffer.concat(chunks)))
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}

if (contentType.startsWith('text/')) {
const payload = toUSVString(Buffer.concat(chunks))
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}
} catch (err) {
// Process in a fallback if error
}

process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
}

module.exports = { getResolveErrorBodyCallback }
2 changes: 1 addition & 1 deletion test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ test('stream legacy needDrain', (t) => {
})
})

test('steam throwOnError', (t) => {
test('stream throwOnError', (t) => {
t.plan(2)

const errStatusCode = 500
Expand Down

0 comments on commit 98fa282

Please sign in to comment.