From f0d401617bb7dd5b23dbc6f389d9988b73abdb75 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Tue, 21 Jan 2020 21:45:22 -0600 Subject: [PATCH 01/12] Handling case where res.body is undefined --- src/add/index.js | 2 +- src/block/rm-async-iterator.js | 2 +- src/cat.js | 2 +- src/dht/find-peer.js | 2 +- src/dht/find-provs.js | 2 +- src/dht/get.js | 2 +- src/dht/provide.js | 2 +- src/dht/put.js | 2 +- src/dht/query.js | 2 +- src/files/ls.js | 2 +- src/files/read.js | 2 +- src/get.js | 2 +- src/lib/stream-to-iterable.js | 13 ++++++++++++- src/log/tail.js | 2 +- src/object/data.js | 2 +- src/ping.js | 2 +- src/pubsub/subscribe.js | 2 +- src/refs/index.js | 2 +- src/refs/local.js | 2 +- src/repo/gc.js | 2 +- src/stats/bw.js | 2 +- 21 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/add/index.js b/src/add/index.js index 546e5428d..157a595e6 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -40,7 +40,7 @@ module.exports = configure(({ ky }) => { body: await toFormData(input) }) - for await (let file of ndjson(toIterable(res.body))) { + for await (let file of ndjson(toIterable(res))) { file = toCamel(file) // console.log(file) if (options.progress && file.bytes) { diff --git a/src/block/rm-async-iterator.js b/src/block/rm-async-iterator.js index e44aedcd2..e2ad496f7 100644 --- a/src/block/rm-async-iterator.js +++ b/src/block/rm-async-iterator.js @@ -30,7 +30,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const removed of ndjson(toIterable(res.body))) { + for await (const removed of ndjson(toIterable(res))) { yield toCamel(removed) } } diff --git a/src/cat.js b/src/cat.js index 32bccf59e..03104f7cc 100644 --- a/src/cat.js +++ b/src/cat.js @@ -27,7 +27,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index 71779952c..a61e61dd8 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toIterable(res))) { // 2 = FinalPeer // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 if (message.Type === 2 && message.Responses) { diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index a0fc35722..b0f3e8306 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -23,7 +23,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toIterable(res))) { // 4 = Provider // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 if (message.Type === 4 && message.Responses) { diff --git a/src/dht/get.js b/src/dht/get.js index d2cd0db22..68544b692 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toIterable(res))) { // 5 = Value // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21 if (message.Type === 5) { diff --git a/src/dht/provide.js b/src/dht/provide.js index cb72f9c6d..a11ac096a 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -25,7 +25,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res.body))) { + for await (let message of ndjson(toIterable(res))) { message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/put.js b/src/dht/put.js index 49a6947aa..3443e5189 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -26,7 +26,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (let message of ndjson(toIterable(res.body))) { + for await (let message of ndjson(toIterable(res))) { message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/query.js b/src/dht/query.js index 5aefaf90d..03d340b89 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -21,7 +21,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res.body))) { + for await (const message of ndjson(toIterable(res))) { yield new PeerInfo(PeerId.createFromB58String(message.ID)) } } diff --git a/src/files/ls.js b/src/files/ls.js index 51ee33912..8b4371bb8 100644 --- a/src/files/ls.js +++ b/src/files/ls.js @@ -28,7 +28,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const result of ndjson(toIterable(res.body))) { + for await (const result of ndjson(toIterable(res))) { // go-ipfs does not yet support the "stream" option if ('Entries' in result) { for (const entry of result.Entries || []) { diff --git a/src/files/read.js b/src/files/read.js index 5a6a14acb..4c20afb8c 100644 --- a/src/files/read.js +++ b/src/files/read.js @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/get.js b/src/get.js index 1a5ebfd93..45e67c08b 100644 --- a/src/get.js +++ b/src/get.js @@ -38,7 +38,7 @@ module.exports = configure(({ ky }) => { const extractor = Tar.extract() - for await (const { header, body } of extractor(toIterable(res.body))) { + for await (const { header, body } of extractor(toIterable(res))) { if (header.type === 'directory') { yield { path: header.name diff --git a/src/lib/stream-to-iterable.js b/src/lib/stream-to-iterable.js index 5e06a99c6..7366ccdd6 100644 --- a/src/lib/stream-to-iterable.js +++ b/src/lib/stream-to-iterable.js @@ -1,6 +1,17 @@ 'use strict' -module.exports = function toIterable (body) { +module.exports = function toIterable (res) { + // An env where res.body getter for ReadableStream with getReader + // is not supported, for example in React Native + if (!res.body) { + return (async function * () { + const arrayBuffer = await res.arrayBuffer() + yield arrayBuffer + })() + } + + const { body } = res + // Node.js stream if (body[Symbol.asyncIterator]) return body diff --git a/src/log/tail.js b/src/log/tail.js index 00708e9b9..dcf88eac7 100644 --- a/src/log/tail.js +++ b/src/log/tail.js @@ -15,6 +15,6 @@ module.exports = configure(({ ky }) => { searchParams: options.searchParams }) - yield * ndjson(toIterable(res.body)) + yield * ndjson(toIterable(res)) } }) diff --git a/src/object/data.js b/src/object/data.js index 48291f722..5f5b1d0d3 100644 --- a/src/object/data.js +++ b/src/object/data.js @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/ping.js b/src/ping.js index 33b275617..d547bf69e 100644 --- a/src/ping.js +++ b/src/ping.js @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of ndjson(toIterable(res.body))) { + for await (const chunk of ndjson(toIterable(res))) { yield toCamel(chunk) } } diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 7950a274a..934f6b51b 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -50,7 +50,7 @@ module.exports = configure((config) => { clearTimeout(ffWorkaround) - readMessages(ndjson(toIterable(res.body)), { + readMessages(ndjson(toIterable(res)), { onMessage: handler, onEnd: () => subsTracker.unsubscribe(topic, handler), onError: options.onError diff --git a/src/refs/index.js b/src/refs/index.js index b15f2cd0c..df4bb17e7 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -49,7 +49,7 @@ module.exports = config => { searchParams }) - for await (const file of ndjson(toIterable(res.body))) { + for await (const file of ndjson(toIterable(res))) { yield toCamel(file) } } diff --git a/src/refs/local.js b/src/refs/local.js index afa1630ea..c92fcf24a 100644 --- a/src/refs/local.js +++ b/src/refs/local.js @@ -15,7 +15,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (const file of ndjson(toIterable(res.body))) { + for await (const file of ndjson(toIterable(res))) { yield toCamel(file) } } diff --git a/src/repo/gc.js b/src/repo/gc.js index 3d92dee4c..4e1b75951 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const gcResult of ndjson(toIterable(res.body))) { + for await (const gcResult of ndjson(toIterable(res))) { yield { err: gcResult.Error ? new Error(gcResult.Error) : null, cid: (gcResult.Key || {})['/'] ? new CID(gcResult.Key['/']) : null diff --git a/src/stats/bw.js b/src/stats/bw.js index f68ad23ba..6986d6c6b 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const stats of ndjson(toIterable(res.body))) { + for await (const stats of ndjson(toIterable(res))) { yield { totalIn: new Big(stats.TotalIn), totalOut: new Big(stats.TotalOut), From 37b7d276a6e0c1cb67144d1dd3d2e28e934f8bab Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Tue, 21 Jan 2020 23:16:16 -0600 Subject: [PATCH 02/12] Fix tests, add test, rename to async iterable, fix addFromURL --- src/add-from-url.js | 8 +-- src/add/index.js | 33 +++++++--- src/block/rm-async-iterator.js | 4 +- src/cat.js | 4 +- src/dht/find-peer.js | 4 +- src/dht/find-provs.js | 6 +- src/dht/get.js | 4 +- src/dht/provide.js | 6 +- src/dht/put.js | 12 ++-- src/dht/query.js | 4 +- src/files/ls.js | 4 +- src/files/read.js | 4 +- src/get.js | 4 +- ...terable.js => stream-to-async-iterable.js} | 18 ++--- src/log/tail.js | 4 +- src/object/data.js | 4 +- src/ping.js | 4 +- src/pubsub/subscribe.js | 15 +++-- src/refs/index.js | 13 ++-- src/refs/local.js | 4 +- src/repo/gc.js | 6 +- src/stats/bw.js | 4 +- test/lib.stream-to-async-iterable.spec.js | 66 +++++++++++++++++++ test/lib.stream-to-iterable.spec.js | 40 ----------- 24 files changed, 164 insertions(+), 111 deletions(-) rename src/lib/{stream-to-iterable.js => stream-to-async-iterable.js} (73%) create mode 100644 test/lib.stream-to-async-iterable.spec.js delete mode 100644 test/lib.stream-to-iterable.spec.js diff --git a/src/add-from-url.js b/src/add-from-url.js index deb3f4bad..b3a1ab570 100644 --- a/src/add-from-url.js +++ b/src/add-from-url.js @@ -1,19 +1,19 @@ 'use strict' const kyDefault = require('ky-universal').default -const toIterable = require('./lib/stream-to-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') -module.exports = (config) => { +module.exports = config => { const add = require('./add')(config) return async function * addFromURL (url, options) { options = options || {} - const { body } = await kyDefault.get(url) + const res = await kyDefault.get(url) const input = { path: decodeURIComponent(new URL(url).pathname.split('/').pop() || ''), - content: toIterable(body) + content: toAsyncIterable(res) } yield * add(input, options) diff --git a/src/add/index.js b/src/add/index.js index 157a595e6..83d0af9a7 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') @@ -16,21 +16,36 @@ module.exports = configure(({ ky }) => { if (options.chunker) searchParams.set('chunker', options.chunker) if (options.cidVersion) searchParams.set('cid-version', options.cidVersion) if (options.cidBase) searchParams.set('cid-base', options.cidBase) - if (options.enableShardingExperiment != null) searchParams.set('enable-sharding-experiment', options.enableShardingExperiment) + if (options.enableShardingExperiment != null) { + searchParams.set( + 'enable-sharding-experiment', + options.enableShardingExperiment + ) + } if (options.hashAlg) searchParams.set('hash', options.hashAlg) - if (options.onlyHash != null) searchParams.set('only-hash', options.onlyHash) + if (options.onlyHash != null) { searchParams.set('only-hash', options.onlyHash) } if (options.pin != null) searchParams.set('pin', options.pin) if (options.progress) searchParams.set('progress', true) if (options.quiet != null) searchParams.set('quiet', options.quiet) if (options.quieter != null) searchParams.set('quieter', options.quieter) - if (options.rawLeaves != null) searchParams.set('raw-leaves', options.rawLeaves) - if (options.shardSplitThreshold) searchParams.set('shard-split-threshold', options.shardSplitThreshold) + if (options.rawLeaves != null) { searchParams.set('raw-leaves', options.rawLeaves) } + if (options.shardSplitThreshold) { searchParams.set('shard-split-threshold', options.shardSplitThreshold) } if (options.silent) searchParams.set('silent', options.silent) if (options.trickle != null) searchParams.set('trickle', options.trickle) - if (options.wrapWithDirectory != null) searchParams.set('wrap-with-directory', options.wrapWithDirectory) + if (options.wrapWithDirectory != null) { searchParams.set('wrap-with-directory', options.wrapWithDirectory) } if (options.preload != null) searchParams.set('preload', options.preload) - if (options.fileImportConcurrency != null) searchParams.set('file-import-concurrency', options.fileImportConcurrency) - if (options.blockWriteConcurrency != null) searchParams.set('block-write-concurrency', options.blockWriteConcurrency) + if (options.fileImportConcurrency != null) { + searchParams.set( + 'file-import-concurrency', + options.fileImportConcurrency + ) + } + if (options.blockWriteConcurrency != null) { + searchParams.set( + 'block-write-concurrency', + options.blockWriteConcurrency + ) + } const res = await ky.post('add', { timeout: options.timeout, @@ -40,7 +55,7 @@ module.exports = configure(({ ky }) => { body: await toFormData(input) }) - for await (let file of ndjson(toIterable(res))) { + for await (let file of ndjson(toAsyncIterable(res))) { file = toCamel(file) // console.log(file) if (options.progress && file.bytes) { diff --git a/src/block/rm-async-iterator.js b/src/block/rm-async-iterator.js index e2ad496f7..f1b25eb45 100644 --- a/src/block/rm-async-iterator.js +++ b/src/block/rm-async-iterator.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -30,7 +30,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const removed of ndjson(toIterable(res))) { + for await (const removed of ndjson(toAsyncIterable(res))) { yield toCamel(removed) } } diff --git a/src/cat.js b/src/cat.js index 03104f7cc..67fe92561 100644 --- a/src/cat.js +++ b/src/cat.js @@ -3,7 +3,7 @@ const CID = require('cids') const { Buffer } = require('buffer') const configure = require('./lib/configure') -const toIterable = require('./lib/stream-to-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * cat (path, options) { @@ -27,7 +27,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index a61e61dd8..7b41f59a9 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -5,7 +5,7 @@ const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * findPeer (peerId, options) { @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 2 = FinalPeer // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18 if (message.Type === 2 && message.Responses) { diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index b0f3e8306..4816df18c 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -5,7 +5,7 @@ const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * findProvs (cid, options) { @@ -13,7 +13,7 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) searchParams.set('arg', `${cid}`) - if (options.numProviders) searchParams.set('num-providers', options.numProviders) + if (options.numProviders) { searchParams.set('num-providers', options.numProviders) } if (options.verbose != null) searchParams.set('verbose', options.verbose) const res = await ky.post('dht/findprovs', { @@ -23,7 +23,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 4 = Provider // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20 if (message.Type === 4 && message.Responses) { diff --git a/src/dht/get.js b/src/dht/get.js index 68544b692..1c4003013 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * get (key, options) { @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res))) { + for await (const message of ndjson(toAsyncIterable(res))) { // 5 = Value // https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21 if (message.Type === 5) { diff --git a/src/dht/provide.js b/src/dht/provide.js index a11ac096a..f88e10753 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -5,7 +5,7 @@ const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -15,7 +15,7 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) cids.forEach(cid => searchParams.append('arg', `${cid}`)) - if (options.recursive != null) searchParams.set('recursive', options.recursive) + if (options.recursive != null) { searchParams.set('recursive', options.recursive) } if (options.verbose != null) searchParams.set('verbose', options.verbose) const res = await ky.post('dht/provide', { @@ -25,7 +25,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let message of ndjson(toIterable(res))) { + for await (let message of ndjson(toAsyncIterable(res))) { message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/put.js b/src/dht/put.js index 3443e5189..be14b4d27 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -5,7 +5,7 @@ const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const toCamel = require('../lib/object-to-camel') @@ -16,8 +16,12 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) if (options.verbose != null) searchParams.set('verbose', options.verbose) - key = Buffer.isBuffer(key) ? encodeBufferURIComponent(key) : encodeURIComponent(key) - value = Buffer.isBuffer(value) ? encodeBufferURIComponent(value) : encodeURIComponent(value) + key = Buffer.isBuffer(key) + ? encodeBufferURIComponent(key) + : encodeURIComponent(key) + value = Buffer.isBuffer(value) + ? encodeBufferURIComponent(value) + : encodeURIComponent(value) const url = `dht/put?arg=${key}&arg=${value}&${searchParams}` const res = await ky.post(url, { @@ -26,7 +30,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (let message of ndjson(toIterable(res))) { + for await (let message of ndjson(toAsyncIterable(res))) { message = toCamel(message) if (message.responses) { message.responses = message.responses.map(({ ID, Addrs }) => { diff --git a/src/dht/query.js b/src/dht/query.js index 03d340b89..a6e526f60 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -4,7 +4,7 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * query (peerId, options) { @@ -21,7 +21,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const message of ndjson(toIterable(res))) { + for await (const message of ndjson(toAsyncIterable(res))) { yield new PeerInfo(PeerId.createFromB58String(message.ID)) } } diff --git a/src/files/ls.js b/src/files/ls.js index 8b4371bb8..afd10bd0a 100644 --- a/src/files/ls.js +++ b/src/files/ls.js @@ -2,7 +2,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('../lib/configure') const toCamelWithMetadata = require('../lib/object-to-camel-with-metadata') @@ -28,7 +28,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const result of ndjson(toIterable(res))) { + for await (const result of ndjson(toAsyncIterable(res))) { // go-ipfs does not yet support the "stream" option if ('Entries' in result) { for (const entry of result.Entries || []) { diff --git a/src/files/read.js b/src/files/read.js index 4c20afb8c..f2ba05af6 100644 --- a/src/files/read.js +++ b/src/files/read.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * read (path, options) { @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/get.js b/src/get.js index 45e67c08b..221a7616d 100644 --- a/src/get.js +++ b/src/get.js @@ -4,7 +4,7 @@ const configure = require('./lib/configure') const Tar = require('it-tar') const { Buffer } = require('buffer') const CID = require('cids') -const toIterable = require('./lib/stream-to-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * get (path, options) { @@ -38,7 +38,7 @@ module.exports = configure(({ ky }) => { const extractor = Tar.extract() - for await (const { header, body } of extractor(toIterable(res))) { + for await (const { header, body } of extractor(toAsyncIterable(res))) { if (header.type === 'directory') { yield { path: header.name diff --git a/src/lib/stream-to-iterable.js b/src/lib/stream-to-async-iterable.js similarity index 73% rename from src/lib/stream-to-iterable.js rename to src/lib/stream-to-async-iterable.js index 7366ccdd6..e728e464f 100644 --- a/src/lib/stream-to-iterable.js +++ b/src/lib/stream-to-async-iterable.js @@ -1,17 +1,19 @@ 'use strict' -module.exports = function toIterable (res) { +module.exports = function toAsyncIterable (res) { + const { body } = res + // An env where res.body getter for ReadableStream with getReader // is not supported, for example in React Native - if (!res.body) { - return (async function * () { - const arrayBuffer = await res.arrayBuffer() - yield arrayBuffer - })() + if (!body) { + if (res.arrayBuffer) { + return (async function * () { + const arrayBuffer = await res.arrayBuffer() + yield arrayBuffer + })() + } } - const { body } = res - // Node.js stream if (body[Symbol.asyncIterator]) return body diff --git a/src/log/tail.js b/src/log/tail.js index dcf88eac7..652741dad 100644 --- a/src/log/tail.js +++ b/src/log/tail.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * tail (options) { @@ -15,6 +15,6 @@ module.exports = configure(({ ky }) => { searchParams: options.searchParams }) - yield * ndjson(toIterable(res)) + yield * ndjson(toAsyncIterable(res)) } }) diff --git a/src/object/data.js b/src/object/data.js index 5f5b1d0d3..d053db630 100644 --- a/src/object/data.js +++ b/src/object/data.js @@ -3,7 +3,7 @@ const { Buffer } = require('buffer') const CID = require('cids') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * data (cid, options) { @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of toIterable(res)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/src/ping.js b/src/ping.js index d547bf69e..d158efee6 100644 --- a/src/ping.js +++ b/src/ping.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('./lib/configure') -const toIterable = require('./lib/stream-to-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') const toCamel = require('./lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -20,7 +20,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const chunk of ndjson(toIterable(res))) { + for await (const chunk of ndjson(toAsyncIterable(res))) { yield toCamel(chunk) } } diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 934f6b51b..c7f3ee97c 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -6,10 +6,10 @@ const bs58 = require('bs58') const { Buffer } = require('buffer') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const SubscriptionTracker = require('./subscription-tracker') -module.exports = configure((config) => { +module.exports = configure(config => { const ky = config.ky const subsTracker = SubscriptionTracker.singleton() const publish = require('./publish')(config) @@ -20,7 +20,7 @@ module.exports = configure((config) => { const searchParams = new URLSearchParams(options.searchParams) searchParams.set('arg', topic) - if (options.discover != null) searchParams.set('discover', options.discover) + if (options.discover != null) { searchParams.set('discover', options.discover) } let res @@ -28,7 +28,9 @@ module.exports = configure((config) => { // is received. If this doesn't happen within 1 second send an empty message // to kickstart the process. const ffWorkaround = setTimeout(async () => { - log(`Publishing empty message to "${topic}" to resolve subscription request`) + log( + `Publishing empty message to "${topic}" to resolve subscription request` + ) try { await publish(topic, Buffer.alloc(0), options) } catch (err) { @@ -43,14 +45,15 @@ module.exports = configure((config) => { headers: options.headers, searchParams }) - } catch (err) { // Initial subscribe fail, ensure we clean up + } catch (err) { + // Initial subscribe fail, ensure we clean up subsTracker.unsubscribe(topic, handler) throw err } clearTimeout(ffWorkaround) - readMessages(ndjson(toIterable(res)), { + readMessages(ndjson(toAsyncIterable(res)), { onMessage: handler, onEnd: () => subsTracker.unsubscribe(topic, handler), onError: options.onError diff --git a/src/refs/index.js b/src/refs/index.js index df4bb17e7..87cdd6e4e 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -4,11 +4,11 @@ const configure = require('../lib/configure') const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = config => { - const refs = (configure(({ ky }) => { + const refs = configure(({ ky }) => { return async function * refs (args, options) { options = options || {} @@ -39,7 +39,10 @@ module.exports = config => { } for (const arg of args) { - searchParams.append('arg', `${Buffer.isBuffer(arg) ? new CID(arg) : arg}`) + searchParams.append( + 'arg', + `${Buffer.isBuffer(arg) ? new CID(arg) : arg}` + ) } const res = await ky.post('refs', { @@ -49,11 +52,11 @@ module.exports = config => { searchParams }) - for await (const file of ndjson(toIterable(res))) { + for await (const file of ndjson(toAsyncIterable(res))) { yield toCamel(file) } } - }))(config) + })(config) refs.local = require('./local')(config) diff --git a/src/refs/local.js b/src/refs/local.js index c92fcf24a..da927edea 100644 --- a/src/refs/local.js +++ b/src/refs/local.js @@ -2,7 +2,7 @@ const configure = require('../lib/configure') const ndjson = require('iterable-ndjson') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { @@ -15,7 +15,7 @@ module.exports = configure(({ ky }) => { headers: options.headers }) - for await (const file of ndjson(toIterable(res))) { + for await (const file of ndjson(toAsyncIterable(res))) { yield toCamel(file) } } diff --git a/src/repo/gc.js b/src/repo/gc.js index 4e1b75951..0c61dffe8 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -3,14 +3,14 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * gc (peerId, options) { options = options || {} const searchParams = new URLSearchParams(options.searchParams) - if (options.streamErrors) searchParams.set('stream-errors', options.streamErrors) + if (options.streamErrors) { searchParams.set('stream-errors', options.streamErrors) } const res = await ky.post('repo/gc', { timeout: options.timeout, @@ -19,7 +19,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const gcResult of ndjson(toIterable(res))) { + for await (const gcResult of ndjson(toAsyncIterable(res))) { yield { err: gcResult.Error ? new Error(gcResult.Error) : null, cid: (gcResult.Key || {})['/'] ? new CID(gcResult.Key['/']) : null diff --git a/src/stats/bw.js b/src/stats/bw.js index 6986d6c6b..5e7271ce0 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const Big = require('bignumber.js') const configure = require('../lib/configure') -const toIterable = require('../lib/stream-to-iterable') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * bw (options) { @@ -22,7 +22,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const stats of ndjson(toIterable(res))) { + for await (const stats of ndjson(toAsyncIterable(res))) { yield { totalIn: new Big(stats.TotalIn), totalOut: new Big(stats.TotalOut), diff --git a/test/lib.stream-to-async-iterable.spec.js b/test/lib.stream-to-async-iterable.spec.js new file mode 100644 index 000000000..e01d1b391 --- /dev/null +++ b/test/lib.stream-to-async-iterable.spec.js @@ -0,0 +1,66 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('interface-ipfs-core/src/utils/mocha') +const toAsyncIterable = require('../src/lib/stream-to-async-iterable') + +describe('lib/stream-to-async-iterable', () => { + it('should return input if already async iterable', () => { + const input = { + [Symbol.asyncIterator] () { + return this + } + } + const res = { body: input } + expect(toAsyncIterable(res)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + + const input = { + getReader () { + let i = 0 + return { + read () { + return Promise.resolve( + i === inputData.length + ? { done: true } + : { value: inputData[i++] } + ) + }, + releaseLock () {} + } + } + } + const res = { body: input } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should return an async iterable even if res.body is undefined', async () => { + const inputData = [2] + const res = { + arrayBuffer () { + return Promise.resolve(inputData[0]) + } + } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw on unknown stream', () => { + const res = { body: {} } + expect(() => toAsyncIterable(res)).to.throw('unknown stream') + }) +}) diff --git a/test/lib.stream-to-iterable.spec.js b/test/lib.stream-to-iterable.spec.js deleted file mode 100644 index 3ad326480..000000000 --- a/test/lib.stream-to-iterable.spec.js +++ /dev/null @@ -1,40 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const toIterable = require('../src/lib/stream-to-iterable') - -describe('lib/stream-to-iterable', () => { - it('should return input if already async iterable', () => { - const input = { [Symbol.asyncIterator] () { return this } } - expect(toIterable(input)).to.equal(input) - }) - - it('should convert reader to async iterable', async () => { - const inputData = [2, 31, 3, 4] - const input = { - getReader () { - let i = 0 - return { - read () { - return i === inputData.length - ? { done: true } - : { value: inputData[i++] } - }, - releaseLock () {} - } - } - } - - const chunks = [] - for await (const chunk of toIterable(input)) { - chunks.push(chunk) - } - - expect(chunks).to.eql(inputData) - }) - - it('should throw on unknown stream', () => { - expect(() => toIterable({})).to.throw('unknown stream') - }) -}) From 3ee00160bda3ce760bb615248c122484d9e2d6c1 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 10:24:44 -0600 Subject: [PATCH 03/12] Added local editor config dir to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 77203bd99..4651d7509 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ coverage examples/sub-module/**/bundle.js examples/sub-module/**/*-minified.js examples/sub-module/*-bundle.js + +.vscode/ From 74c286ed08862fc7775a2f5ba51a44248faebcef Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 10:25:24 -0600 Subject: [PATCH 04/12] Handling case where body and arrayBuffer both are not defined --- src/lib/stream-to-async-iterable.js | 2 ++ test/lib.stream-to-async-iterable.spec.js | 15 ++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/lib/stream-to-async-iterable.js b/src/lib/stream-to-async-iterable.js index e728e464f..d04f2fd97 100644 --- a/src/lib/stream-to-async-iterable.js +++ b/src/lib/stream-to-async-iterable.js @@ -11,6 +11,8 @@ module.exports = function toAsyncIterable (res) { const arrayBuffer = await res.arrayBuffer() yield arrayBuffer })() + } else { + throw new Error('Neither Response.body nor Response.arrayBuffer is defined') } } diff --git a/test/lib.stream-to-async-iterable.spec.js b/test/lib.stream-to-async-iterable.spec.js index e01d1b391..01adb3e61 100644 --- a/test/lib.stream-to-async-iterable.spec.js +++ b/test/lib.stream-to-async-iterable.spec.js @@ -7,7 +7,7 @@ const toAsyncIterable = require('../src/lib/stream-to-async-iterable') describe('lib/stream-to-async-iterable', () => { it('should return input if already async iterable', () => { const input = { - [Symbol.asyncIterator] () { + [Symbol.asyncIterator]() { return this } } @@ -19,17 +19,17 @@ describe('lib/stream-to-async-iterable', () => { const inputData = [2, 31, 3, 4] const input = { - getReader () { + getReader() { let i = 0 return { - read () { + read() { return Promise.resolve( i === inputData.length ? { done: true } : { value: inputData[i++] } ) }, - releaseLock () {} + releaseLock() { } } } } @@ -46,7 +46,7 @@ describe('lib/stream-to-async-iterable', () => { it('should return an async iterable even if res.body is undefined', async () => { const inputData = [2] const res = { - arrayBuffer () { + arrayBuffer() { return Promise.resolve(inputData[0]) } } @@ -59,6 +59,11 @@ describe('lib/stream-to-async-iterable', () => { expect(chunks).to.eql(inputData) }) + it('should throw if res.body and res.arrayBuffer are undefined', () => { + const res = {} + expect(() => toAsyncIterable(res)).to.throw('Neither Response.body nor Response.arrayBuffer is defined') + }) + it('should throw on unknown stream', () => { const res = { body: {} } expect(() => toAsyncIterable(res)).to.throw('unknown stream') From 528a0b9f18af550718860782324fc83e3dd2c091 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 10:37:40 -0600 Subject: [PATCH 05/12] Reverting whitespace changes --- src/add/index.js | 33 +++++++++------------------------ src/dht/put.js | 10 +++------- 2 files changed, 12 insertions(+), 31 deletions(-) diff --git a/src/add/index.js b/src/add/index.js index 83d0af9a7..02b555cfa 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -7,7 +7,7 @@ const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { - return async function * add (input, options) { + return async function* add(input, options) { options = options || {} const searchParams = new URLSearchParams(options.searchParams) @@ -16,36 +16,21 @@ module.exports = configure(({ ky }) => { if (options.chunker) searchParams.set('chunker', options.chunker) if (options.cidVersion) searchParams.set('cid-version', options.cidVersion) if (options.cidBase) searchParams.set('cid-base', options.cidBase) - if (options.enableShardingExperiment != null) { - searchParams.set( - 'enable-sharding-experiment', - options.enableShardingExperiment - ) - } + if (options.enableShardingExperiment != null) searchParams.set('enable-sharding-experiment', options.enableShardingExperiment) if (options.hashAlg) searchParams.set('hash', options.hashAlg) - if (options.onlyHash != null) { searchParams.set('only-hash', options.onlyHash) } + if (options.onlyHash != null) searchParams.set('only-hash', options.onlyHash) if (options.pin != null) searchParams.set('pin', options.pin) if (options.progress) searchParams.set('progress', true) if (options.quiet != null) searchParams.set('quiet', options.quiet) if (options.quieter != null) searchParams.set('quieter', options.quieter) - if (options.rawLeaves != null) { searchParams.set('raw-leaves', options.rawLeaves) } - if (options.shardSplitThreshold) { searchParams.set('shard-split-threshold', options.shardSplitThreshold) } + if (options.rawLeaves != null) searchParams.set('raw-leaves', options.rawLeaves) + if (options.shardSplitThreshold) searchParams.set('shard-split-threshold', options.shardSplitThreshold) if (options.silent) searchParams.set('silent', options.silent) if (options.trickle != null) searchParams.set('trickle', options.trickle) - if (options.wrapWithDirectory != null) { searchParams.set('wrap-with-directory', options.wrapWithDirectory) } + if (options.wrapWithDirectory != null) searchParams.set('wrap-with-directory', options.wrapWithDirectory) if (options.preload != null) searchParams.set('preload', options.preload) - if (options.fileImportConcurrency != null) { - searchParams.set( - 'file-import-concurrency', - options.fileImportConcurrency - ) - } - if (options.blockWriteConcurrency != null) { - searchParams.set( - 'block-write-concurrency', - options.blockWriteConcurrency - ) - } + if (options.fileImportConcurrency != null) searchParams.set('file-import-concurrency', options.fileImportConcurrency) + if (options.blockWriteConcurrency != null) searchParams.set('block-write-concurrency', options.blockWriteConcurrency) const res = await ky.post('add', { timeout: options.timeout, @@ -67,7 +52,7 @@ module.exports = configure(({ ky }) => { } }) -function toCoreInterface ({ name, hash, size, mode, mtime }) { +function toCoreInterface({ name, hash, size, mode, mtime }) { const output = { path: name, hash, diff --git a/src/dht/put.js b/src/dht/put.js index be14b4d27..aa4a81dae 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -10,18 +10,14 @@ const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { - return async function * put (key, value, options) { + return async function* put(key, value, options) { options = options || {} const searchParams = new URLSearchParams(options.searchParams) if (options.verbose != null) searchParams.set('verbose', options.verbose) - key = Buffer.isBuffer(key) - ? encodeBufferURIComponent(key) - : encodeURIComponent(key) - value = Buffer.isBuffer(value) - ? encodeBufferURIComponent(value) - : encodeURIComponent(value) + key = Buffer.isBuffer(key) ? encodeBufferURIComponent(key) : encodeURIComponent(key) + value = Buffer.isBuffer(value) ? encodeBufferURIComponent(value) : encodeURIComponent(value) const url = `dht/put?arg=${key}&arg=${value}&${searchParams}` const res = await ky.post(url, { From 4ea0b031299ca0d9cd2c79a37332e12d5ab82926 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 10:58:32 -0600 Subject: [PATCH 06/12] Reverting more automatic style changes --- src/add/index.js | 4 ++-- src/dht/find-provs.js | 2 +- src/dht/provide.js | 2 +- src/pubsub/subscribe.js | 11 ++++------- src/refs/index.js | 9 +++------ src/repo/gc.js | 2 +- 6 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/add/index.js b/src/add/index.js index 02b555cfa..71050449a 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -7,7 +7,7 @@ const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { - return async function* add(input, options) { + return async function * add (input, options) { options = options || {} const searchParams = new URLSearchParams(options.searchParams) @@ -52,7 +52,7 @@ module.exports = configure(({ ky }) => { } }) -function toCoreInterface({ name, hash, size, mode, mtime }) { +function toCoreInterface ({ name, hash, size, mode, mtime }) { const output = { path: name, hash, diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index 4816df18c..8bb6c86de 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -13,7 +13,7 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) searchParams.set('arg', `${cid}`) - if (options.numProviders) { searchParams.set('num-providers', options.numProviders) } + if (options.numProviders) searchParams.set('num-providers', options.numProviders) if (options.verbose != null) searchParams.set('verbose', options.verbose) const res = await ky.post('dht/findprovs', { diff --git a/src/dht/provide.js b/src/dht/provide.js index f88e10753..89184fe7d 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -15,7 +15,7 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) cids.forEach(cid => searchParams.append('arg', `${cid}`)) - if (options.recursive != null) { searchParams.set('recursive', options.recursive) } + if (options.recursive != null) searchParams.set('recursive', options.recursive) if (options.verbose != null) searchParams.set('verbose', options.verbose) const res = await ky.post('dht/provide', { diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index c7f3ee97c..794400f58 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -9,7 +9,7 @@ const configure = require('../lib/configure') const toAsyncIterable = require('../lib/stream-to-async-iterable') const SubscriptionTracker = require('./subscription-tracker') -module.exports = configure(config => { +module.exports = configure((config) => { const ky = config.ky const subsTracker = SubscriptionTracker.singleton() const publish = require('./publish')(config) @@ -20,7 +20,7 @@ module.exports = configure(config => { const searchParams = new URLSearchParams(options.searchParams) searchParams.set('arg', topic) - if (options.discover != null) { searchParams.set('discover', options.discover) } + if (options.discover != null) searchParams.set('discover', options.discover) let res @@ -28,9 +28,7 @@ module.exports = configure(config => { // is received. If this doesn't happen within 1 second send an empty message // to kickstart the process. const ffWorkaround = setTimeout(async () => { - log( - `Publishing empty message to "${topic}" to resolve subscription request` - ) + log(`Publishing empty message to "${topic}" to resolve subscription request`) try { await publish(topic, Buffer.alloc(0), options) } catch (err) { @@ -45,8 +43,7 @@ module.exports = configure(config => { headers: options.headers, searchParams }) - } catch (err) { - // Initial subscribe fail, ensure we clean up + } catch (err) { // Initial subscribe fail, ensure we clean up subsTracker.unsubscribe(topic, handler) throw err } diff --git a/src/refs/index.js b/src/refs/index.js index 87cdd6e4e..8f21fbaac 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -8,7 +8,7 @@ const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = config => { - const refs = configure(({ ky }) => { + const refs = (configure(({ ky }) => { return async function * refs (args, options) { options = options || {} @@ -39,10 +39,7 @@ module.exports = config => { } for (const arg of args) { - searchParams.append( - 'arg', - `${Buffer.isBuffer(arg) ? new CID(arg) : arg}` - ) + searchParams.append('arg', `${Buffer.isBuffer(arg) ? new CID(arg) : arg}`) } const res = await ky.post('refs', { @@ -56,7 +53,7 @@ module.exports = config => { yield toCamel(file) } } - })(config) + }))(config) refs.local = require('./local')(config) diff --git a/src/repo/gc.js b/src/repo/gc.js index 0c61dffe8..abaf4821d 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -10,7 +10,7 @@ module.exports = configure(({ ky }) => { options = options || {} const searchParams = new URLSearchParams(options.searchParams) - if (options.streamErrors) { searchParams.set('stream-errors', options.streamErrors) } + if (options.streamErrors) searchParams.set('stream-errors', options.streamErrors) const res = await ky.post('repo/gc', { timeout: options.timeout, From 02b62699cf8e0e12ecaa845865bfb3f186982d0d Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 11:01:13 -0600 Subject: [PATCH 07/12] Reverting paren change for single param arrow function --- src/add-from-url.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/add-from-url.js b/src/add-from-url.js index b3a1ab570..f02a3d46d 100644 --- a/src/add-from-url.js +++ b/src/add-from-url.js @@ -3,7 +3,7 @@ const kyDefault = require('ky-universal').default const toAsyncIterable = require('./lib/stream-to-async-iterable') -module.exports = config => { +module.exports = (config) => { const add = require('./add')(config) return async function * addFromURL (url, options) { From 4819f0e7c24cbd2f29c4142538b0567da8fa68ac Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 11:02:31 -0600 Subject: [PATCH 08/12] Reverting generator function style change --- src/dht/put.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dht/put.js b/src/dht/put.js index aa4a81dae..bcce7d0ee 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -10,7 +10,7 @@ const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { - return async function* put(key, value, options) { + return async function * put (key, value, options) { options = options || {} const searchParams = new URLSearchParams(options.searchParams) From 7a0d504d3d84a9e2e8af59ab332f23329c39ad5e Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 22 Jan 2020 14:28:30 -0600 Subject: [PATCH 09/12] Fixed linting errors --- test/lib.stream-to-async-iterable.spec.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/lib.stream-to-async-iterable.spec.js b/test/lib.stream-to-async-iterable.spec.js index 01adb3e61..2351198d4 100644 --- a/test/lib.stream-to-async-iterable.spec.js +++ b/test/lib.stream-to-async-iterable.spec.js @@ -7,7 +7,7 @@ const toAsyncIterable = require('../src/lib/stream-to-async-iterable') describe('lib/stream-to-async-iterable', () => { it('should return input if already async iterable', () => { const input = { - [Symbol.asyncIterator]() { + [Symbol.asyncIterator] () { return this } } @@ -19,17 +19,17 @@ describe('lib/stream-to-async-iterable', () => { const inputData = [2, 31, 3, 4] const input = { - getReader() { + getReader () { let i = 0 return { - read() { + read () { return Promise.resolve( i === inputData.length ? { done: true } : { value: inputData[i++] } ) }, - releaseLock() { } + releaseLock () { } } } } @@ -46,7 +46,7 @@ describe('lib/stream-to-async-iterable', () => { it('should return an async iterable even if res.body is undefined', async () => { const inputData = [2] const res = { - arrayBuffer() { + arrayBuffer () { return Promise.resolve(inputData[0]) } } From 250848946b59bd804894134ed0b0829211c28de6 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Tue, 28 Jan 2020 16:49:15 -0600 Subject: [PATCH 10/12] Augmenting the behavior of stream-to-it/source --- .gitignore | 1 + src/add/index.js | 2 +- src/block/rm.js | 2 +- src/cat.js | 2 +- src/dht/find-peer.js | 2 +- src/dht/find-provs.js | 2 +- src/dht/get.js | 2 +- src/dht/provide.js | 2 +- src/dht/put.js | 2 +- src/dht/query.js | 2 +- src/files/ls.js | 2 +- src/files/read.js | 2 +- src/get.js | 2 +- src/lib/stream-to-async-iterable.js | 29 ++++++++--------------------- src/log/tail.js | 2 +- src/ls.js | 2 +- src/name/resolve.js | 2 +- src/pin/ls.js | 2 +- src/ping.js | 2 +- src/pubsub/subscribe.js | 2 +- src/refs/index.js | 2 +- src/refs/local.js | 2 +- src/repo/gc.js | 2 +- src/stats/bw.js | 2 +- 24 files changed, 31 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 4651d7509..3421ead26 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ examples/sub-module/**/*-minified.js examples/sub-module/*-bundle.js .vscode/ +.DS_Store diff --git a/src/add/index.js b/src/add/index.js index d7e02c341..09d9e4b9e 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const CID = require('cids') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') diff --git a/src/block/rm.js b/src/block/rm.js index 0e93d6dc6..c73988775 100644 --- a/src/block/rm.js +++ b/src/block/rm.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * rm (cid, options) { diff --git a/src/cat.js b/src/cat.js index a047f1686..c0a6b5854 100644 --- a/src/cat.js +++ b/src/cat.js @@ -3,7 +3,7 @@ const CID = require('cids') const { Buffer } = require('buffer') const configure = require('./lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * cat (path, options) { diff --git a/src/dht/find-peer.js b/src/dht/find-peer.js index 25d8c69dc..52b9eca66 100644 --- a/src/dht/find-peer.js +++ b/src/dht/find-peer.js @@ -5,7 +5,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function findPeer (peerId, options) { diff --git a/src/dht/find-provs.js b/src/dht/find-provs.js index ca42821de..80793495d 100644 --- a/src/dht/find-provs.js +++ b/src/dht/find-provs.js @@ -4,7 +4,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * findProvs (cid, options) { diff --git a/src/dht/get.js b/src/dht/get.js index 0ba02576f..4f162a365 100644 --- a/src/dht/get.js +++ b/src/dht/get.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const configure = require('../lib/configure') diff --git a/src/dht/provide.js b/src/dht/provide.js index 1713ef8d5..7b9946dd7 100644 --- a/src/dht/provide.js +++ b/src/dht/provide.js @@ -4,7 +4,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { diff --git a/src/dht/put.js b/src/dht/put.js index 41b6f9011..042e46f6c 100644 --- a/src/dht/put.js +++ b/src/dht/put.js @@ -5,7 +5,7 @@ const CID = require('cids') const multiaddr = require('multiaddr') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const encodeBufferURIComponent = require('../lib/encode-buffer-uri-component') const toCamel = require('../lib/object-to-camel') diff --git a/src/dht/query.js b/src/dht/query.js index 25dcafb60..71ac78426 100644 --- a/src/dht/query.js +++ b/src/dht/query.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const multiaddr = require('multiaddr') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('../lib/configure') const toCamel = require('../lib/object-to-camel') diff --git a/src/files/ls.js b/src/files/ls.js index 64c16a9bc..6a61e6c2a 100644 --- a/src/files/ls.js +++ b/src/files/ls.js @@ -2,7 +2,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('../lib/configure') const toCamelWithMetadata = require('../lib/object-to-camel-with-metadata') diff --git a/src/files/read.js b/src/files/read.js index 4e8ec6edd..f2ba05af6 100644 --- a/src/files/read.js +++ b/src/files/read.js @@ -2,7 +2,7 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * read (path, options) { diff --git a/src/get.js b/src/get.js index 7eecfbfc5..b66d57e82 100644 --- a/src/get.js +++ b/src/get.js @@ -4,7 +4,7 @@ const configure = require('./lib/configure') const Tar = require('it-tar') const { Buffer } = require('buffer') const CID = require('cids') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * get (path, options) { diff --git a/src/lib/stream-to-async-iterable.js b/src/lib/stream-to-async-iterable.js index d04f2fd97..cdd6b36c6 100644 --- a/src/lib/stream-to-async-iterable.js +++ b/src/lib/stream-to-async-iterable.js @@ -1,5 +1,12 @@ 'use strict' +const toAsyncIterableOriginal = require('stream-to-it/source') + +// Note: Turned this into a helper that wraps `stream-to-it/source` +// to handle the body undefined case without requiring that other libs +// that consume that package such as `js-ipfs` and `js-ipfs-utils` modify +// how they use it + module.exports = function toAsyncIterable (res) { const { body } = res @@ -16,25 +23,5 @@ module.exports = function toAsyncIterable (res) { } } - // Node.js stream - if (body[Symbol.asyncIterator]) return body - - // Browser ReadableStream - if (body.getReader) { - return (async function * () { - const reader = body.getReader() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) return - yield value - } - } finally { - reader.releaseLock() - } - })() - } - - throw new Error('unknown stream') + return toAsyncIterableOriginal(body) } diff --git a/src/log/tail.js b/src/log/tail.js index 259bac3d2..652741dad 100644 --- a/src/log/tail.js +++ b/src/log/tail.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * tail (options) { diff --git a/src/ls.js b/src/ls.js index 84fc6cf79..fe03ba7f1 100644 --- a/src/ls.js +++ b/src/ls.js @@ -3,7 +3,7 @@ const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const configure = require('./lib/configure') module.exports = configure(({ ky }) => { diff --git a/src/name/resolve.js b/src/name/resolve.js index 99845c485..776ae1849 100644 --- a/src/name/resolve.js +++ b/src/name/resolve.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * (path, options) { diff --git a/src/pin/ls.js b/src/pin/ls.js index 97c8c0ccc..4d2976b3c 100644 --- a/src/pin/ls.js +++ b/src/pin/ls.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const CID = require('cids') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * ls (path, options) { diff --git a/src/ping.js b/src/ping.js index e5a90df56..b5014c501 100644 --- a/src/ping.js +++ b/src/ping.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('./lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('./lib/object-to-camel') module.exports = configure(({ ky }) => { diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 9c9dc7f96..ed56b42c2 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -5,7 +5,7 @@ const bs58 = require('bs58') const { Buffer } = require('buffer') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const SubscriptionTracker = require('./subscription-tracker') module.exports = configure((config) => { diff --git a/src/refs/index.js b/src/refs/index.js index 251cf40d2..8f21fbaac 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -4,7 +4,7 @@ const configure = require('../lib/configure') const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = config => { diff --git a/src/refs/local.js b/src/refs/local.js index e31f9233d..da927edea 100644 --- a/src/refs/local.js +++ b/src/refs/local.js @@ -2,7 +2,7 @@ const configure = require('../lib/configure') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') const toCamel = require('../lib/object-to-camel') module.exports = configure(({ ky }) => { diff --git a/src/repo/gc.js b/src/repo/gc.js index 451b25bbe..abaf4821d 100644 --- a/src/repo/gc.js +++ b/src/repo/gc.js @@ -3,7 +3,7 @@ const CID = require('cids') const ndjson = require('iterable-ndjson') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * gc (peerId, options) { diff --git a/src/stats/bw.js b/src/stats/bw.js index 047459b54..5e7271ce0 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -3,7 +3,7 @@ const ndjson = require('iterable-ndjson') const Big = require('bignumber.js') const configure = require('../lib/configure') -const toAsyncIterable = require('stream-to-it/source') +const toAsyncIterable = require('../lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * bw (options) { From 658597173fea26285a60116a5573e7f4f9f9cae0 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Tue, 28 Jan 2020 21:35:03 -0600 Subject: [PATCH 11/12] Fixed relative imports --- src/cat.js | 2 +- src/get.js | 2 +- src/ls.js | 2 +- src/ping.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cat.js b/src/cat.js index c0a6b5854..67fe92561 100644 --- a/src/cat.js +++ b/src/cat.js @@ -3,7 +3,7 @@ const CID = require('cids') const { Buffer } = require('buffer') const configure = require('./lib/configure') -const toAsyncIterable = require('../lib/stream-to-async-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * cat (path, options) { diff --git a/src/get.js b/src/get.js index b66d57e82..221a7616d 100644 --- a/src/get.js +++ b/src/get.js @@ -4,7 +4,7 @@ const configure = require('./lib/configure') const Tar = require('it-tar') const { Buffer } = require('buffer') const CID = require('cids') -const toAsyncIterable = require('../lib/stream-to-async-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') module.exports = configure(({ ky }) => { return async function * get (path, options) { diff --git a/src/ls.js b/src/ls.js index fe03ba7f1..53ce45787 100644 --- a/src/ls.js +++ b/src/ls.js @@ -3,7 +3,7 @@ const { Buffer } = require('buffer') const CID = require('cids') const ndjson = require('iterable-ndjson') -const toAsyncIterable = require('../lib/stream-to-async-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') const configure = require('./lib/configure') module.exports = configure(({ ky }) => { diff --git a/src/ping.js b/src/ping.js index b5014c501..d158efee6 100644 --- a/src/ping.js +++ b/src/ping.js @@ -2,7 +2,7 @@ const ndjson = require('iterable-ndjson') const configure = require('./lib/configure') -const toAsyncIterable = require('../lib/stream-to-async-iterable') +const toAsyncIterable = require('./lib/stream-to-async-iterable') const toCamel = require('./lib/object-to-camel') module.exports = configure(({ ky }) => { From 843a97e37fff6fa60f5f34ea2df31553cefbc2a0 Mon Sep 17 00:00:00 2001 From: Paul Cowgill Date: Wed, 29 Jan 2020 10:53:55 -0600 Subject: [PATCH 12/12] Fixed test failures --- src/add/index.js | 4 +++- src/ls.js | 2 +- src/name/resolve.js | 2 +- src/pin/ls.js | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/add/index.js b/src/add/index.js index 09d9e4b9e..1146bd91b 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -33,12 +33,14 @@ module.exports = configure(({ ky }) => { if (options.fileImportConcurrency != null) searchParams.set('file-import-concurrency', options.fileImportConcurrency) if (options.blockWriteConcurrency != null) searchParams.set('block-write-concurrency', options.blockWriteConcurrency) + const formData = await toFormData(input) + const res = await ky.post('add', { timeout: options.timeout, signal: options.signal, headers: options.headers, searchParams, - body: await toFormData(input) + body: formData }) for await (let file of ndjson(toAsyncIterable(res))) { diff --git a/src/ls.js b/src/ls.js index 53ce45787..e29038ebd 100644 --- a/src/ls.js +++ b/src/ls.js @@ -25,7 +25,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (let result of ndjson(toAsyncIterable(res.body))) { + for await (let result of ndjson(toAsyncIterable(res))) { result = result.Objects if (!result) { diff --git a/src/name/resolve.js b/src/name/resolve.js index 776ae1849..0eeebf0bb 100644 --- a/src/name/resolve.js +++ b/src/name/resolve.js @@ -23,7 +23,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const result of ndjson(toAsyncIterable(res.body))) { + for await (const result of ndjson(toAsyncIterable(res))) { yield result.Path } } diff --git a/src/pin/ls.js b/src/pin/ls.js index 4d2976b3c..b89389477 100644 --- a/src/pin/ls.js +++ b/src/pin/ls.js @@ -28,7 +28,7 @@ module.exports = configure(({ ky }) => { searchParams }) - for await (const pin of ndjson(toAsyncIterable(res.body))) { + for await (const pin of ndjson(toAsyncIterable(res))) { if (pin.Keys) { // non-streaming response for (const cid of Object.keys(pin.Keys)) { yield { cid: new CID(cid), type: pin.Keys[cid].Type }