Skip to content

Commit

Permalink
feat: use dfs traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
guanzo committed Sep 13, 2023
1 parent 8a271b2 commit ee5a574
Showing 1 changed file with 62 additions and 51 deletions.
113 changes: 62 additions & 51 deletions packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
streamPosition += BigInt(buf.byteLength)
}

const childOps: Array<{ link: dagPb.PBLink, blockStart: bigint }> = []
let childOps: Array<{ link: dagPb.PBLink, blockStart: bigint }> = []

if (node.Links.length !== file.blockSizes.length) {
throw errCode(new Error('Inconsistent block sizes and dag links'), 'ERR_NOT_UNIXFS')
Expand All @@ -64,67 +64,78 @@ async function walkDAG (blockstore: ReadableStorage, node: dagPb.PBNode | Uint8A
})
}

// raw blocks have no children and can be traversed in parallel
if (childLink.Hash.code !== raw.code) {
await handleChildren(childOps)
childOps = []
}
streamPosition = childEnd

if (streamPosition > end) {
break
}
}

await pipe(
childOps,
(source) => map(source, (op) => {
return async () => {
const block = await blockstore.get(op.link.Hash, options)
if (childOps.length) {
await handleChildren(childOps)
}

return {
...op,
block
async function handleChildren (childOps: Array<{ link: dagPb.PBLink, blockStart: bigint }>) {
await pipe(
childOps,
(source) => map(source, (op) => {
return async () => {
const block = await blockstore.get(op.link.Hash, options)

return {
...op,
block
}
}
}
}),
(source) => parallel(source, {
ordered: true
}),
async (source) => {
for await (const { link, block, blockStart } of source) {
let child: dagPb.PBNode | Uint8Array
switch (link.Hash.code) {
case dagPb.code:
child = dagPb.decode(block)
break
case raw.code:
child = block
break
default:
queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS'))
return
}),
(source) => parallel(source, {
ordered: true
}),
async (source) => {
for await (const { link, block, blockStart } of source) {
let child: dagPb.PBNode | Uint8Array
switch (link.Hash.code) {
case dagPb.code:
child = dagPb.decode(block)
break
case raw.code:
child = block
break
default:
queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS'))
return
}

// create a queue for this child - we use a queue instead of recursion
// to avoid overflowing the stack
const childQueue = new PQueue({
concurrency: 1
})
// if any of the child jobs error, end the read queue with the error
childQueue.on('error', error => {
queue.end(error)
})

// if the job rejects the 'error' event will be emitted on the child queue
void childQueue.add(async () => {
options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid: link.Hash
}))

await walkDAG(blockstore, child, queue, blockStart, start, end, options)
})

// wait for this child to complete before moving on to the next
await childQueue.onIdle()
}

// create a queue for this child - we use a queue instead of recursion
// to avoid overflowing the stack
const childQueue = new PQueue({
concurrency: 1
})
// if any of the child jobs error, end the read queue with the error
childQueue.on('error', error => {
queue.end(error)
})

// if the job rejects the 'error' event will be emitted on the child queue
void childQueue.add(async () => {
options.onProgress?.(new CustomProgressEvent<ExportWalk>('unixfs:exporter:walk:file', {
cid: link.Hash
}))

await walkDAG(blockstore, child, queue, blockStart, start, end, options)
})

// wait for this child to complete before moving on to the next
await childQueue.onIdle()
}
}
)
)
}

if (streamPosition >= end) {
queue.end()
Expand Down

0 comments on commit ee5a574

Please sign in to comment.