Skip to content

Commit

Permalink
feat: return body as consumable body
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 23, 2021
1 parent 02a9d13 commit 344bae6
Showing 1 changed file with 218 additions and 3 deletions.
221 changes: 218 additions & 3 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,31 @@ const {
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
const assert = require('assert')

let Blob
let ReadableStream

const kAbort = Symbol('abort')
const kPush = Symbol('push')
const kDestroy = Symbol('destroy')
const kBody = Symbol('body')
const kResume = Symbol('resume')

const kStreamType = 1
const kWebStreamType = 2
const kTextType = 3
const kBlobType = 5
const kArrayBufferType = 6
const kJSONType = 7
const kMaxType = 8

class RequestResponse extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kResume] = resume
this[kAbort] = abort
this[kBody] = null
}

_destroy (err, callback) {
Expand All @@ -28,6 +46,77 @@ class RequestResponse extends Readable {

callback(err)
}

[kPush] (val) {
if (this[kBody]) {
return this[kBody].push(val)
} else {
return this.push(val)
}
}

[kDestroy] (err) {
assert(err)

if (!this[kBody]) {
// TODO (fix): Not sure this aligns with web body mixin.
throw err
}

this[kBody].destroy(err)
}

destroy (err) {
if (!this[kBody] || this[kBody].type !== kStreamType) {
consumeType(this, kStreamType)
}
return Readable.prototype.destroy.call(this, err)
}

read (n) {
if (!this[kBody] || this[kBody].type !== kStreamType) {
consumeType(this, kStreamType)
}
return Readable.prototype.read.call(this, n)
}

on (ev, fn) {
if (!this[kBody] || this[kBody].type !== kStreamType) {
consumeType(this, kStreamType)
}
return Readable.prototype.on.call(this, ev, fn)
}

addListener (ev, fn) {
if (!this[kBody] || this[kBody].type !== kStreamType) {
consumeType(this, kStreamType)
}
return Readable.prototype.addListener.call(this, ev, fn)
}

get bodyUsed () {
return !!this[kBody]
}

get body () {
return consumeType(this, kWebStreamType)
}

text () {
return consumeType(this, kTextType)
}

json () {
return consumeType(this, kJSONType)
}

blob () {
return consumeType(this, kBlobType)
}

arrayBuffer () {
return consumeType(this, kArrayBufferType)
}
}

class RequestHandler extends AsyncResource {
Expand Down Expand Up @@ -109,7 +198,7 @@ class RequestHandler extends AsyncResource {

onData (chunk) {
const { res } = this
return res.push(chunk)
return res[kPush](chunk)
}

onComplete (trailers) {
Expand All @@ -119,7 +208,7 @@ class RequestHandler extends AsyncResource {

util.parseHeaders(trailers, this.trailers)

res.push(null)
res[kPush](null)
}

onError (err) {
Expand All @@ -139,7 +228,7 @@ class RequestHandler extends AsyncResource {
this.res = null
// Ensure all queued handlers are invoked before destroying res.
queueMicrotask(() => {
util.destroy(res, err)
res[kDestroy](err)
})
}

Expand Down Expand Up @@ -170,4 +259,130 @@ function request (opts, callback) {
}
}

function start (self) {
const state = self._readableState
while (state.buffer.length) {
self[kBody].push(state.buffer.shift())
}
if (state.ended) {
self[kBody].push(null)
}

self[kResume]()
}

function consumeType (self, type) {
assert(type > 0 && type < kMaxType)

if (self[kBody]) {
throw new TypeError('disturbed')
}

if (type === kStreamType) {
self[kBody] = {
type: kStreamType,
push: self.push.bind(self),
destroy: util.destroy.bind(null, self)
}
return self
}

if (type === kWebStreamType) {
if (!ReadableStream) {
ReadableStream = require('stream/web').ReadableStream
}

self[kBody] = {
type,
buffer: self,
controller: null,
push (val) {
if (!this.controller) {
this.buffer.push(val)
return false
}

if (!val) {
this.controller.close()
} else {
// TODO: Do we need to detach from Buffer pool?
// Buffers come from net/tls socket. Not sure if those
// are attached to pool or not.
// val = new Uint8Array(val)
this.controller.enqueue(val)
}

return this.controller.desiredSize > 0
},
destroy (err) {
this.controller.error(err)
}
}

return new ReadableStream({
start (c) {
self[kBody].controller = c
start(self)
},

pull () {
self[kResume]()
},

cancel (reason) {
self[kAbort]()
}
}, { highWaterMark: 16 * 1024 })
}

return new Promise((resolve, reject) => {
self[kBody] = {
type,
body: null,
resolve,
reject,
destroy (err) {
this.reject(err)
},
push (val) {
try {
if (this.type === kTextType || this.type === kJSONType) {
if (!this.body) {
this.body = ''
}

if (val) {
this.body += val
} else if (this.type === kTextType) {
this.resolve(this.body)
} else if (this.type === kJSONType) {
this.resolve(JSON.parse(this.body))
}
} else {
if (!this.body) {
this.body = []
}

if (val) {
this.body.push(val)
} else if (this.type === kArrayBufferType) {
this.resolve(Buffer.concat(this.body).buffer)
} else if (this.type === kBlobType) {
if (!Blob) {
Blob = require('buffer').Blob
}
this.resolve(new Blob(this.body))
}
}
return true
} catch (err) {
this.reject(err)
}
}
}

start(self)
})
}

module.exports = request

0 comments on commit 344bae6

Please sign in to comment.