diff --git a/.gitignore b/.gitignore index a589b3fa49..40b58eb613 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ +.DS_Store +.connect-deps-cache/ +.connect-deps.json +prettier.config.js + # Dependency tools package-lock.json yarn.lock diff --git a/packages/ipfs-http-client/src/add/form-data.browser.js b/packages/ipfs-http-client/src/add/form-data.browser.js index 2b26f7abec..77069b8149 100644 --- a/packages/ipfs-http-client/src/add/form-data.browser.js +++ b/packages/ipfs-http-client/src/add/form-data.browser.js @@ -5,15 +5,21 @@ const normaliseInput = require('ipfs-utils/src/files/normalise-input') const mtimeToObject = require('../lib/mtime-to-object') exports.toFormData = async input => { + console.log("toFormData browser"); const files = normaliseInput(input) + console.log({ files }) const formData = new FormData() + + // console.log({ formData }) let i = 0 for await (const file of files) { // TODO FormData.append doesnt have a 4th arg + console.log({ file }) const headers = {} if (file.mtime !== undefined && file.mtime !== null) { + console.log("file.mtime !== undefined && file.mtime !== null") const mtime = mtimeToObject(file.mtime) if (mtime) { @@ -23,26 +29,39 @@ exports.toFormData = async input => { } if (file.mode !== undefined && file.mode !== null) { + console.log("file.mode !== undefined && file.mode !== null") headers.mode = file.mode.toString(8).padStart(4, '0') } if (file.content) { + console.log("file.content", file.content) // In the browser there's _currently_ no streaming upload, buffer up our // async iterator chunks and append a big Blob :( // One day, this will be browser streams const bufs = [] for await (const chunk of file.content) { + console.log("file.content chunk", chunk) bufs.push(chunk) } + const newBlob = new Blob(bufs, { type: 'application/octet-stream' }); + console.log({ newBlob }) + + if (newBlob.data) { + const newBlobData = newBlob.data; + console.log("newBlob data", newBlobData); + } + + const encodedUriComponent = encodeURIComponent(file.path); formData.append( `file-${i}`, - new Blob(bufs, { type: 'application/octet-stream' }), - encodeURIComponent(file.path) + newBlob, + encodedUriComponent // { // header: headers // } ) + } else { formData.append( `dir-${i}`, @@ -54,8 +73,32 @@ exports.toFormData = async input => { ) } + const keyToGet = 'file-0' + + if (formData.entries) { + const formDataEntries = formData.entries(); + for(var pair of formDataEntries) { + console.log(pair[0]+ ', '+ pair[1]); + } + const field = formData.get(keyToGet) + console.log({ field }) + } + + if (formData.getParts) { + const formDataParts = formData.getParts(); + console.log({ formDataParts }) + const field = formDataParts.find(item => item.fieldName === keyToGet); + if (field) { + console.log({ field }) + } + } + i++ } + // for (var p of formData.entries()) { + // console.log({ p }) + // } + return formData } diff --git a/packages/ipfs-http-client/src/add/form-data.js b/packages/ipfs-http-client/src/add/form-data.js index 80411c3c6d..7e244f4166 100644 --- a/packages/ipfs-http-client/src/add/form-data.js +++ b/packages/ipfs-http-client/src/add/form-data.js @@ -7,6 +7,7 @@ const normaliseInput = require('ipfs-utils/src/files/normalise-input') const mtimeToObject = require('../lib/mtime-to-object') exports.toFormData = async input => { + console.log("toFormData regular NOT USED IN REACT NATIVE"); const files = normaliseInput(input) const formData = new FormData() let i = 0 diff --git a/packages/ipfs-http-client/src/add/index.js b/packages/ipfs-http-client/src/add/index.js index 94e3fd6dc3..d1488c08ec 100644 --- a/packages/ipfs-http-client/src/add/index.js +++ b/packages/ipfs-http-client/src/add/index.js @@ -1,13 +1,17 @@ 'use strict' const CID = require('cids') + const merge = require('merge-options') const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') const configure = require('../lib/configure') + module.exports = configure((api) => { return async function * add (input, options = {}) { + console.log("Add called"); + console.log("Using monorepo version") const progressFn = options.progress options = merge( options, @@ -18,20 +22,28 @@ module.exports = configure((api) => { } ) + const formData = await toFormData(input) + + console.log({ formData }) + const res = await api.ndjson('add', { method: 'POST', searchParams: options, - body: await toFormData(input), + body: formData, timeout: options.timeout, signal: options.signal }) for await (let file of res) { + console.log({ file }); file = toCamel(file) + console.log("toCamelifiedFile", file); if (progressFn && file.bytes) { + console.log("progressFn && file.bytes"); progressFn(file.bytes) } else { + console.log("else"); yield toCoreInterface(file) } } diff --git a/packages/ipfs-http-client/src/dag/put.js b/packages/ipfs-http-client/src/dag/put.js index 05707f4f3a..1c0a0b7744 100644 --- a/packages/ipfs-http-client/src/dag/put.js +++ b/packages/ipfs-http-client/src/dag/put.js @@ -8,6 +8,8 @@ const configure = require('../lib/configure') module.exports = configure(api => { return async (dagNode, options = {}) => { + console.log("dag.put called") + console.log("Using monorepo version") if (options.cid && (options.format || options.hashAlg)) { throw new Error('Failed to put DAG node. Provide either `cid` OR `format` and `hashAlg` options') } else if ((options.format && !options.hashAlg) || (!options.format && options.hashAlg)) { diff --git a/packages/ipfs-http-client/src/files/read.js b/packages/ipfs-http-client/src/files/read.js index 38a51f3f6d..47871b5fac 100644 --- a/packages/ipfs-http-client/src/files/read.js +++ b/packages/ipfs-http-client/src/files/read.js @@ -1,6 +1,8 @@ 'use strict' const { Buffer } = require('buffer') +const toAsyncIterable = require('../lib/stream-to-async-iterable') +// TODO: Decide if we can remove `toIterable` const toIterable = require('stream-to-it/source') const configure = require('../lib/configure') @@ -13,7 +15,7 @@ module.exports = configure(api => { searchParams: options }) - for await (const chunk of toIterable(res.body)) { + for await (const chunk of toAsyncIterable(res)) { yield Buffer.from(chunk) } } diff --git a/packages/ipfs-http-client/src/get.js b/packages/ipfs-http-client/src/get.js index b8ff34ac03..710a934e7c 100644 --- a/packages/ipfs-http-client/src/get.js +++ b/packages/ipfs-http-client/src/get.js @@ -3,6 +3,7 @@ const Tar = require('it-tar') const { Buffer } = require('buffer') const CID = require('cids') + const configure = require('./lib/configure') module.exports = configure(api => { diff --git a/packages/ipfs-http-client/src/lib/stream-to-async-iterable.js b/packages/ipfs-http-client/src/lib/stream-to-async-iterable.js new file mode 100644 index 0000000000..cdd6b36c6f --- /dev/null +++ b/packages/ipfs-http-client/src/lib/stream-to-async-iterable.js @@ -0,0 +1,27 @@ +'use strict' + +const toAsyncIterableOriginal = require('stream-to-it/source') + +// Note: Turned this into a helper that wraps `stream-to-it/source` +// to handle the body undefined case without requiring that other libs +// that consume that package such as `js-ipfs` and `js-ipfs-utils` modify +// how they use it + +module.exports = function toAsyncIterable (res) { + const { body } = res + + // An env where res.body getter for ReadableStream with getReader + // is not supported, for example in React Native + if (!body) { + if (res.arrayBuffer) { + return (async function * () { + const arrayBuffer = await res.arrayBuffer() + yield arrayBuffer + })() + } else { + throw new Error('Neither Response.body nor Response.arrayBuffer is defined') + } + } + + return toAsyncIterableOriginal(body) +} diff --git a/packages/ipfs-http-client/src/pubsub/subscribe.js b/packages/ipfs-http-client/src/pubsub/subscribe.js index d9a6368c98..4e84ac8538 100644 --- a/packages/ipfs-http-client/src/pubsub/subscribe.js +++ b/packages/ipfs-http-client/src/pubsub/subscribe.js @@ -4,6 +4,9 @@ const bs58 = require('bs58') const { Buffer } = require('buffer') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const SubscriptionTracker = require('./subscription-tracker') + +// TODO: Update streamToAsyncIterator with any chances in light of +// this feature branch const { streamToAsyncIterator, ndjson } = require('../lib/core') const configure = require('../lib/configure') @@ -45,6 +48,9 @@ module.exports = configure((api, options) => { clearTimeout(ffWorkaround) + // Note: It's interesting that subscribe + // keeps this ndjson(tranformation(res)) pattern although + // that's now long from other IPFS methods readMessages(ndjson(streamToAsyncIterator(res)), { onMessage: handler, onEnd: () => subsTracker.unsubscribe(topic, handler), diff --git a/packages/ipfs-http-client/test/lib.stream-to-async-iterable.spec.js b/packages/ipfs-http-client/test/lib.stream-to-async-iterable.spec.js new file mode 100644 index 0000000000..2351198d4d --- /dev/null +++ b/packages/ipfs-http-client/test/lib.stream-to-async-iterable.spec.js @@ -0,0 +1,71 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('interface-ipfs-core/src/utils/mocha') +const toAsyncIterable = require('../src/lib/stream-to-async-iterable') + +describe('lib/stream-to-async-iterable', () => { + it('should return input if already async iterable', () => { + const input = { + [Symbol.asyncIterator] () { + return this + } + } + const res = { body: input } + expect(toAsyncIterable(res)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + + const input = { + getReader () { + let i = 0 + return { + read () { + return Promise.resolve( + i === inputData.length + ? { done: true } + : { value: inputData[i++] } + ) + }, + releaseLock () { } + } + } + } + const res = { body: input } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should return an async iterable even if res.body is undefined', async () => { + const inputData = [2] + const res = { + arrayBuffer () { + return Promise.resolve(inputData[0]) + } + } + + const chunks = [] + for await (const chunk of toAsyncIterable(res)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw if res.body and res.arrayBuffer are undefined', () => { + const res = {} + expect(() => toAsyncIterable(res)).to.throw('Neither Response.body nor Response.arrayBuffer is defined') + }) + + it('should throw on unknown stream', () => { + const res = { body: {} } + expect(() => toAsyncIterable(res)).to.throw('unknown stream') + }) +}) diff --git a/packages/ipfs-multipart/src/it-multipart.js b/packages/ipfs-multipart/src/it-multipart.js new file mode 100644 index 0000000000..08a4c8d379 --- /dev/null +++ b/packages/ipfs-multipart/src/it-multipart.js @@ -0,0 +1,195 @@ +'use strict' + +const bIndexOf = require('buffer-indexof') +const parseHeaders = require('parse-headers') + +module.exports = multipart + +async function* multipart(stream, boundary) { + if (!boundary) { + if ( + stream && + stream.headers && + stream.headers['content-type'] && + stream.headers['content-type'].includes('boundary') + ) { + boundary = stream.headers['content-type'].split('boundary=')[1].trim() + } else { + throw new Error('Not a multipart request') + } + } + + boundary = `--${boundary}` + const headerEnd = Buffer.from('\r\n\r\n') + + // allow pushing data back into stream + stream = prefixStream(stream) + + // consume initial boundary + await consumeUntilAfter(stream, Buffer.from(boundary)) + + for await (const chunk of stream) { + console.log('multipart chunk', chunk) + stream.push(chunk) + + const headers = (await collect(yieldUntilAfter(stream, headerEnd))).toString() + + console.log({ headers }) + + // the final boundary has `--\r\n` appended to it + if (headers === '--\r\n') { + console.log('hit final boundary') + return + } + + const yieldUntilAfterResult = yieldUntilAfter(stream, Buffer.from(`\r\n${boundary}`)) + console.log({ yieldUntilAfterResult }) + + // Just for logging + for await (const yieldUntilAfterResultChunk of yieldUntilAfterResult) { + console.log({ yieldUntilAfterResultChunk }) + } + // end of extra logging code + + // wait for this part's body to be consumed before we try reading the next one + const result = waitForStreamToBeConsumed(yieldUntilAfterResult) + + console.log({ result }) + + const part = { + headers: parseHeaders(headers), + body: result.iterator + // body: yieldUntilAfter(stream, Buffer.from(`\r\n${boundary}`)) + } + + yield part + + await result.complete + } +} + +// yield chunks of buffer until a the needle is reached. consume the needle without yielding it +async function* yieldUntilAfter(haystack, needle) { + console.log('yieldUntilAfter') + let buffer = Buffer.alloc(0) + + for await (const chunk of haystack) { + console.log('yieldUntilAfter chunk', chunk) + console.log(chunk.toString('utf8')) + buffer = Buffer.concat([buffer, chunk], buffer.length + chunk.length) // slow + + const index = bIndexOf(buffer, needle) + + if (index !== -1) { + console.log('&&&&&&') + console.log('found needle!', needle) + + // found needle + if (index > 0) { + yield buffer.slice(0, index) + } + + // consume needle but preserve rest of chunk + haystack.push(buffer.slice(index + needle.length)) + + return + } else { + console.log('????????') + console.log('did NOT find needle!') + } + + if (buffer.length > needle.length) { + // can emit the beginning chunk as it does not contain the needle + yield buffer.slice(0, buffer.length - needle.length) + + // cache the rest for next time + buffer = buffer.slice(buffer.length - needle.length) + } + } + + // yield anything left over + if (buffer.length) { + yield buffer + } +} + +async function consumeUntilAfter(haystack, needle) { + for await (const chunk of yieldUntilAfter(haystack, needle)) { + console.log('consumeUntilAfter chunk', chunk) + // eslint-disable-line no-unused-vars + } +} + +// a stream that lets us push content back into it for consumption elsewhere +function prefixStream(stream) { + const buffer = [] + const streamIterator = stream[Symbol.asyncIterator]() + + const iterator = { + [Symbol.asyncIterator]: () => { + return iterator + }, + next: () => { + if (buffer.length) { + return { + done: false, + value: buffer.shift() + } + } + + return streamIterator.next() + }, + push: function(buf) { + buffer.push(buf) + } + } + + return iterator +} + +function waitForStreamToBeConsumed(stream) { + let pending + const complete = new Promise((resolve, reject) => { + pending = { + resolve, + reject + } + }) + const streamIterator = stream[Symbol.asyncIterator]() + + const iterator = { + [Symbol.asyncIterator]: () => { + return iterator + }, + next: async () => { + try { + const next = await streamIterator.next() + + if (next.done) { + pending.resolve() + } + + return next + } catch (err) { + pending.reject(err) + } + } + } + + return { + complete, + iterator + } +} + +const collect = async (stream) => { + const buffers = [] + let size = 0 + + for await (const buf of stream) { + size += buf.length + buffers.push(buf) + } + + return Buffer.concat(buffers, size) +} diff --git a/packages/ipfs-multipart/src/parser.js b/packages/ipfs-multipart/src/parser.js index 922ee33256..0d7ee5f8ef 100644 --- a/packages/ipfs-multipart/src/parser.js +++ b/packages/ipfs-multipart/src/parser.js @@ -1,7 +1,7 @@ 'use strict' const Content = require('@hapi/content') -const multipart = require('it-multipart') +const multipart = require('./it-multipart') const multipartFormdataType = 'multipart/form-data' const applicationDirectory = 'application/x-directory' @@ -46,6 +46,14 @@ const ignore = async (stream) => { async function * parseEntry (stream, options) { for await (const part of stream) { + console.log('########') + console.log({ part }) + + // Just for logging + for await (const parseEntryChunk of part.body) { + console.log({ parseEntryChunk }) + } + // end of extra logging code if (!part.headers['content-type']) { throw new Error('No content-type in multipart part') } @@ -92,6 +100,7 @@ async function * parseEntry (stream, options) { const disposition = parseDisposition(part.headers['content-disposition']) + console.log({ disposition }) entry.name = decodeURIComponent(disposition.filename) entry.body = part.body @@ -100,7 +109,11 @@ async function * parseEntry (stream, options) { } async function * parser (stream, options) { + // console.log({ stream }); + console.log('boundary', options.boundary) for await (const entry of parseEntry(multipart(stream, options.boundary), options)) { + console.log('*********') + console.log({ entry }) if (entry.type === 'directory') { yield { type: 'directory', diff --git a/packages/ipfs-utils/src/files/normalise-input.js b/packages/ipfs-utils/src/files/normalise-input.js index fb892692fa..2e0bab445f 100644 --- a/packages/ipfs-utils/src/files/normalise-input.js +++ b/packages/ipfs-utils/src/files/normalise-input.js @@ -62,6 +62,7 @@ module.exports = function normaliseInput (input) { // Buffer|ArrayBuffer|TypedArray // Blob|File if (isBytes(input) || isBloby(input)) { + console.log('Bytes or blob') return (async function * () { // eslint-disable-line require-await yield toFileObject(input) })() @@ -152,6 +153,7 @@ module.exports = function normaliseInput (input) { } function toFileObject (input) { + console.log('toFileObject input', input) const obj = { path: input.path || '', mode: input.mode, @@ -159,17 +161,22 @@ function toFileObject (input) { } if (input.content) { + console.log('input.content', input.content) obj.content = toAsyncIterable(input.content) } else if (!input.path) { // Not already a file object with path or content prop + console.log('Not file object yet') obj.content = toAsyncIterable(input) } + console.log('obj.content', obj.content) + return obj } function toAsyncIterable (input) { // Bytes | String if (isBytes(input) || typeof input === 'string') { + console.log('bytes') return (async function * () { // eslint-disable-line require-await yield toBuffer(input) })() @@ -177,9 +184,12 @@ function toAsyncIterable (input) { // Bloby if (isBloby(input)) { + console.log('bloby') return blobToAsyncGenerator(input) } + console.log('other') + // Browser stream if (typeof input.getReader === 'function') { return browserStreamToIt(input) diff --git a/packages/ipfs/src/http/api/resources/dag.js b/packages/ipfs/src/http/api/resources/dag.js index 8b6ff198ce..07aa33a3bb 100644 --- a/packages/ipfs/src/http/api/resources/dag.js +++ b/packages/ipfs/src/http/api/resources/dag.js @@ -237,6 +237,13 @@ exports.put = { // main route handler which is called after the above `parseArgs`, but only if the args were valid async handler (request, h) { + console.log('========') + console.log('========') + console.log('========') + console.log('========') + console.log('========') + console.log('Incoming request!') + console.log('========') const { ipfs } = request.server.app const { node, format, hashAlg } = request.pre.args diff --git a/packages/ipfs/src/http/api/resources/files-regular.js b/packages/ipfs/src/http/api/resources/files-regular.js index 92afdacf3f..c9a64736c9 100644 --- a/packages/ipfs/src/http/api/resources/files-regular.js +++ b/packages/ipfs/src/http/api/resources/files-regular.js @@ -138,6 +138,14 @@ exports.add = { if (!request.payload) { throw Boom.badRequest('Array, Buffer, or String is required.') } + console.log('========') + console.log('========') + console.log('========') + console.log('========') + console.log('========') + console.log('Incoming request!') + console.log('========') + // console.log(request.payload); const { ipfs } = request.server.app let filesParsed = false @@ -153,12 +161,22 @@ exports.add = { pipe( multipart(request), async function * (source) { + console.log('-------') + console.log({ source }) for await (const entry of source) { + console.log('-------') + // console.log({ entry }); currentFileName = entry.name || 'unknown' if (entry.type === 'file') { filesParsed = true + // Just for logging + for await (const chunk of entry.content) { + console.log({ chunk }) + } + // end of extra logging code + yield { path: entry.name, content: entry.content,