diff --git a/.gitignore b/.gitignore index 77203bd99..3421ead26 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ coverage examples/sub-module/**/bundle.js examples/sub-module/**/*-minified.js examples/sub-module/*-bundle.js + +.vscode/ +.DS_Store diff --git a/src/add/index.js b/src/add/index.js index cd162c2f9..1146bd91b 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const CID = require('cids') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') @@ -33,15 +33,17 @@ module.exports = configure(({ ky }) => { if (options.fileImportConcurrency != null) searchParams.set('file-import-concurrency', options.fileImportConcurrency) if (options.blockWriteConcurrency != null) searchParams.set('block-write-concurrency', options.blockWriteConcurrency) + const formData = await toFormData(input) + const res = await ky.post('add', { timeout: options.timeout, signal: options.signal, headers: options.headers, searchParams, - body: await toFormData(input) + body: formData }) - for await (let file of ndjson(toIterable(res.body))) { + for await (let file of ndjson(toAsyncIterable(res))) { file = toCamel(file) if (options.progress && file.bytes) { diff --git a/src/block/rm.js b/src/block/rm.js index f8fc8c103..c73988775 100644 --- a/src/block/rm.js +++ b/src/block/rm.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * rm (cid, options) { @@ -29,7 +29,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const removed of ndjson(toIterable(res.body))) { + for await (const removed of ndjson(toAsyncIterable(res))) { yield toCoreInterface(removed) } } diff --git a/src/cat.js b/src/cat.js index 3d4971a1c..67fe92561 100644 --- a/src/cat.js +++ b/src/cat.js @@ -3,7 +3,7 @@ const CID = require('cids') const { Buffer } = require('buffer') const configure = require('./lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * cat (path, options) { @@ -27,7 +27,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index 9d8f517e5..52b9eca66 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -5,7 +5,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function findPeer (peerId, options) { @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 3 = QueryError // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 // https://github.com/ipfs/go-ipfs/blob/eb11f569b064b960d1aba4b5b8ca155a3bd2cb21/core/commands/dht.go#L388-L389 diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 7adbaf38c..80793495d 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -4,7 +4,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * findProvs (cid, options) { @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 3 = QueryError // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 // https://github.com/libp2p/go-libp2p-kad-dht/blob/master/routing.go#L525-L526 diff --git a/src/dht/get.js b/src/dht/get.js index 4be7b80c2..4f162a365 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const ndjson = require('iterable-ndjson') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const configure = require('../lib/configure') @@ -23,7 +23,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 3 = QueryError // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 // https://github.com/ipfs/go-ipfs/blob/eb11f569b064b960d1aba4b5b8ca155a3bd2cb21/core/commands/dht.go#L472-L473 diff --git a/src/dht/provide.js b/src/dht/provide.js index f9013bfed..7b9946dd7 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -4,7 +4,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -24,7 +24,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res.body))) { + for await (let message of ndjson(toAsyncIterable(res))) { // 3 = QueryError // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 // https://github.com/ipfs/go-ipfs/blob/eb11f569b064b960d1aba4b5b8ca155a3bd2cb21/core/commands/dht.go#L283-L284 diff --git a/src/dht/put.js b/src/dht/put.js index 6d0ecf6fc..042e46f6c 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -5,7 +5,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const toCamel = require('../lib/object-to-camel') @@ -26,7 +26,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (let message of ndjson(toIterable(res.body))) { + for await (let message of ndjson(toAsyncIterable(res))) { // 3 = QueryError // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 // https://github.com/ipfs/go-ipfs/blob/eb11f569b064b960d1aba4b5b8ca155a3bd2cb21/core/commands/dht.go#L472-L473 diff --git a/src/dht/query.js b/src/dht/query.js index 1628c0cc8..71ac78426 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const multiaddr = require('multiaddr') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('../lib/configure') const toCamel = require('../lib/object-to-camel') @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res.body))) { + for await (let message of ndjson(toAsyncIterable(res))) { message = toCamel(message) message.id = new CID(message.id) message.responses = (message.responses || []).map(({ ID, Addrs }) => ({ diff --git a/src/files/ls.js b/src/files/ls.js index f07c65c09..6a61e6c2a 100644 --- a/src/files/ls.js +++ b/src/files/ls.js @@ -2,7 +2,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('../lib/configure') const toCamelWithMetadata = require('../lib/object-to-camel-with-metadata') @@ -30,7 +30,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const result of ndjson(toIterable(res.body))) { + for await (const result of ndjson(toAsyncIterable(res))) { // go-ipfs does not yet support the "stream" option if ('Entries' in result) { for (const entry of result.Entries || []) { diff --git a/src/files/read.js b/src/files/read.js index 1800609d0..f2ba05af6 100644 --- a/src/files/read.js +++ b/src/files/read.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * read (path, options) { @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/get.js b/src/get.js index 635f8b34f..221a7616d 100644 --- a/src/get.js +++ b/src/get.js @@ -4,7 +4,7 @@ const configure = require('./lib/configure') const Tar = require('it-tar') const { Buffer } = require('buffer') const CID = require('cids') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * get (path, options) { @@ -38,7 +38,7 @@ module.exports = configure(({ ky }) => { const extractor = Tar.extract() - for await (const { header, body } of extractor(toIterable(res.body))) { + for await (const { header, body } of extractor(toAsyncIterable(res))) { if (header.type === 'directory') { yield { path: header.name diff --git a/src/lib/stream-to-async-iterable.js b/src/lib/stream-to-async-iterable.js new file mode 100644 index 000000000..cdd6b36c6 --- /dev/null +++ b/src/lib/stream-to-async-iterable.js @@ -0,0 +1,27 @@ +'use strict' + +const toAsyncIterableOriginal = require('stream-to-it/source') + +// Note: Turned this into a helper that wraps `stream-to-it/source` +// to handle the body undefined case without requiring that other libs +// that consume that package such as `js-ipfs` and `js-ipfs-utils` modify +// how they use it + +module.exports = function toAsyncIterable (res) { + const { body } = res + + // An env where res.body getter for ReadableStream with getReader + // is not supported, for example in React Native + if (!body) { + if (res.arrayBuffer) { + return (async function * () { + const arrayBuffer = await res.arrayBuffer() + yield arrayBuffer + })() + } else { + throw new Error('Neither Response.body nor Response.arrayBuffer is defined') + } + } + + return toAsyncIterableOriginal(body) +} diff --git a/src/log/tail.js b/src/log/tail.js index 74b72b2c2..652741dad 100644 --- a/src/log/tail.js +++ b/src/log/tail.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * tail (options) { @@ -15,6 +15,6 @@ module.exports = configure(({ ky }) => { searchParams: options.searchParams }) - yield * ndjson(toIterable(res.body)) + yield * ndjson(toAsyncIterable(res)) } }) diff --git a/src/ls.js b/src/ls.js index ec7e37dfb..e29038ebd 100644 --- a/src/ls.js +++ b/src/ls.js @@ -3,7 +3,7 @@ const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('./lib/stream-to-async-iterable') const configure = require('./lib/configure') module.exports = configure(({ ky }) => { @@ -25,7 +25,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let result of ndjson(toIterable(res.body))) { + for await (let result of ndjson(toAsyncIterable(res))) { result = result.Objects if (!result) { diff --git a/src/name/resolve.js b/src/name/resolve.js index e7eb20b4f..0eeebf0bb 100644 --- a/src/name/resolve.js +++ b/src/name/resolve.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * (path, options) { @@ -23,7 +23,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const result of ndjson(toIterable(res.body))) { + for await (const result of ndjson(toAsyncIterable(res))) { yield result.Path } } diff --git a/src/pin/ls.js b/src/pin/ls.js index f9e0968ac..b89389477 100644 --- a/src/pin/ls.js +++ b/src/pin/ls.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const CID = require('cids') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * ls (path, options) { @@ -28,7 +28,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const pin of ndjson(toIterable(res.body))) { + for await (const pin of ndjson(toAsyncIterable(res))) { if (pin.Keys) { // non-streaming response for (const cid of Object.keys(pin.Keys)) { yield { cid: new CID(cid), type: pin.Keys[cid].Type } diff --git a/src/ping.js b/src/ping.js index 332120934..d158efee6 100644 --- a/src/ping.js +++ b/src/ping.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('./lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('./lib/stream-to-async-iterable') const toCamel = require('./lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of ndjson(toIterable(res.body))) { + for await (const chunk of ndjson(toAsyncIterable(res))) { yield toCamel(chunk) } } diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 188a91664..ed56b42c2 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -5,7 +5,7 @@ const bs58 = require('bs58') const { Buffer } = require('buffer') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const SubscriptionTracker = require('./subscription-tracker') module.exports = configure((config) => { @@ -49,7 +49,7 @@ module.exports = configure((config) => { clearTimeout(ffWorkaround) - readMessages(ndjson(toIterable(res.body)), { + readMessages(ndjson(toAsyncIterable(res)), { onMessage: handler, onEnd: () => subsTracker.unsubscribe(topic, handler), onError: options.onError diff --git a/src/refs/index.js b/src/refs/index.js index 05a636feb..8f21fbaac 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -4,7 +4,7 @@ const configure = require('../lib/configure') const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = config => { @@ -49,7 +49,7 @@ module.exports = config => { searchParams }) - for await (const file of ndjson(toIterable(res.body))) { + for await (const file of ndjson(toAsyncIterable(res))) { yield toCamel(file) } } diff --git a/src/refs/local.js b/src/refs/local.js index 98e0fce40..da927edea 100644 --- a/src/refs/local.js +++ b/src/refs/local.js @@ -2,7 +2,7 @@ const configure = require('../lib/configure') const ndjson = require('iterable-ndjson') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -15,7 +15,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (const file of ndjson(toIterable(res.body))) { + for await (const file of ndjson(toAsyncIterable(res))) { yield toCamel(file) } } diff --git a/src/repo/gc.js b/src/repo/gc.js index fc60a46bc..abaf4821d 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * gc (peerId, options) { @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const gcResult of ndjson(toIterable(res.body))) { + for await (const gcResult of ndjson(toAsyncIterable(res))) { yield { err: gcResult.Error ? new Error(gcResult.Error) : null, cid: (gcResult.Key || {})['/'] ? new CID(gcResult.Key['/']) : null diff --git a/src/stats/bw.js b/src/stats/bw.js index 12bc6d44a..5e7271ce0 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const Big = require('bignumber.js') const configure = require('../lib/configure') -const toIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * bw (options) { @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const stats of ndjson(toIterable(res.body))) { + for await (const stats of ndjson(toAsyncIterable(res))) { yield { totalIn: new Big(stats.TotalIn), totalOut: new Big(stats.TotalOut), diff --git a/test/lib.stream-to-async-iterable.spec.js b/test/lib.stream-to-async-iterable.spec.js new file mode 100644 index 000000000..2351198d4 --- /dev/null +++ b/test/lib.stream-to-async-iterable.spec.js @@ -0,0 +1,71 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('interface-ipfs-core/src/utils/mocha') +const toAsyncIterable = require('../src/lib/stream-to-async-iterable') + +describe('lib/stream-to-async-iterable', () => { + it('should return input if already async iterable', () => { + const input = { + [Symbol.asyncIterator] () { + return this + } + } + const res = { body: input } + expect(toAsyncIterable(res)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + + const input = { + getReader () { + let i = 0 + return { + read () { + return Promise.resolve( + i === inputData.length + ? { done: true } + : { value: inputData[i++] } + ) + }, + releaseLock () { } + } + } + } + const res = { body: input } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should return an async iterable even if res.body is undefined', async () => { + const inputData = [2] + const res = { + arrayBuffer () { + return Promise.resolve(inputData[0]) + } + } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw if res.body and res.arrayBuffer are undefined', () => { + const res = {} + expect(() => toAsyncIterable(res)).to.throw('Neither Response.body nor Response.arrayBuffer is defined') + }) + + it('should throw on unknown stream', () => { + const res = { body: {} } + expect(() => toAsyncIterable(res)).to.throw('unknown stream') + }) +})