Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: implement streaming mfs.read methods
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
achingbrain committed May 9, 2018
1 parent ac213ee commit 3e5620b
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 102 deletions.
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"fs": false
},
"scripts": {
"test": "cross-env DEBUG=mfs:* aegir test -f test/browser.js",
"test": "cross-env aegir test -f test/browser.js",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser -t webworker -f test/browser.js",
"build": "aegir build",
Expand Down Expand Up @@ -67,14 +67,17 @@
"pull-paramap": "^1.2.2",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.7",
"pull-stream-to-stream": "^1.3.4",
"pull-traverse": "^1.0.3",
"stream-to-pull-stream": "^1.7.2"
},
"pre-commit": [
"lint",
"commit-lint",
"test"
],
"pre-push": [
"commit-lint"
],
"contributors": [
"Alex Potsides <alex.potsides@protocol.ai>"
]
Expand Down
2 changes: 2 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module.exports = {
mkdir: require('./mkdir'),
mv: require('./mv'),
read: require('./read'),
readPullStream: require('./read-pull-stream'),
readReadableStream: require('./read-readable-stream'),
rm: require('./rm'),
stat: require('./stat'),
write: require('./write')
Expand Down
54 changes: 54 additions & 0 deletions src/core/read-pull-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict'

const exporter = require('ipfs-unixfs-engine').exporter
const promisify = require('promisify-es6')
const pull = require('pull-stream/pull')
const collect = require('pull-stream/sinks/collect')
const waterfall = require('async/waterfall')
const {
validatePath,
traverseTo
} = require('./utils')
const log = require('debug')('mfs:read-pull-stream')

const defaultOptions = {
offset: 0,
length: undefined
}

module.exports = function mfsReadPullStream (ipfs) {
return promisify((path, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

options = Object.assign({}, defaultOptions, options)

try {
path = validatePath(path)
} catch (error) {
return callback(error)
}

log(`Reading ${path}`)

waterfall([
(done) => traverseTo(ipfs, path, {
parents: false
}, done),
(result, done) => {
waterfall([
(next) => pull(
exporter(result.node.multihash, ipfs._ipld, {
offset: options.offset,
length: options.length
}),
collect(next)
),
(files, next) => next(null, files[0].content)
], done)
}
], callback)
})
}
20 changes: 20 additions & 0 deletions src/core/read-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict'

const promisify = require('promisify-es6')
const waterfall = require('async/waterfall')
const readPullStream = require('./read-pull-stream')
const toStream = require('pull-stream-to-stream')

module.exports = function mfsRead (ipfs) {
return promisify((path, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

waterfall([
(cb) => readPullStream(ipfs)(path, options, cb),
(stream, cb) => cb(null, toStream.source(stream))
], callback)
})
}
47 changes: 7 additions & 40 deletions src/core/read.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
'use strict'

const exporter = require('ipfs-unixfs-engine').exporter
const promisify = require('promisify-es6')
const pull = require('pull-stream/pull')
const collect = require('pull-stream/sinks/collect')
const waterfall = require('async/waterfall')
const {
validatePath,
traverseTo
} = require('./utils')
const log = require('debug')('mfs:read')

const defaultOptions = {
offset: 0,
length: undefined
}
const readPullStream = require('./read-pull-stream')

module.exports = function mfsRead (ipfs) {
return promisify((path, options, callback) => {
Expand All @@ -23,36 +13,13 @@ module.exports = function mfsRead (ipfs) {
options = {}
}

options = Object.assign({}, defaultOptions, options)

try {
path = validatePath(path)
} catch (error) {
return callback(error)
}

log(`Reading ${path}`)

waterfall([
(done) => traverseTo(ipfs, path, {
parents: false
}, done),
(result, done) => {
waterfall([
(next) => pull(
exporter(result.node.multihash, ipfs._ipld, {
offset: options.offset,
length: options.length
}),
collect(next)
),
(files, next) => pull(
files[0].content,
collect(next)
),
(data, next) => next(null, Buffer.concat(data))
], done)
}
(cb) => readPullStream(ipfs)(path, options, cb),
(stream, cb) => pull(
stream,
collect(cb)
),
(buffers, cb) => cb(null, Buffer.concat(buffers))
], callback)
})
}
171 changes: 111 additions & 60 deletions test/read.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const expect = chai.expect
const path = require('path')
const loadFixture = require('aegir/fixtures')
const bufferStream = require('./fixtures/buffer-stream')
const pull = require('pull-stream/pull')
const collect = require('pull-stream/sinks/collect')

const {
createMfs
Expand All @@ -29,71 +31,120 @@ describe('read', function () {
mfs.node.stop(done)
})

it('reads a small file', () => {
const filePath = '/small-file.txt'
const methods = [{
name: 'read',
read: function () { return mfs.read.apply(mfs, arguments) },
collect: (buffer) => buffer
}, {
name: 'readPullStream',
read: function () { return mfs.readPullStream.apply(mfs, arguments) },
collect: (stream) => {
return new Promise((resolve, reject) => {
pull(
stream,
collect((error, buffers) => {
if (error) {
return reject(error)
}

return mfs.write(filePath, smallFile, {
create: true
})
.then(() => mfs.read(filePath))
.then((buffer) => {
expect(buffer).to.deep.equal(smallFile)
resolve(Buffer.concat(buffers))
})
)
})
})
}
}, {
name: 'readReadableStream',
read: function () { return mfs.readReadableStream.apply(mfs, arguments) },
collect: (stream) => {
return new Promise((resolve, reject) => {
let data = Buffer.alloc(0)

it('reads a file with an offset', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const offset = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
})
.then(() => mfs.read(path, {
offset
}))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(offset)))
})
stream.on('data', (buffer) => {
data = Buffer.concat([data, buffer])
})

it('reads a file with a length', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const length = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
})
.then(() => mfs.read(path, {
length
}))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(0, length)))
})
stream.on('end', (buffer) => {
resolve(data)
})

stream.on('error', (error) => reject(error))
})
}
}]

it('reads a file with an offset and a length', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const offset = 10
const length = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
methods.forEach(method => {
describe(`read ${method.name}`, function () {
it('reads a small file', () => {
const filePath = '/small-file.txt'

return mfs.write(filePath, smallFile, {
create: true
})
.then(() => method.read(filePath))
.then((result) => method.collect(result))
.then((buffer) => {
expect(buffer).to.deep.equal(smallFile)
})
})

it('reads a file with an offset', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const offset = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
})
.then(() => method.read(path, {
offset
}))
.then((result) => method.collect(result))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(offset)))
})

it('reads a file with a length', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const length = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
})
.then(() => method.read(path, {
length
}))
.then((result) => method.collect(result))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(0, length)))
})

it('reads a file with an offset and a length', () => {
const path = `/some-file-${Math.random()}.txt`
let data = Buffer.alloc(0)
const offset = 10
const length = 10

return mfs.write(path, bufferStream(100, {
collector: (bytes) => {
data = Buffer.concat([data, bytes])
}
}), {
create: true
})
.then(() => method.read(path, {
offset,
length
}))
.then((result) => method.collect(result))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(offset, offset + length)))
})
})
.then(() => mfs.read(path, {
offset,
length
}))
.then((buffer) => expect(buffer).to.deep.equal(data.slice(offset, offset + length)))
})
})

0 comments on commit 3e5620b

Please sign in to comment.