Skip to content

Commit

Permalink
feat: return body as consumable body
Browse files Browse the repository at this point in the history
Change undici.request to return a Body object with
various consume helpers instead of regular node stream.

This aligns with recent work on quic and web streams in
node core.
  • Loading branch information
ronag committed Jul 21, 2021
1 parent 05cea88 commit 6913094
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 45 deletions.
92 changes: 87 additions & 5 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ const {
InvalidArgumentError,
RequestAbortedError
} = require('../core/errors')
const { Blob } = require('buffer')
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

const kAbort = Symbol('abort')
const kResume = Symbol('resume')
const kError = Symbol('error')
const kDestroy = Symbol('destroy')
const kPush = Symbol('push')
const kBody = Symbol('body')

class RequestResponse extends Readable {
class RequestNodeReadable extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
Expand All @@ -30,6 +36,82 @@ class RequestResponse extends Readable {
}
}

class RequestResponse {
constructor (resume, abort) {
this[kAbort] = abort
this[kResume] = resume
this[kBody] = null
this[kError] = undefined
}

[kPush] (chunk) {
if (!this[kBody]) {
this[kBody] = this.readableNodeStream()
}
this[kBody].push(chunk)
}

[kDestroy] (err) {
if (this[kBody]) {
util.destroy(this[kBody], err)
} else {
this[kError] = err
}
}

readableNodeStream () {
if (this[kBody]) {
throw new Error('consumed') // TODO: Proper error.
}
this[kBody] = new RequestNodeReadable(this[kResume], this[kAbort])
if (this[kError] !== undefined) {
util.destroy(this[kBody], this[kError])
}
return this[kBody]
}

readableWebStream () {
// TODO: Optimize.
return this.readableNodeStream().toWeb()
}

async blob () {
// TODO: Optimize.
const sources = []
for await (const chunk of this) {
sources.push(chunk)
}
return new Blob(sources)
}

async arrayBuffer () {
// TODO: Optimize.
const blob = await this.blob()
return await blob.arrayBuffer()
}

* [Symbol.asyncIterator] () {
// TODO: Optimize.
yield * this.readableNodeStream()
}

async text () {
// TODO: Optimize.
// TODO: Validate content-type req & res headers?
let ret = ''
for await (const chunk of this) {
ret += chunk
}
return JSON.parse(ret)
}

async json () {
// TODO: Optimize.
// TODO: Validate content-type req & res headers?
return JSON.parse(await this.text())
}
}

