Skip to content

Commit

Permalink
feat: support pull streams (#8)
Browse files Browse the repository at this point in the history
* feat: support pull streams

This PR updates the `normaliseInput` function to accept pull streams.

I've also made the following changes:

1. Update the docs for supported inputs
  * `Buffer|ArrayBuffer|TypedArray` is aliased as `Bytes`
  * `Blob|File` is aliased as `Bloby`
  * Added info on what an input "means", i.e. whether it causes a single file or multiple files to be added
1. Peek the first item of an (async) iterator properly
1. Move file object check below `input[Symbol.asyncIterator]` check because Node.js streams have a `path` property that would cause a false positive in the `isFileObject` check
1. Fix `toFileObject` to allow objects with no `content` property
1. Simplify `toBuffer` to remove checks that `Buffer.from` already does

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>

* fix: tests

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw authored and achingbrain committed Sep 3, 2019
1 parent e51ff72 commit e5b2509
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 100 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"is-pull-stream": "0.0.0",
"is-stream": "^2.0.0",
"kind-of": "^6.0.2",
"pull-stream-to-async-iterator": "^1.0.2",
"readable-stream": "^3.4.0"
},
"devDependencies": {
Expand Down
246 changes: 171 additions & 75 deletions src/files/normalise-input.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,53 @@

const errCode = require('err-code')
const { Buffer } = require('buffer')
const pullStreamToIterable = require('pull-stream-to-async-iterator')

/*
* Transform one of:
*
* ```
* Buffer|ArrayBuffer|TypedArray
* Blob|File
* { path, content: Blob }
* { path, content: String }
* { path, content: Iterable<Number> }
* { path, content: Iterable<Buffer> }
* { path, content: Iterable<Iterable<Number>> }
* { path, content: AsyncIterable<Iterable<Number>> }
* String
* Iterable<Number>
* Iterable<Buffer>
* Iterable<Blob>
* Iterable<{ path, content: Buffer }>
* Iterable<{ path, content: Blob }>
* Iterable<{ path, content: Iterable<Number> }>
* Iterable<{ path, content: AsyncIterable<Buffer> }>
* AsyncIterable<Buffer>
* AsyncIterable<{ path, content: Buffer }>
* AsyncIterable<{ path, content: Blob }>
* AsyncIterable<{ path, content: Iterable<Buffer> }>
* AsyncIterable<{ path, content: AsyncIterable<Buffer> }>
* Bytes (Buffer|ArrayBuffer|TypedArray) [single file]
* Bloby (Blob|File) [single file]
* String [single file]
* { path, content: Bytes } [single file]
* { path, content: Bloby } [single file]
* { path, content: String } [single file]
* { path, content: Iterable<Number> } [single file]
* { path, content: Iterable<Bytes> } [single file]
* { path, content: AsyncIterable<Bytes> } [single file]
* { path, content: PullStream<Bytes> } [single file]
* Iterable<Number> [single file]
* Iterable<Bytes> [single file]
* Iterable<Bloby> [multiple files]
* Iterable<String> [multiple files]
* Iterable<{ path, content: Bytes }> [multiple files]
* Iterable<{ path, content: Bloby }> [multiple files]
* Iterable<{ path, content: String }> [multiple files]
* Iterable<{ path, content: Iterable<Number> }> [multiple files]
* Iterable<{ path, content: Iterable<Bytes> }> [multiple files]
* Iterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* Iterable<{ path, content: PullStream<Bytes> }> [multiple files]
* AsyncIterable<Bytes> [single file]
* AsyncIterable<Bloby> [multiple files]
* AsyncIterable<String> [multiple files]
* AsyncIterable<{ path, content: Bytes }> [multiple files]
* AsyncIterable<{ path, content: Bloby }> [multiple files]
* AsyncIterable<{ path, content: String }> [multiple files]
* AsyncIterable<{ path, content: Iterable<Number> }> [multiple files]
* AsyncIterable<{ path, content: Iterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* AsyncIterable<{ path, content: PullStream<Bytes> }> [multiple files]
* PullStream<Bytes> [single file]
* PullStream<Bloby> [multiple files]
* PullStream<String> [multiple files]
* PullStream<{ path, content: Bytes }> [multiple files]
* PullStream<{ path, content: Bloby }> [multiple files]
* PullStream<{ path, content: String }> [multiple files]
* PullStream<{ path, content: Iterable<Number> }> [multiple files]
* PullStream<{ path, content: Iterable<Bytes> }> [multiple files]
* PullStream<{ path, content: AsyncIterable<Bytes> }> [multiple files]
* PullStream<{ path, content: PullStream<Bytes> }> [multiple files]
* ```
* Into:
*
Expand All @@ -44,13 +65,6 @@ module.exports = function normaliseInput (input) {
throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
}

// { path, content: ? }
if (isFileObject(input)) {
return (async function * () { // eslint-disable-line require-await
yield toFileObject(input)
})()
}

// String
if (typeof input === 'string' || input instanceof String) {
return (async function * () { // eslint-disable-line require-await
Expand All @@ -68,76 +82,165 @@ module.exports = function normaliseInput (input) {

// Iterable<?>
if (input[Symbol.iterator]) {
// Iterable<Number>
if (!isNaN(input[0])) {
return (async function * () { // eslint-disable-line require-await
yield toFileObject([input])
})()
}

// Iterable<Buffer>
// Iterable<Blob>
return (async function * () { // eslint-disable-line require-await
for (const chunk of input) {
yield toFileObject(chunk)
const iterator = input[Symbol.iterator]()
const first = iterator.next()
if (first.done) return iterator

// Iterable<Number>
// Iterable<Bytes>
if (Number.isInteger(first.value) || isBytes(first.value)) {
yield toFileObject((function * () {
yield first.value
yield * iterator
})())
return
}

// Iterable<Bloby>
// Iterable<String>
// Iterable<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// AsyncIterable<?>
if (input[Symbol.asyncIterator]) {
return (async function * () {
const iterator = input[Symbol.asyncIterator]()
const first = await iterator.next()
if (first.done) return iterator

// AsyncIterable<Bytes>
if (isBytes(first.value)) {
yield toFileObject((async function * () { // eslint-disable-line require-await
yield first.value
yield * iterator
})())
return
}

// AsyncIterable<Bloby>
// AsyncIterable<String>
// AsyncIterable<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for await (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// { path, content: ? }
// Note: Detected _after_ AsyncIterable<?> because Node.js streams have a
// `path` property that passes this check.
if (isFileObject(input)) {
return (async function * () { // eslint-disable-line require-await
for await (const chunk of input) {
yield toFileObject(chunk)
yield toFileObject(input)
})()
}

// PullStream<?>
if (typeof input === 'function') {
return (async function * () {
const iterator = pullStreamToIterable(input)[Symbol.asyncIterator]()
const first = await iterator.next()
if (first.done) return iterator

// PullStream<Bytes>
if (isBytes(first.value)) {
yield toFileObject((async function * () { // eslint-disable-line require-await
yield first.value
yield * iterator
})())
return
}

// PullStream<Bloby>
// PullStream<String>
// PullStream<{ path, content }>
if (isFileObject(first.value) || isBloby(first.value) || typeof first.value === 'string') {
yield toFileObject(first.value)
for await (const obj of iterator) {
yield toFileObject(obj)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
}

function toFileObject (input) {
return {
path: input.path || '',
content: toAsyncIterable(input.content || input)
const obj = { path: input.path || '' }

if (input.content) {
obj.content = toAsyncIterable(input.content)
} else if (!input.path) { // Not already a file object with path or content prop
obj.content = toAsyncIterable(input)
}

return obj
}

function toAsyncIterable (input) {
// Buffer|ArrayBuffer|TypedArray|array of bytes
if (isBytes(input)) {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

if (typeof input === 'string' || input instanceof String) {
// Bytes | String
if (isBytes(input) || typeof input === 'string') {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

// Blob|File
// Bloby
if (isBloby(input)) {
return blobToAsyncGenerator(input)
}

// Iterable<?>
if (input[Symbol.iterator]) {
if (!isNaN(input[0])) {
return (async function * () { // eslint-disable-line require-await
yield toBuffer(input)
})()
}

return (async function * () { // eslint-disable-line require-await
for (const chunk of input) {
yield toBuffer(chunk)
const iterator = input[Symbol.iterator]()
const first = iterator.next()
if (first.done) return iterator

// Iterable<Number>
if (Number.isInteger(first.value)) {
yield toBuffer(Array.from((function * () {
yield first.value
yield * iterator
})()))
return
}

// Iterable<Bytes>
if (isBytes(first.value)) {
yield toBuffer(first.value)
for (const chunk of iterator) {
yield toBuffer(chunk)
}
return
}

throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT')
})()
}

// AsyncIterable<?>
// AsyncIterable<Bytes>
if (input[Symbol.asyncIterator]) {
return (async function * () {
for await (const chunk of input) {
Expand All @@ -146,23 +249,16 @@ function toAsyncIterable (input) {
})()
}

// PullStream<Bytes>
if (typeof input === 'function') {
return pullStreamToIterable(input)
}

throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
}

function toBuffer (chunk) {
if (isBytes(chunk)) {
return chunk
}

if (typeof chunk === 'string' || chunk instanceof String) {
return Buffer.from(chunk)
}

if (Array.isArray(chunk)) {
return Buffer.from(chunk)
}

throw new Error('Unexpected input: ' + typeof chunk)
return isBytes(chunk) ? chunk : Buffer.from(chunk)
}

function isBytes (obj) {
Expand Down
Loading

0 comments on commit e5b2509

Please sign in to comment.