-
Notifications
You must be signed in to change notification settings - Fork 37
observe traffic and expose statistics #243
Changes from 18 commits
494d7fd
758cf2a
0afd916
0f5bfaa
20d6934
7210efc
0eb8c26
84e766b
fa6e377
a1d9014
3d0ad42
7ee4fc5
6ea2061
e692fa2
905680a
f6db42d
ae88175
39f01af
84b00f9
eb2cf59
fb0d209
a2cd482
2ed0cf8
5b3cb3b
6c0ff32
efc7c92
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca | |
- [`switch.stop(callback)`](#swarmclosecallback) | ||
- [`switch.connection`](#connection) | ||
- [Internal Transports API](#transports) | ||
- [`switch.stats`](#stats-api) | ||
- [Design Notes](#designnotes) | ||
- [Multitransport](#multitransport) | ||
- [Connection upgrades](#connection-upgrades) | ||
|
@@ -51,7 +52,7 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca | |
```JavaScript | ||
const switch = require('libp2p-switch') | ||
|
||
const sw = new switch(peerInfo [, peerBook]) | ||
const sw = new switch(peerInfo [, peerBook [, options]]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes it look like options only exists in the context of passing peerBook. What about changing this to just a single options object and assert on its properties? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @diasdavid can do this, but will be a breaking API change.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait, I see that a peerBook is always required. The only optional argument should be |
||
``` | ||
|
||
## API | ||
|
@@ -223,6 +224,106 @@ To avoid the confusion between connection, stream, transport, and other names th | |
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex | ||
- transport - something that as a dial/listen interface and return objs that implement a connection interface | ||
|
||
### Stats API | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
#### Global stats | ||
|
||
##### `switch.stats.global.snapshot` | ||
|
||
Should return a stats snapshot, which is an object containing the following keys and respective values: | ||
|
||
* dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
* dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
##### `switch.stats.global.movingAverages` | ||
|
||
Returns an object containing the following keys: | ||
|
||
* dataSent | ||
* dataReceived | ||
|
||
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). | ||
|
||
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). | ||
|
||
#### Per-transport stats | ||
|
||
##### `switch.stats.transports()` | ||
|
||
Returns an array containing the tags (string) for each observed transport. | ||
|
||
##### `switch.stats.forTransports(transportTag).snapshot` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nickpick |
||
|
||
Should return a stats snapshot, which is an object containing the following keys and respective values: | ||
|
||
* dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
* dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
|
||
##### `switch.stats.forTransports(transportTag).movingAverages` | ||
|
||
Returns an object containing the following keys: | ||
|
||
* dataSent | ||
* dataReceived | ||
|
||
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). | ||
|
||
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). | ||
|
||
#### Per-protocol stats | ||
|
||
##### `switch.stats.protocols()` | ||
|
||
Returns an array containing the tags (string) for each observed protocol. | ||
|
||
##### `switch.stats.forProtocol(protocolTag).snapshot` | ||
|
||
Should return a stats snapshot, which is an object containing the following keys and respective values: | ||
|
||
* dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
* dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
|
||
|
||
##### `switch.stats.forProtocol(protocolTag).movingAverages` | ||
|
||
Returns an object containing the following keys: | ||
|
||
* dataSent | ||
* dataReceived | ||
|
||
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). | ||
|
||
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). | ||
|
||
#### Per-peer stats | ||
|
||
##### `switch.stats.peers()` | ||
|
||
Returns an array containing the peerIDs (B58-encoded string) for each observed peer. | ||
|
||
##### `switch.stats.forPeer(peerId:String).snapshot` | ||
|
||
Should return a stats snapshot, which is an object containing the following keys and respective values: | ||
|
||
* dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
* dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number | ||
|
||
|
||
##### `switch.stats.forPeer(peerId:String).movingAverages` | ||
|
||
Returns an object containing the following keys: | ||
|
||
* dataSent | ||
* dataReceived | ||
|
||
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds). | ||
|
||
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme). | ||
|
||
#### Stats update interval | ||
|
||
Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds. | ||
|
||
### This module uses `pull-streams` | ||
|
||
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,12 +56,14 @@ | |
}, | ||
"dependencies": { | ||
"async": "^2.6.0", | ||
"big.js": "^5.0.3", | ||
"debug": "^3.1.0", | ||
"interface-connection": "~0.3.2", | ||
"ip-address": "^5.8.9", | ||
"libp2p-circuit": "~0.1.4", | ||
"libp2p-identify": "~0.6.3", | ||
"lodash.includes": "^4.3.0", | ||
"moving-average": "^1.0.0", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm glad you created this module :) |
||
"multiaddr": "^3.0.2", | ||
"multistream-select": "~0.14.1", | ||
"once": "^1.4.0", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,6 @@ const setImmediate = require('async/setImmediate') | |
|
||
const Circuit = require('libp2p-circuit') | ||
|
||
const protocolMuxer = require('./protocol-muxer') | ||
const plaintext = require('./plaintext') | ||
|
||
module.exports = function connection (swarm) { | ||
|
@@ -25,16 +24,15 @@ module.exports = function connection (swarm) { | |
swarm.handle(muxer.multicodec, (protocol, conn) => { | ||
const muxedConn = muxer.listener(conn) | ||
|
||
muxedConn.on('stream', (conn) => { | ||
protocolMuxer(swarm.protocols, conn) | ||
}) | ||
muxedConn.on('stream', swarm.protocolMuxer(protocol)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my bad for not renaming things here. Mind s/swarm/switch/g all the occurrences? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would, but |
||
|
||
// If identify is enabled | ||
// 1. overload getPeerInfo | ||
// 2. call getPeerInfo | ||
// 3. add this conn to the pool | ||
if (swarm.identify) { | ||
// overload peerInfo to use Identify instead | ||
const setPeerInfo = conn.setPeerInfo | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't there an overload of setPeerInfo? I'm afraid that this might cause issues in the future. Is it needed? |
||
conn.getPeerInfo = (cb) => { | ||
const conn = muxedConn.newStream() | ||
const ms = new multistream.Dialer() | ||
|
@@ -50,7 +48,12 @@ module.exports = function connection (swarm) { | |
}) | ||
cb(null, peerInfo) | ||
} | ||
], cb) | ||
], (err, pi) => { | ||
if (pi) { | ||
setPeerInfo.call(conn, pi) | ||
} | ||
cb(err, pi) | ||
}) | ||
} | ||
|
||
conn.getPeerInfo((err, peerInfo) => { | ||
|
@@ -120,7 +123,7 @@ module.exports = function connection (swarm) { | |
swarm.handle(tag, (protocol, conn) => { | ||
const myId = swarm._peerInfo.id | ||
const secure = encrypt(myId, conn, undefined, () => { | ||
protocolMuxer(swarm.protocols, secure) | ||
swarm.protocolMuxer(tag)(secure) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something is not correct here. Doing swarm.protocolMuxer(tag) is saying "After you get the conn encrypted, you can encrypt again". It should be for all the Protocols being handled and not just for one. Note: I'm still reading through the PR and you might have changed something else. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's the exact same behaviour as before, with the exception we're tracking the protocol tag for accounting purposes.. |
||
}) | ||
}) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
'use strict' | ||
|
||
const Connection = require('interface-connection').Connection | ||
const pull = require('pull-stream') | ||
|
||
module.exports = (transport, protocol, _conn, observer) => { | ||
const peerInfo = new Promise((resolve, reject) => { | ||
_conn.getPeerInfo((err, peerInfo) => { | ||
if (!err && peerInfo) { | ||
resolve(peerInfo) | ||
return | ||
} | ||
|
||
const setPeerInfo = _conn.setPeerInfo | ||
_conn.setPeerInfo = (pi) => { | ||
setPeerInfo.call(_conn, pi) | ||
resolve(pi) | ||
} | ||
}) | ||
}) | ||
|
||
const stream = { | ||
source: pull( | ||
_conn, | ||
observer.incoming(transport, protocol, peerInfo)), | ||
sink: pull( | ||
observer.outgoing(transport, protocol, peerInfo), | ||
_conn) | ||
} | ||
return new Connection(stream, _conn) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
'use strict' | ||
|
||
const pull = require('pull-stream') | ||
const EventEmitter = require('events') | ||
|
||
module.exports = (swarm) => { | ||
const observer = Object.assign(new EventEmitter(), { | ||
incoming: observe('in'), | ||
outgoing: observe('out') | ||
}) | ||
|
||
swarm.on('peer-mux-established', (peerInfo) => { | ||
observer.emit('peer:connected', peerInfo.id.toB58String()) | ||
}) | ||
|
||
swarm.on('peer-mux-closed', (peerInfo) => { | ||
observer.emit('peer:closed', peerInfo.id.toB58String()) | ||
}) | ||
|
||
return observer | ||
|
||
function observe (direction) { | ||
return (transport, protocol, peerInfo) => { | ||
return pull.map((buffer) => { | ||
willObserve(peerInfo, transport, protocol, direction, buffer.length) | ||
return buffer | ||
}) | ||
} | ||
} | ||
|
||
function willObserve (peerInfo, transport, protocol, direction, bufferLength) { | ||
peerInfo.then((pi) => { | ||
const peerId = pi.id.toB58String() | ||
setImmediate(() => observer.emit('message', peerId, transport, protocol, direction, bufferLength)) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,29 @@ | ||
'use strict' | ||
|
||
const multistream = require('multistream-select') | ||
const observeConn = require('./observe-connection') | ||
|
||
module.exports = function protocolMuxer (protocols, conn) { | ||
const ms = new multistream.Listener() | ||
module.exports = function protocolMuxer (protocols, observer) { | ||
return (transport) => (parentConn) => { | ||
const ms = new multistream.Listener() | ||
|
||
Object.keys(protocols).forEach((protocol) => { | ||
if (!protocol) { | ||
return | ||
} | ||
Object.keys(protocols).forEach((protocol) => { | ||
if (!protocol) { | ||
return | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still digesting the code but it seems that there is an assumption that all the switch.handle happen in the beginning and so you are locking the list of protocols we support. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The protocols is an object, which can get mutated later as the user adds more protocols. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's not get https://github.com/libp2p/js-libp2p-switch/issues/239 even harder to fix :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @diasdavid again, this was the same behaviour as before: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Makes sense now thanks for explaining :) |
||
|
||
ms.addHandler(protocol, protocols[protocol].handlerFunc, protocols[protocol].matchFunc) | ||
}) | ||
const handler = (protocol, _conn) => { | ||
const conn = observeConn(transport, protocol, _conn, observer) | ||
protocols[protocol].handlerFunc.call(null, protocol, conn) | ||
} | ||
|
||
ms.handle(conn, (err) => { | ||
if (err) { | ||
// the multistream handshake failed | ||
} | ||
}) | ||
ms.addHandler(protocol, handler, protocols[protocol].matchFunc) | ||
}) | ||
|
||
ms.handle(parentConn, (err) => { | ||
if (err) { | ||
// the multistream handshake failed | ||
} | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this above the Internal Transports line.