class RequestHandler extends AsyncResource {
constructor (opts, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -109,7 +191,7 @@ class RequestHandler extends AsyncResource {

onData (chunk) {
const { res } = this
return res.push(chunk)
return res[kPush](null)
}

onComplete (trailers) {
Expand All @@ -119,7 +201,7 @@ class RequestHandler extends AsyncResource {

util.parseHeaders(trailers, this.trailers)

res.push(null)
res[kPush](null)
}

onError (err) {
Expand All @@ -139,13 +221,13 @@ class RequestHandler extends AsyncResource {
this.res = null
// Ensure all queued handlers are invoked before destroying res.
queueMicrotask(() => {
util.destroy(res, err)
res[kDestroy](err)
})
}

if (body) {
this.body = null
util.destroy(body, err)
util.destroy(this.body, err)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions test/abort-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ for (const { AbortControllerImpl, controllerName } of controllers) {
client.request({ path: '/', method: 'GET' }, (err, response) => {
t.error(err)
const bufs = []
response.body.on('data', (buf) => {
const stream = response.body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
response.body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -136,10 +137,11 @@ for (const { AbortControllerImpl, controllerName } of controllers) {

client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => {
t.error(err)
response.body.on('data', () => {
const stream = response.body.readableNodeStream()
stream.on('data', () => {
abortController.abort()
})
response.body.on('error', err => {
stream.on('error', err => {
t.type(err, errors.RequestAbortedError)
})
})
Expand Down
11 changes: 7 additions & 4 deletions test/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,11 @@ test('with globalAgent', t => {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal(wanted, Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -315,10 +316,12 @@ test('with local agent', t => {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {

const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal(wanted, Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down
36 changes: 21 additions & 15 deletions test/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,31 @@ test('async hooks', (t) => {

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
const stream = body.readableNodeStream()
stream.resume()
t.strictSame(getCurrentTransaction(), null)

setCurrentTransaction({ hello: 'world2' })

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.strictSame(getCurrentTransaction(), { hello: 'world2' })

body.once('data', () => {
const stream = body.readableNodeStream()
stream.once('data', () => {
t.pass()
body.resume()
stream.resume()
})

body.on('end', () => {
stream.on('end', () => {
t.pass()
})
})
})

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
const stream = body.readableNodeStream()
stream.resume()
t.strictSame(getCurrentTransaction(), null)

setCurrentTransaction({ hello: 'world' })
Expand All @@ -88,20 +90,22 @@ test('async hooks', (t) => {
t.error(err)
t.strictSame(getCurrentTransaction(), { hello: 'world' })

body.once('data', () => {
const stream = body.readableNodeStream()
stream.once('data', () => {
t.pass()
body.resume()
stream.resume()
})

body.on('end', () => {
stream.on('end', () => {
t.pass()
})
})
})

client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
const stream = body.readableNodeStream()
stream.resume()
t.strictSame(getCurrentTransaction(), null)

setCurrentTransaction({ hello: 'world' })
Expand All @@ -110,12 +114,13 @@ test('async hooks', (t) => {
t.error(err)
t.strictSame(getCurrentTransaction(), { hello: 'world' })

body.once('data', () => {
const stream = body.readableNodeStream()
stream.once('data', () => {
t.pass()
body.resume()
stream.resume()
})

body.on('end', () => {
stream.on('end', () => {
t.pass()
})
})
Expand Down Expand Up @@ -159,8 +164,9 @@ test('async hooks client is destroyed', (t) => {

client.request({ path: '/', method: 'GET' }, (err, { body }) => {
t.error(err)
body.resume()
body.on('error', (err) => {
const stream = body.readableNodeStream()
stream.resume()
stream.on('error', (err) => {
t.ok(err)
})
t.strictSame(getCurrentTransaction(), null)
Expand Down
3 changes: 2 additions & 1 deletion test/client-abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ test('aborted response errors', (t) => {

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.destroy()
const stream = body.readableNodeStream()
stream.destroy()
body
.on('error', err => {
t.type(err, errors.RequestAbortedError)
Expand Down
25 changes: 15 additions & 10 deletions test/client-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ test('GET errors and reconnect with pipelining 1', (t) => {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -100,10 +101,11 @@ test('GET errors and reconnect with pipelining 3', (t) => {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -169,10 +171,11 @@ function errorAndPipelining (type) {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -238,10 +241,11 @@ function errorAndChunkedEncodingPipelining (type) {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down Expand Up @@ -704,8 +708,9 @@ test('GET errors body', (t) => {

client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
body.resume()
body.on('error', err => (
const stream = body.readableNodeStream()
stream.resume()
stream.on('error', err => (
t.ok(err)
))
})
Expand Down
5 changes: 3 additions & 2 deletions test/https.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ test('https get with tls opts', (t) => {
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
const stream = body.readableNodeStream()
stream.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
stream.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
Expand Down
5 changes: 3 additions & 2 deletions test/issue-803.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ test('https://github.com/nodejs/undici/issues/803', (t) => {
t.error(err)

let pos = 0
data.body.on('data', (buf) => {
const stream = data.body.readableNodeStream()
stream.on('data', (buf) => {
pos += buf.length
})
data.body.on('end', () => {
stream.on('end', () => {
t.equal(pos, SIZE)
})
})
Expand Down
Loading

0 comments on commit 6913094

Please sign in to comment.