Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: handle node readable streams properly (#3890)
Browse files Browse the repository at this point in the history
Readable streams returned from `fs.createReadStream` have a `.path`
property which was throwing off the content normalisation.

Fixes #3882
  • Loading branch information
achingbrain authored Sep 24, 2021
1 parent be4a542 commit b0f367d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 5 deletions.
9 changes: 8 additions & 1 deletion packages/ipfs-core-utils/src/files/normalise-content.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ import {
/**
* @param {import('./normalise').ToContent} input
*/
export async function * normaliseContent (input) {
export async function normaliseContent (input) {
return toAsyncGenerator(input)
}

/**
* @param {import('./normalise').ToContent} input
*/
async function * toAsyncGenerator (input) {
// Bytes | String
if (isBytes(input)) {
yield toBytes(input)
Expand Down
12 changes: 9 additions & 3 deletions packages/ipfs-core-utils/src/files/normalise.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {

/**
* @param {ImportCandidate | ImportCandidateStream} input
* @param {(content:ToContent) => AsyncIterable<Uint8Array>} normaliseContent
* @param {(content:ToContent) => Promise<AsyncIterable<Uint8Array>>} normaliseContent
*/
// eslint-disable-next-line complexity
export async function * normalise (input, normaliseContent) {
Expand Down Expand Up @@ -72,6 +72,13 @@ export async function * normalise (input, normaliseContent) {
return
}

// fs.ReadStream<Bytes>
if (value._readableState) {
// @ts-ignore Node readable streams have a `.path` property so we need to pass it as the content
yield * map(peekable, (/** @type {ImportCandidate} */ value) => toFileObject({ content: value }, normaliseContent))
return
}

// (Async)Iterable<Blob>
// (Async)Iterable<String>
// (Async)Iterable<{ path, content }>
Expand Down Expand Up @@ -103,7 +110,7 @@ export async function * normalise (input, normaliseContent) {

/**
* @param {ImportCandidate} input
* @param {(content:ToContent) => AsyncIterable<Uint8Array>} normaliseContent
* @param {(content:ToContent) => Promise<AsyncIterable<Uint8Array>>} normaliseContent
*/
async function toFileObject (input, normaliseContent) {
// @ts-ignore - Those properties don't exist on most input types
Expand All @@ -117,7 +124,6 @@ async function toFileObject (input, normaliseContent) {
}

if (content) {
// @ts-ignore TODO vmx 2021-03-30 enable again
file.content = await normaliseContent(content)
} else if (!path) { // Not already a file object with path or content prop
// @ts-ignore - input still can be different ToContent
Expand Down
29 changes: 29 additions & 0 deletions packages/ipfs-core-utils/test/files/normalise-input.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'
import { File } from '@web-std/file'
import { normaliseInput } from '../../src/files/normalise-input.js'
import { isNode } from 'ipfs-utils/src/env.js'
import resolve from 'aegir/utils/resolve.js'

const { Blob, ReadableStream } = globalThis

Expand Down Expand Up @@ -208,4 +210,31 @@ describe('normalise-input', function () {
describe('TypedArray', () => {
testInputType(TYPEDARRAY, 'TypedArray', true)
})

if (isNode) {
/** @type {import('fs')} */
let fs

before(async () => {
fs = await import('fs')
})

describe('Node fs.ReadStream', () => {
const NODEFSREADSTREAM = () => {
const path = resolve('test/fixtures/file.txt', 'ipfs-core-utils')

return fs.createReadStream(path)
}

testInputType(NODEFSREADSTREAM, 'Node fs.ReadStream', false)

it('Iterable<Node fs.ReadStream>', async function () {
await testContent(iterableOf(NODEFSREADSTREAM()))
})

it('AsyncIterable<Node fs.ReadStream>', async function () {
await testContent(asyncIterableOf(NODEFSREADSTREAM()))
})
})
}
})
1 change: 1 addition & 0 deletions packages/ipfs-core-utils/test/fixtures/file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world
2 changes: 1 addition & 1 deletion packages/ipfs-grpc-client/src/core-api/files/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
* @param {*} content
*/
async function * stream (path, content) {
for await (const buf of normaliseContent(content)) {
for await (const buf of await normaliseContent(content)) {
yield { path, content: buf }
}
}
Expand Down

0 comments on commit b0f367d

Please sign in to comment.