fix: usage with readable-stream (#333)
The readable-stream module has different timing characteristics from plain
generators - it turns out that the end of the walkDAG
method was being detected incorrectly: instead of waiting for its
promise to resolve, we should wait for the output queue to finish.
achingbrain authored May 11, 2023
1 parent 5cc0ff2 commit 9b6203f
Showing 8 changed files with 137 additions and 48 deletions.
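The pattern behind the fix, sketched below with it-pushable (the queue primitive the exporter already uses). The names produce, consume and wanted are illustrative only - the point is that the consumer, not the producer's promise, decides when the queue is finished:

import { pushable, type Pushable } from 'it-pushable'

// illustrative producer: pushes chunks onto the queue
async function produce (queue: Pushable<Uint8Array>, chunks: Uint8Array[]): Promise<void> {
  for (const chunk of chunks) {
    queue.push(chunk)
  }

  // mirror of the walkDAG change: the producer ends the queue once it has
  // pushed everything it was asked for...
  queue.end()
}

// illustrative consumer: finishes when the queue finishes, not when the
// producer's promise resolves
async function consume (chunks: Uint8Array[], wanted: bigint): Promise<bigint> {
  const queue = pushable<Uint8Array>()

  void produce(queue, chunks)
    .catch(err => {
      queue.end(err)
    })

  let read = 0n

  for await (const buf of queue) {
    read += BigInt(buf.byteLength)

    // ...and the consumer ends it as soon as it has read enough, instead of
    // waiting for produce() to settle
    if (read >= wanted) {
      queue.end()
    }
  }

  return read
}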
6 changes: 5 additions & 1 deletion packages/ipfs-unixfs-exporter/package.json
@@ -154,6 +154,7 @@
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@types/readable-stream": "^2.3.15",
"@types/sinon": "^10.0.0",
"aegir": "^38.1.2",
"blockstore-core": "^4.0.1",
@@ -163,12 +164,15 @@
"it-all": "^3.0.2",
"it-buffer-stream": "^3.0.0",
"it-first": "^3.0.2",
"it-to-buffer": "^4.0.2",
"merge-options": "^3.0.4",
"readable-stream": "^4.4.0",
"sinon": "^15.0.0",
"wherearewe": "^2.0.1"
},
"browser": {
"fs": false
"fs": false,
"readable-stream": false
},
"typedoc": {
"entryPoint": "./src/index.ts"
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-exporter/src/index.ts
@@ -59,7 +59,7 @@ export interface Exportable<T> {
cid: CID
depth: number
size: bigint
-  content: (options?: ExporterOptions) => AsyncIterable<T>
+  content: (options?: ExporterOptions) => AsyncGenerator<T, void, unknown>
}

export interface UnixFSFile extends Exportable<Uint8Array> {
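One practical consequence of the AsyncGenerator return type, sketched below under the assumption that entry is a UnixFSFile returned by exporter(): the iterator can be pulled one chunk at a time with next(), which is exactly what a readable-stream read() implementation needs to do:

import type { UnixFSFile } from 'ipfs-unixfs-exporter'

// pull a single chunk from the content generator, then stop it early
async function readFirstChunk (entry: UnixFSFile): Promise<Uint8Array | undefined> {
  const iterator = entry.content()
  const result = await iterator.next()

  if (result.done === true) {
    return undefined
  }

  // let the generator clean up without draining the rest of the file
  await iterator.return(undefined)

  return result.value
}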
8 changes: 4 additions & 4 deletions packages/ipfs-unixfs-exporter/src/resolvers/identity.ts
@@ -8,15 +8,15 @@ import { CustomProgressEvent } from 'progress-events'
const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
const {
-      offset,
-      length
+      start,
+      end
} = validateOffsetAndLength(node.length, options.offset, options.length)

-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:identity', {
bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
fileSize: BigInt(node.byteLength)
}))

8 changes: 4 additions & 4 deletions packages/ipfs-unixfs-exporter/src/resolvers/raw.ts
@@ -7,15 +7,15 @@ import { CustomProgressEvent } from 'progress-events'
const rawContent = (node: Uint8Array): ((options?: ExporterOptions) => AsyncGenerator<Uint8Array, void, undefined>) => {
async function * contentGenerator (options: ExporterOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
const {
-      offset,
-      length
+      start,
+      end
} = validateOffsetAndLength(node.length, options.offset, options.length)

-    const buf = extractDataFromBlock(node, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(node, 0n, start, end)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:raw', {
bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
fileSize: BigInt(node.byteLength)
}))

packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
@@ -15,7 +15,9 @@ import { CustomProgressEvent } from 'progress-events'
async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8Array, queue: Pushable<Uint8Array>, streamPosition: bigint, start: bigint, end: bigint, options: ExporterOptions): Promise<void> {
// a `raw` node
if (node instanceof Uint8Array) {
-    queue.push(extractDataFromBlock(node, streamPosition, start, end))
+    const buf = extractDataFromBlock(node, streamPosition, start, end)
+
+    queue.push(buf)

return
}
@@ -123,6 +125,10 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
}
}
  )
+
+  if (streamPosition >= end) {
+    queue.end()
+  }
}

const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
@@ -134,34 +140,23 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,
}

const {
-      offset,
-      length
+      start,
+      end
} = validateOffsetAndLength(fileSize, options.offset, options.length)

-    if (length === 0n) {
+    if (end === 0n) {
return
}

let read = 0n
-    const wanted = length - offset
+    const wanted = end - start
const queue = pushable()

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid
}))

-    void walkDAG(blockstore, node, queue, 0n, offset, offset + length, options)
-      .then(() => {
-        if (read < wanted) {
-          throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
-        }
-
-        if (read > wanted) {
-          throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
-        }
-
-        queue.end()
-      })
+    void walkDAG(blockstore, node, queue, 0n, start, end, options)
.catch(err => {
queue.end(err)
})
@@ -173,7 +168,12 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,

read += BigInt(buf.byteLength)

-      if (read === length) {
+      if (read > wanted) {
+        queue.end()
+        throw errCode(new Error('Read too many bytes - the file size reported by the UnixFS data in the root node may be incorrect'), 'ERR_OVER_READ')
+      }
+
+      if (read === wanted) {
queue.end()
}

Expand All @@ -185,6 +185,10 @@ const fileContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth,

yield buf
    }
+
+    if (read < wanted) {
+      throw errCode(new Error('Traversed entire DAG but did not read enough bytes'), 'ERR_UNDER_READ')
+    }
}

return yieldFileContent
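From a consumer's point of view the reworked resolver behaves the same as before: offset and length are translated into absolute start/end positions and the queue is ended as soon as wanted bytes have been yielded. A usage sketch (readRange is a made-up helper; the cid and blockstore are assumed to exist):

import { exporter } from 'ipfs-unixfs-exporter'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// read 256 bytes starting at byte 1024 of a UnixFS file
async function readRange (cid: CID, blockstore: Blockstore): Promise<Uint8Array> {
  const entry = await exporter(cid, blockstore)

  if (entry.type !== 'file' && entry.type !== 'raw') {
    throw new Error('Expected file content')
  }

  const chunks: Uint8Array[] = []

  for await (const chunk of entry.content({ offset: 1024, length: 256 })) {
    chunks.push(chunk)
  }

  return uint8ArrayConcat(chunks)
}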
packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/raw.ts
@@ -12,19 +12,19 @@ const rawContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, b
const size = unixfs.data.length

const {
-      offset,
-      length
+      start,
+      end
} = validateOffsetAndLength(size, options.offset, options.length)

options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:raw', {
cid
}))

-    const buf = extractDataFromBlock(unixfs.data, 0n, offset, offset + length)
+    const buf = extractDataFromBlock(unixfs.data, 0n, start, end)

options.onProgress?.(new CustomProgressEvent<ExportProgress>('unixfs:exporter:progress:unixfs:raw', {
bytesRead: BigInt(buf.byteLength),
-      totalBytes: length - offset,
+      totalBytes: end - start,
fileSize: BigInt(unixfs.data.byteLength)
}))

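The progress events touched in this diff carry bytesRead, totalBytes and fileSize in their detail; a caller can observe them by passing onProgress. A sketch, with the event names taken from the diff and readWithProgress a made-up helper:

import type { UnixFSEntry } from 'ipfs-unixfs-exporter'

// drain an entry's content, logging exporter progress as it goes
async function readWithProgress (entry: UnixFSEntry): Promise<void> {
  for await (const chunk of entry.content({
    onProgress: (evt) => {
      // e.g. 'unixfs:exporter:progress:unixfs:raw' with { bytesRead, totalBytes, fileSize }
      console.info(evt.type, evt.detail)
    }
  })) {
    void chunk // the chunks themselves are discarded in this sketch
  }
}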
packages/ipfs-unixfs-exporter/src/utils/validate-offset-and-length.ts
@@ -1,36 +1,37 @@
import errCode from 'err-code'

-const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { offset: bigint, length: bigint } => {
-  offset = BigInt(offset ?? 0)
-  length = BigInt(length ?? size)
+const validateOffsetAndLength = (size: number | bigint, offset: number | bigint = 0, length: number | bigint = size): { start: bigint, end: bigint } => {
+  const fileSize = BigInt(size)
+  const start = BigInt(offset ?? 0)
+  let end = BigInt(length)

-  if (offset == null) {
-    offset = 0n
+  if (end !== fileSize) {
+    end = start + end
}

-  if (offset < 0n) {
-    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
+  if (end > fileSize) {
+    end = fileSize
}

-  if (offset > size) {
-    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
+  if (start < 0n) {
+    throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
}

-  if (length == null) {
-    length = BigInt(size) - offset
+  if (start > fileSize) {
+    throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS')
}

-  if (length < 0n) {
+  if (end < 0n) {
throw errCode(new Error('Length must be greater than or equal to 0'), 'ERR_INVALID_PARAMS')
}

-  if (offset + length > size) {
-    length = BigInt(size) - offset
+  if (end > fileSize) {
+    throw errCode(new Error('Length must be less than the file size'), 'ERR_INVALID_PARAMS')
}

return {
-    offset,
-    length
+    start,
+    end
}
}

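Worked examples of the new start/end mapping, for illustration only (values chosen arbitrarily):

// size = 100n, offset = 10n, length = 20n   -> start = 10n, end = 30n   (end = start + length)
// size = 100n, offset = 10n, length omitted -> start = 10n, end = 100n  (length defaults to size)
// size = 100n, offset = 90n, length = 50n   -> start = 90n, end = 100n  (end clamped to the file size)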
80 changes: 80 additions & 0 deletions packages/ipfs-unixfs-exporter/test/exporter.spec.ts
@@ -25,6 +25,9 @@ import type { Blockstore } from 'interface-blockstore'
import { balanced, FileLayout, flat, trickle } from 'ipfs-unixfs-importer/layout'
import type { Chunker } from 'ipfs-unixfs-importer/chunker'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
+import toBuffer from 'it-to-buffer'
+import { Readable } from 'readable-stream'
+import { isNode } from 'wherearewe'

const ONE_MEG = Math.pow(1024, 2)

@@ -1229,4 +1232,81 @@ describe('exporter', () => {
signal: abortController.signal
})).to.eventually.be.rejectedWith(message)
})

it('should support being used with readable-stream', async () => {
if (!isNode) {
// node-only test
return
}

let dataSizeInBytes = 10

// iterate through order of magnitude in size until hitting 10MB
while (dataSizeInBytes <= 10_000_000) {
const bytes = await toBuffer(randomBytes(dataSizeInBytes))

// chunk up the bytes to simulate a more real-world like behavior
const chunkLength = 100_000
let currentIndex = 0

const readableStream = new Readable({
read (): void {
// if this is the last chunk
if (currentIndex + chunkLength > bytes.length) {
this.push(bytes.subarray(currentIndex))
this.push(null)
} else {
this.push(bytes.subarray(currentIndex, currentIndex + chunkLength))

currentIndex = currentIndex + chunkLength
}
}
})

const result = await last(importer([{
content: readableStream
}], block))

if (result == null) {
throw new Error('Import failed')
}

const file = await exporter(result.cid, block)
const contentIterator = file.content()

const readableStreamToBytes = async (readableStream: Readable): Promise<Uint8Array> => {
return await new Promise((resolve, reject) => {
const chunks: any[] = []
readableStream.on('data', chunk => {
chunks.push(chunk)
})

readableStream.on('end', () => {
const uint8Array = uint8ArrayConcat(chunks)
resolve(uint8Array)
})

readableStream.on('error', reject)
})
}

const dataStream = new Readable({
async read (): Promise<void> {
const result = await contentIterator.next()
if (result.done === true) {
this.push(null) // end the stream
} else {
this.push(result.value)
}
}
})

const data = await readableStreamToBytes(dataStream)

expect(data.byteLength).to.equal(dataSizeInBytes)
expect(data).to.equalBytes(bytes)

dataSizeInBytes *= 10
}
})
})
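The test above drives the content generator by hand from read(); because content() is an async iterable, the same wiring can often be written with Readable.from. A sketch using Node's stream module, which readable-stream v4 mirrors (contentAsStream is a made-up helper):

import { Readable } from 'node:stream'
import { exporter } from 'ipfs-unixfs-exporter'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

// expose a UnixFS file's content as a Node-style readable stream
async function contentAsStream (cid: CID, blockstore: Blockstore): Promise<Readable> {
  const file = await exporter(cid, blockstore)

  // assumes the cid resolves to file content; content() is an AsyncGenerator,
  // which Readable.from accepts directly
  return Readable.from(file.content())
}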
