From 36bb4e3260c0a6f79a42ee5aeabd1bede151d927 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 20 Feb 2018 19:04:41 +0000 Subject: [PATCH] feat: .stats.bw* --- package.json | 2 +- src/cli/commands/stats/bw.js | 48 +++++++++++++++++++++ src/core/components/stats.js | 76 ++++++++++++++++++++++++++++++++- src/http/api/resources/stats.js | 39 +++++++++++++++++ src/http/api/routes/stats.js | 8 ++++ test/cli/commands.js | 2 +- 6 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 src/cli/commands/stats/bw.js diff --git a/package.json b/package.json index 04d259c173..b6959fff03 100644 --- a/package.json +++ b/package.json @@ -119,7 +119,7 @@ "is-ipfs": "^0.3.2", "is-stream": "^1.1.0", "joi": "^13.1.2", - "libp2p": "~0.18.0", + "libp2p": "libp2p/js-libp2p#09069d4", "libp2p-circuit": "~0.1.4", "libp2p-floodsub": "~0.14.1", "libp2p-kad-dht": "~0.8.0", diff --git a/src/cli/commands/stats/bw.js b/src/cli/commands/stats/bw.js new file mode 100644 index 0000000000..0eb318fe6f --- /dev/null +++ b/src/cli/commands/stats/bw.js @@ -0,0 +1,48 @@ +'use strict' + +const pull = require('pull-stream') + +module.exports = { + command: 'bw', + + describe: 'Get bandwidth information.', + + builder: { + peer: { + type: 'string', + default: '' + }, + proto: { + type: 'string', + default: '' + }, + poll: { + type: 'boolean', + default: false + }, + interval: { + type: 'string', + default: '1s' + } + }, + + handler (argv) { + const stream = argv.ipfs.stats.bwPullStream({ + peer: argv.peer, + proto: argv.proto, + poll: argv.poll, + interval: argv.interval + }) + + pull( + stream, + pull.drain((chunk) => { + console.log(`bandwidth status + total in: ${chunk.totalIn}B + total out: ${chunk.totalOut}B + rate in: ${chunk.rateIn}B/s + rate out: ${chunk.rateOut}B/s`) + }) + ) + } +} diff --git a/src/core/components/stats.js b/src/core/components/stats.js index 6bc7121301..a1c6dafec8 100644 --- a/src/core/components/stats.js +++ b/src/core/components/stats.js @@ -1,8 +1,82 @@ 'use strict' +const promisify = require('promisify-es6') +const Big = require('big.js') +const Pushable = require('pull-pushable') +const human = require('human-to-milliseconds') +const toStream = require('pull-stream-to-stream') + +function bandwidthStats (self, opts) { + return new Promise((resolve, reject) => { + let stats + + if (opts.peer) { + stats = self._libp2pNode.stats.forPeer(opts.peer) + } else if (opts.proto) { + stats = self._libp2pNode.stats.forProtocol(opts.proto) + } else { + stats = self._libp2pNode.stats.global + } + + if (!stats) { + resolve({ + totalIn: new Big(0), + totalOut: new Big(0), + rateIn: new Big(0), + rateOut: new Big(0) + }) + return + } + + resolve({ + totalIn: stats.snapshot.dataReceived, + totalOut: stats.snapshot.dataSent, + rateIn: new Big(stats.movingAverages.dataReceived['60000'].movingAverage() / 60), + rateOut: new Big(stats.movingAverages.dataSent['60000'].movingAverage() / 60) + }) + }) +} + module.exports = function stats (self) { + const _bwPullStream = (opts) => { + let interval = null + let stream = Pushable(true, () => { + if (interval) { + clearInterval(interval) + } + }) + + if (opts.poll) { + human(opts.interval || '1s', (err, value) => { + if (err) throw err + + interval = setInterval(() => { + bandwidthStats(self, opts) + .then((stats) => stream.push(stats)) + .catch((err) => stream.end(err)) + }, value) + }) + } else { + bandwidthStats(self, opts) + .then((stats) => { + stream.push(stats) + stream.end() + }) + .catch((err) => stream.end(err)) + } + + return stream.source + } + return { bitswap: require('./bitswap')(self).stat, - repo: require('./repo')(self).stat + repo: require('./repo')(self).stat, + bw: promisify((opts, callback) => { + bandwidthStats(self, opts) + .then((stats) => callback(null, stats)) + .catch((err) => callback(err)) + }), + bwReadableStream: (opts) => toStream.source(_bwPullStream(opts)), + bwPullStream: _bwPullStream } } diff --git a/src/http/api/resources/stats.js b/src/http/api/resources/stats.js index 839f92a3b3..f1d1a50707 100644 --- a/src/http/api/resources/stats.js +++ b/src/http/api/resources/stats.js @@ -1,7 +1,46 @@ 'use strict' +const { Transform, Readable } = require('readable-stream') + +const transformBandwidth = (stat) => { + return { + TotalIn: stat.totalIn, + TotalOut: stat.totalOut, + RateIn: stat.rateIn, + RateOut: stat.rateOut + } +} + exports = module.exports exports.bitswap = require('./bitswap').stat exports.repo = require('./repo').stat + +exports.bw = (request, reply) => { + const ipfs = request.server.app.ipfs + const options = { + peer: request.query.peer, + proto: request.query.proto, + poll: request.query.poll === 'true', + interval: request.query.interval || '1s' + } + + const res = ipfs.stats.bwReadableStream(options) + const output = new Transform({ + writableObjectMode:true, + transform (chunk, encoding, cb) { + this.push(JSON.stringify(transformBandwidth(chunk)) + '\n') + cb() + } + }) + + request.on('disconnect', () => { + res.destroy() + }) + + res.pipe(output) + reply(output) + .header('content-type', 'application/json') + .header('x-chunked-output', '1') +} diff --git a/src/http/api/routes/stats.js b/src/http/api/routes/stats.js index c167ec58c0..35ec768083 100644 --- a/src/http/api/routes/stats.js +++ b/src/http/api/routes/stats.js @@ -20,4 +20,12 @@ module.exports = (server) => { handler: resources.stats.repo } }) + + api.route({ + method: '*', + path: '/api/v0/stats/bw', + config: { + handler: resources.stats.bw + } + }) } diff --git a/test/cli/commands.js b/test/cli/commands.js index d1d0812957..06154cfac6 100644 --- a/test/cli/commands.js +++ b/test/cli/commands.js @@ -4,7 +4,7 @@ const expect = require('chai').expect const runOnAndOff = require('../utils/on-and-off') -const commandCount = 72 +const commandCount = 73 describe('commands', () => runOnAndOff((thing) => { let ipfs