diff --git a/src/http.js b/src/http.js index 7ec3b0f..4510e86 100644 --- a/src/http.js +++ b/src/http.js @@ -136,6 +136,22 @@ class HTTP { throw new HTTPError(response) } + response.ndjson = async function * () { + const it = streamToAsyncIterator(response.body) + + if (!isAsyncIterator(it)) { + throw new Error('Can\'t convert fetch body into a Async Iterator:') + } + + for await (const chunk of ndjson(it)) { + if (options.transform) { + yield options.transform(chunk) + } else { + yield chunk + } + } + } + return response } @@ -183,52 +199,6 @@ class HTTP { options (resource, options = {}) { return this.fetch(resource, merge(this.opts, options, { method: 'OPTIONS' })) } - - /** - * @param {string | URL | Request} resource - * @param {APIOptions} options - * @returns {Promise>} - */ - async stream (resource, options = {}) { - const res = await this.fetch(resource, merge(this.opts, options)) - - return res.body - } - - /** - * @param {string | URL | Request} resource - * @param {APIOptions} options - * @returns {AsyncGenerator} - */ - async * iterator (resource, options = {}) { - const res = await this.fetch(resource, merge(this.opts, options)) - const it = streamToAsyncIterator(res.body) - - if (!isAsyncIterator(it)) { - throw new Error('Can\'t convert fetch body into a Async Iterator:') - } - - for await (const chunk of it) { - yield chunk - } - } - - /** - * @param {string | URL | Request} resource - * @param {APIOptions} options - * @returns {AsyncGenerator} - */ - ndjson (resource, options = {}) { - const source = ndjson(this.iterator(resource, merge(this.opts, options))) - if (options.transform) { - return (async function * () { - for await (const chunk of source) { - yield options.transform(chunk) - } - })() - } - return source - } } /** @@ -309,7 +279,6 @@ const isAsyncIterator = (obj) => { HTTP.HTTPError = HTTPError HTTP.TimeoutError = TimeoutError -HTTP.ndjson = ndjson HTTP.streamToAsyncIterator = streamToAsyncIterator /** diff --git a/test/http.spec.js b/test/http.spec.js index 73c879b..8fc2632 100644 --- a/test/http.spec.js +++ b/test/http.spec.js @@ -7,6 +7,7 @@ const toStream = require('it-to-stream') const delay = require('delay') const AbortController = require('abort-controller') const drain = require('it-drain') +const all = require('it-all') const { isBrowser, isWebWorker } = require('../src/env') describe('http', function () { @@ -27,6 +28,16 @@ describe('http', function () { await expect(res).to.eventually.be.rejectedWith(/aborted/) }) + it('parses the response as ndjson', async function () { + const res = await HTTP.post('http://localhost:3000', { + body: '{}\n{}' + }) + + const entities = await all(res.ndjson()) + + expect(entities).to.deep.equal([{}, {}]) + }) + it.skip('should handle errors in streaming bodies', async function () { if (isBrowser || isWebWorker) { // streaming bodies not supported by browsers