From 6ae5288625a8b72f0cc18eb0d708eae4a16a3ca3 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 30 Jan 2018 21:56:30 +0000 Subject: [PATCH 1/5] feat(breaking change): use stream on stats.bw --- src/stats/bw.js | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/src/stats/bw.js b/src/stats/bw.js index 3c0d79c87..7a9d882fc 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -2,20 +2,15 @@ const promisify = require('promisify-es6') const streamToValue = require('../utils/stream-to-value') +const { Transform } = require('readable-stream') -const transform = function (res, callback) { - streamToValue(res, (err, data) => { - if (err) { - return callback(err) - } - - callback(null, { - totalIn: data[0].TotalIn, - totalOut: data[0].TotalOut, - rateIn: data[0].RateIn, - rateOut: data[0].RateOut - }) - }) +const transformChunk = (chunk) => { + return { + totalIn: chunk.TotalIn, + totalOut: chunk.TotalOut, + rateIn: chunk.RateIn, + rateOut: chunk.RateOut + } } module.exports = (send) => { @@ -28,6 +23,28 @@ module.exports = (send) => { send.andTransform({ path: 'stats/bw', qs: opts - }, transform, callback) + }, (res, callback) => { + if (!opts.poll) { + // If not polling, just send the result. + return streamToValue(res, (err, data) => { + if (err) { + return callback(err) + } + + callback(null, transformChunk(data[0])) + }) + } + + // If polling, return a readable stream. + const output = new Transform({ + objectMode: true, + transform (chunk, encoding, cb) { + cb(null, transformChunk(chunk)) + } + }) + + res.pipe(output) + callback(null, output) + }, callback) }) } From bcc3b0cc0190746b316ff6e3a0b0d7c4c808da6f Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 31 Jan 2018 11:27:47 +0000 Subject: [PATCH 2/5] add some type checking --- src/bitswap/stat.js | 4 ++-- src/stats/bitswap.js | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/bitswap/stat.js b/src/bitswap/stat.js index 733b99617..acaa9e0eb 100644 --- a/src/bitswap/stat.js +++ b/src/bitswap/stat.js @@ -5,8 +5,8 @@ const promisify = require('promisify-es6') const transform = function (res, callback) { callback(null, { provideBufLen: res.ProvideBufLen, - wantlist: res.Wantlist, - peers: res.Peers, + wantlist: res.Wantlist || [], + peers: res.Peers || [], blocksReceived: res.BlocksReceived, dataReceived: res.DataReceived, blocksSent: res.BlocksSent, diff --git a/src/stats/bitswap.js b/src/stats/bitswap.js index 9c893fa1c..cb7f341da 100644 --- a/src/stats/bitswap.js +++ b/src/stats/bitswap.js @@ -5,8 +5,8 @@ const promisify = require('promisify-es6') const transform = function (res, callback) { callback(null, { provideBufLen: res.ProvideBufLen, - wantlist: res.Wantlist, - peers: res.Peers, + wantlist: res.Wantlist || [], + peers: res.Peers || [], blocksReceived: res.BlocksReceived, dataReceived: res.DataReceived, blocksSent: res.BlocksSent, From 30d562390299e9234c687af88ef80567857eb083 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 6 Feb 2018 21:50:46 +0000 Subject: [PATCH 3/5] readable stream and pull stream on bandwidth stats --- package.json | 3 ++- src/bitswap/stat.js | 13 ++++++----- src/repo/stat.js | 7 +++--- src/stats/bitswap.js | 13 ++++++----- src/stats/bw-pull-stream.js | 36 +++++++++++++++++++++++++++++ src/stats/bw-readable-stream.js | 31 +++++++++++++++++++++++++ src/stats/bw-util.js | 12 ++++++++++ src/stats/bw.js | 41 ++++++++------------------------- src/stats/index.js | 2 ++ src/stats/repo.js | 7 +++--- 10 files changed, 115 insertions(+), 50 deletions(-) create mode 100644 src/stats/bw-pull-stream.js create mode 100644 src/stats/bw-readable-stream.js create mode 100644 src/stats/bw-util.js diff --git a/package.json b/package.json index 6ad31253c..d1861fc19 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ }, "dependencies": { "async": "^2.6.0", + "big.js": "^5.0.3", "bs58": "^4.0.1", "cids": "~0.5.2", "concat-stream": "^1.6.0", @@ -68,8 +69,8 @@ "eslint-plugin-react": "^7.5.1", "go-ipfs-dep": "^0.4.13", "gulp": "^3.9.1", - "interface-ipfs-core": "~0.43.0", "hapi": "^16.6.2", + "interface-ipfs-core": "~0.43.0", "ipfsd-ctl": "~0.27.0", "pre-commit": "^1.2.2", "socket.io": "^2.0.4", diff --git a/src/bitswap/stat.js b/src/bitswap/stat.js index acaa9e0eb..ef12433e2 100644 --- a/src/bitswap/stat.js +++ b/src/bitswap/stat.js @@ -1,18 +1,19 @@ 'use strict' const promisify = require('promisify-es6') +const Big = require('big.js') const transform = function (res, callback) { callback(null, { provideBufLen: res.ProvideBufLen, wantlist: res.Wantlist || [], peers: res.Peers || [], - blocksReceived: res.BlocksReceived, - dataReceived: res.DataReceived, - blocksSent: res.BlocksSent, - dataSent: res.DataSent, - dupBlksReceived: res.DupBlksReceived, - dupDataReceived: res.DupDataReceived + blocksReceived: new Big(res.BlocksReceived), + dataReceived: new Big(res.DataReceived), + blocksSent: new Big(res.BlocksSent), + dataSent: new Big(res.DataSent), + dupBlksReceived: new Big(res.DupBlksReceived), + dupDataReceived: new Big(res.DupDataReceived) }) } diff --git a/src/repo/stat.js b/src/repo/stat.js index aca22bf11..840aad4c5 100644 --- a/src/repo/stat.js +++ b/src/repo/stat.js @@ -1,14 +1,15 @@ 'use strict' const promisify = require('promisify-es6') +const Big = require('big.js') const transform = function (res, callback) { callback(null, { - numObjects: res.NumObjects, - repoSize: res.RepoSize, + numObjects: new Big(res.NumObjects), + repoSize: new Big(res.RepoSize), repoPath: res.RepoPath, version: res.Version, - storageMax: res.StorageMax + storageMax: new Big(res.StorageMax) }) } diff --git a/src/stats/bitswap.js b/src/stats/bitswap.js index cb7f341da..cdf892240 100644 --- a/src/stats/bitswap.js +++ b/src/stats/bitswap.js @@ -1,18 +1,19 @@ 'use strict' const promisify = require('promisify-es6') +const Big = require('big.js') const transform = function (res, callback) { callback(null, { provideBufLen: res.ProvideBufLen, wantlist: res.Wantlist || [], peers: res.Peers || [], - blocksReceived: res.BlocksReceived, - dataReceived: res.DataReceived, - blocksSent: res.BlocksSent, - dataSent: res.DataSent, - dupBlksReceived: res.DupBlksReceived, - dupDataReceived: res.DupDataReceived + blocksReceived: new Big(res.BlocksReceived), + dataReceived: new Big(res.DataReceived), + blocksSent: new Big(res.BlocksSent), + dataSent: new Big(res.DataSent), + dupBlksReceived: new Big(res.DupBlksReceived), + dupDataReceived: new Big(res.DupDataReceived) }) } diff --git a/src/stats/bw-pull-stream.js b/src/stats/bw-pull-stream.js new file mode 100644 index 000000000..a3bc9466c --- /dev/null +++ b/src/stats/bw-pull-stream.js @@ -0,0 +1,36 @@ +'use strict' + +const toPull = require('stream-to-pull-stream') +const pull = require('pull-stream') +const transformChunk = require('./bw-util') +const deferred = require('pull-defer') + +const transform = () => (read) => (abort, cb) => { + read(abort, (err, data) => { + console.log(data) + if (err) return cb(err) + cb(null, transformChunk(data)) + }) +} + +module.exports = (send) => { + return (hash, opts) => { + opts = opts || {} + + const p = deferred.through() + + send({ + path: 'stats/bw', + qs: opts + }, (err, stream) => { + if (err) { + return p.end(err) + } + + pull(toPull(stream), p) + p.resolve(transform) + }) + + return p + } +} diff --git a/src/stats/bw-readable-stream.js b/src/stats/bw-readable-stream.js new file mode 100644 index 000000000..aa9f0701a --- /dev/null +++ b/src/stats/bw-readable-stream.js @@ -0,0 +1,31 @@ +'use strict' + +const Stream = require('readable-stream') +const pump = require('pump') +const transformChunk = require('./bw-util') + +module.exports = (send) => { + return (hash, opts) => { + opts = opts || {} + + const pt = new Stream.Transform({ + objectMode: true, + transform (chunk, encoding, cb) { + cb(null, transformChunk(chunk)) + } + }) + + send({ + path: 'stats/bw', + qs: opts + }, (err, stream) => { + if (err) { + return pt.destroy(err) + } + + pump(stream, pt) + }) + + return pt + } +} diff --git a/src/stats/bw-util.js b/src/stats/bw-util.js new file mode 100644 index 000000000..828874024 --- /dev/null +++ b/src/stats/bw-util.js @@ -0,0 +1,12 @@ +'use strict' + +const Big = require('big.js') + +module.exports = (chunk) => { + return { + totalIn: new Big(chunk.TotalIn), + totalOut: new Big(chunk.TotalOut), + rateIn: new Big(chunk.RateIn), + rateOut: new Big(chunk.RateOut) + } +} diff --git a/src/stats/bw.js b/src/stats/bw.js index 7a9d882fc..1db636a25 100644 --- a/src/stats/bw.js +++ b/src/stats/bw.js @@ -2,15 +2,16 @@ const promisify = require('promisify-es6') const streamToValue = require('../utils/stream-to-value') -const { Transform } = require('readable-stream') +const transformChunk = require('./bw-util') -const transformChunk = (chunk) => { - return { - totalIn: chunk.TotalIn, - totalOut: chunk.TotalOut, - rateIn: chunk.RateIn, - rateOut: chunk.RateOut - } +const transform = (res, callback) => { + return streamToValue(res, (err, data) => { + if (err) { + return callback(err) + } + + callback(null, transformChunk(data[0])) + }) } module.exports = (send) => { @@ -23,28 +24,6 @@ module.exports = (send) => { send.andTransform({ path: 'stats/bw', qs: opts - }, (res, callback) => { - if (!opts.poll) { - // If not polling, just send the result. - return streamToValue(res, (err, data) => { - if (err) { - return callback(err) - } - - callback(null, transformChunk(data[0])) - }) - } - - // If polling, return a readable stream. - const output = new Transform({ - objectMode: true, - transform (chunk, encoding, cb) { - cb(null, transformChunk(chunk)) - } - }) - - res.pipe(output) - callback(null, output) - }, callback) + }, transform, callback) }) } diff --git a/src/stats/index.js b/src/stats/index.js index 60b752587..445a39835 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -8,6 +8,8 @@ module.exports = (arg) => { return { bitswap: require('./bitswap')(send), bw: require('./bw')(send), + bwReadableStream: require('./bw-readable-stream')(send), + bwPullStream: require('./bw-pull-stream')(send), repo: require('./repo')(send) } } diff --git a/src/stats/repo.js b/src/stats/repo.js index 8e2a87a7e..8d7943dec 100644 --- a/src/stats/repo.js +++ b/src/stats/repo.js @@ -1,14 +1,15 @@ 'use strict' const promisify = require('promisify-es6') +const Big = require('big.js') const transform = function (res, callback) { callback(null, { - numObjects: res.NumObjects, - repoSize: res.RepoSize, + numObjects: new Big(res.NumObjects), + repoSize: new Big(res.RepoSize), repoPath: res.RepoPath, version: res.Version, - storageMax: res.StorageMax + storageMax: new Big(res.StorageMax) }) } From 9cc8af50170b3d86d8d35868abfb07440990130a Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 7 Feb 2018 08:04:06 +0000 Subject: [PATCH 4/5] fix bw pull stream --- src/stats/bw-pull-stream.js | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/stats/bw-pull-stream.js b/src/stats/bw-pull-stream.js index a3bc9466c..51f5a86d9 100644 --- a/src/stats/bw-pull-stream.js +++ b/src/stats/bw-pull-stream.js @@ -5,19 +5,11 @@ const pull = require('pull-stream') const transformChunk = require('./bw-util') const deferred = require('pull-defer') -const transform = () => (read) => (abort, cb) => { - read(abort, (err, data) => { - console.log(data) - if (err) return cb(err) - cb(null, transformChunk(data)) - }) -} - module.exports = (send) => { return (hash, opts) => { opts = opts || {} - const p = deferred.through() + const p = deferred.source() send({ path: 'stats/bw', @@ -27,8 +19,10 @@ module.exports = (send) => { return p.end(err) } - pull(toPull(stream), p) - p.resolve(transform) + p.resolve(pull( + toPull.source(stream), + pull.map(transformChunk) + )) }) return p From 062ce4c1a56bc3f24d7af0a88e96e1aac0adc53d Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 7 Feb 2018 09:44:15 +0000 Subject: [PATCH 5/5] Bump interface-ipfs-core version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d1861fc19..d24198e91 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ "go-ipfs-dep": "^0.4.13", "gulp": "^3.9.1", "hapi": "^16.6.2", - "interface-ipfs-core": "~0.43.0", + "interface-ipfs-core": "~0.47.0", "ipfsd-ctl": "~0.27.0", "pre-commit": "^1.2.2", "socket.io": "^2.0.4",