Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

observe traffic and expose statistics #243

Merged
merged 26 commits into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
494d7fd
starting to observe
pgte Feb 16, 2018
758cf2a
WIP: stats: observer to stats
pgte Feb 19, 2018
0afd916
stats have transport now
pgte Feb 19, 2018
0f5bfaa
stats: peer list and transport list
pgte Feb 19, 2018
20d6934
stats: stop
pgte Feb 19, 2018
7210efc
trying to set peer info
pgte Feb 19, 2018
0eb8c26
propagating peer info
pgte Feb 19, 2018
84e766b
stats: first tests
pgte Feb 20, 2018
fa6e377
stats: transports, protocols and peers
pgte Feb 20, 2018
a1d9014
stats: transport-specific stats
pgte Feb 20, 2018
3d0ad42
stats: not relying on the order / existence of tests
pgte Feb 20, 2018
7ee4fc5
stats: peer-specific stats
pgte Feb 20, 2018
6ea2061
setPeerInfo only if there's a peerInfo
pgte Feb 20, 2018
e692fa2
stats: tests now are isolated
pgte Feb 20, 2018
905680a
stats: tests: removed console.log
pgte Feb 20, 2018
f6db42d
stats: tests not hardcoding the port
pgte Feb 20, 2018
ae88175
stats: documented API
pgte Feb 20, 2018
39f01af
removed console log
pgte Feb 20, 2018
84b00f9
stats: emit update event every time stats get updated
pgte Feb 21, 2018
eb2cf59
docs: moved stats api place in readme
pgte Feb 21, 2018
fb0d209
docs: fixed typo
pgte Feb 21, 2018
a2cd482
setting the peer info directly without having to cache the method
pgte Feb 21, 2018
2ed0cf8
docs: consistency for list markdown
pgte Feb 21, 2018
5b3cb3b
docs: dcumented options
pgte Feb 21, 2018
6c0ff32
stats: retaning old peers in case they reconnect
pgte Feb 21, 2018
efc7c92
stats: fix: now accounts for transport, and separates protocol-specif…
pgte Feb 22, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 127 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
- [`switch.start(callback)`](#swarmlistencallback)
- [`switch.stop(callback)`](#swarmclosecallback)
- [`switch.connection`](#connection)
- [`switch.stats`](#stats-api)
- [Internal Transports API](#transports)
- [Design Notes](#designnotes)
- [Multitransport](#multitransport)
Expand All @@ -51,9 +52,26 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
```JavaScript
const switch = require('libp2p-switch')

const sw = new switch(peerInfo [, peerBook])
const sw = new switch(peerInfo , peerBook [, options])
```

If defined, `options` should be an object with the following keys and respective values:

- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
- `computeThrottleTimeout`: Throttle timeout, in miliseconds. Defaults to `2000`,
- `movingAverageIntervals`: Array containin the intervals, in miliseconds, for which moving averages are calculated. Defaults to:

```js
[
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
]
```


## API

- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
Expand Down Expand Up @@ -147,6 +165,111 @@ Enable circuit relaying.
- active - is it an active or passive relay (default false)
- `callback`

### Stats API

##### `switch.stats.emit('update')`

Every time any stat value changes, this object emits an `update` event.

#### Global stats

##### `switch.stats.global.snapshot`

Should return a stats snapshot, which is an object containing the following keys and respective values:

- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number

##### `switch.stats.global.movingAverages`

Returns an object containing the following keys:

- dataSent
- dataReceived

Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).

Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).

#### Per-transport stats

##### `switch.stats.transports()`

Returns an array containing the tags (string) for each observed transport.

##### `switch.stats.forTransport(transportTag).snapshot`

Should return a stats snapshot, which is an object containing the following keys and respective values:

- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number

##### `switch.stats.forTransport(transportTag).movingAverages`

Returns an object containing the following keys:

dataSent
dataReceived

Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).

Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).

#### Per-protocol stats

##### `switch.stats.protocols()`

Returns an array containing the tags (string) for each observed protocol.

##### `switch.stats.forProtocol(protocolTag).snapshot`

Should return a stats snapshot, which is an object containing the following keys and respective values:

- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number


##### `switch.stats.forProtocol(protocolTag).movingAverages`

Returns an object containing the following keys:

- dataSent
- dataReceived

Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).

Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).

#### Per-peer stats

##### `switch.stats.peers()`

Returns an array containing the peerIDs (B58-encoded string) for each observed peer.

##### `switch.stats.forPeer(peerId:String).snapshot`

Should return a stats snapshot, which is an object containing the following keys and respective values:

- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number


##### `switch.stats.forPeer(peerId:String).movingAverages`

Returns an object containing the following keys:

- dataSent
- dataReceived

Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).

Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).

#### Stats update interval

Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.


### Internal Transports API

##### `switch.transport.add(key, transport, options)`
Expand Down Expand Up @@ -212,9 +335,10 @@ Identify is a protocol that switchs mounts on top of itself, to identify the con
- a) peer A dials a conn to peer B
- b) that conn gets upgraded to a stream multiplexer that both peers agree
- c) peer B executes de identify protocol
- d) peer B now can open streams to peer A, knowing which is the identity of peer A
- d) peer B now can open streams to peer A, knowing which is the
identity of peer A

In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
In addition to this, we also share the "observed addresses" by the other peer, which is extremely useful information for different kinds of network topologies.

### Notes

Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,21 @@
},
"dependencies": {
"async": "^2.6.0",
"big.js": "^5.0.3",
"debug": "^3.1.0",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
"libp2p-circuit": "~0.1.4",
"libp2p-identify": "~0.6.3",
"lodash.includes": "^4.3.0",
"moving-average": "^1.0.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm glad you created this module :)

"multiaddr": "^3.0.2",
"multistream-select": "~0.14.1",
"once": "^1.4.0",
"peer-id": "~0.10.6",
"peer-info": "~0.11.6",
"pull-stream": "^3.6.1"
"pull-stream": "^3.6.1",
"quick-lru": "^1.1.0"
},
"contributors": [
"Arnaud <arnaud.valensi@gmail.com>",
Expand Down
54 changes: 28 additions & 26 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,33 @@ const identify = require('libp2p-identify')
const multistream = require('multistream-select')
const waterfall = require('async/waterfall')
const debug = require('debug')
const log = debug('libp2p:swarm:connection')
const log = debug('libp2p:switch:connection')
const once = require('once')
const setImmediate = require('async/setImmediate')

const Circuit = require('libp2p-circuit')

const protocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')

module.exports = function connection (swarm) {
module.exports = function connection (swtch) {
return {
addUpgrade () {},

addStreamMuxer (muxer) {
// for dialing
swarm.muxers[muxer.multicodec] = muxer
swtch.muxers[muxer.multicodec] = muxer

// for listening
swarm.handle(muxer.multicodec, (protocol, conn) => {
swtch.handle(muxer.multicodec, (protocol, conn) => {
const muxedConn = muxer.listener(conn)

muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
})
muxedConn.on('stream', swtch.protocolMuxer(null))

// If identify is enabled
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (swarm.identify) {
if (swtch.identify) {
// overload peerInfo to use Identify instead
conn.getPeerInfo = (cb) => {
const conn = muxedConn.newStream()
Expand All @@ -46,11 +43,16 @@ module.exports = function connection (swarm) {
(conn, cb) => identify.dialer(conn, cb),
(peerInfo, observedAddrs, cb) => {
observedAddrs.forEach((oa) => {
swarm._peerInfo.multiaddrs.addSafe(oa)
swtch._peerInfo.multiaddrs.addSafe(oa)
})
cb(null, peerInfo)
}
], cb)
], (err, pi) => {
if (pi) {
conn.setPeerInfo(pi)
}
cb(err, pi)
})
}

conn.getPeerInfo((err, peerInfo) => {
Expand All @@ -59,7 +61,7 @@ module.exports = function connection (swarm) {
}
const b58Str = peerInfo.id.toB58String()

swarm.muxedConns[b58Str] = { muxer: muxedConn }
swtch.muxedConns[b58Str] = { muxer: muxedConn }

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
Expand All @@ -72,16 +74,16 @@ module.exports = function connection (swarm) {
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
}
peerInfo = swarm._peerBook.put(peerInfo)
peerInfo = swtch._peerBook.put(peerInfo)

muxedConn.on('close', () => {
delete swarm.muxedConns[b58Str]
delete swtch.muxedConns[b58Str]
peerInfo.disconnect()
peerInfo = swarm._peerBook.put(peerInfo)
setImmediate(() => swarm.emit('peer-mux-closed', peerInfo))
peerInfo = swtch._peerBook.put(peerInfo)
setImmediate(() => swtch.emit('peer-mux-closed', peerInfo))
})

setImmediate(() => swarm.emit('peer-mux-established', peerInfo))
setImmediate(() => swtch.emit('peer-mux-established', peerInfo))
})
}

Expand All @@ -90,9 +92,9 @@ module.exports = function connection (swarm) {
},

reuse () {
swarm.identify = true
swarm.handle(identify.multicodec, (protocol, conn) => {
identify.listener(conn, swarm._peerInfo)
swtch.identify = true
swtch.handle(identify.multicodec, (protocol, conn) => {
identify.listener(conn, swtch._peerInfo)
})
},

Expand All @@ -106,7 +108,7 @@ module.exports = function connection (swarm) {

// TODO: (dryajov) should we enable circuit listener and
// dialer by default?
swarm.transport.add(Circuit.tag, new Circuit(swarm, config))
swtch.transport.add(Circuit.tag, new Circuit(swtch, config))
}
},

Expand All @@ -116,15 +118,15 @@ module.exports = function connection (swarm) {
encrypt = plaintext.encrypt
}

swarm.unhandle(swarm.crypto.tag)
swarm.handle(tag, (protocol, conn) => {
const myId = swarm._peerInfo.id
swtch.unhandle(swtch.crypto.tag)
swtch.handle(tag, (protocol, conn) => {
const myId = swtch._peerInfo.id
const secure = encrypt(myId, conn, undefined, () => {
protocolMuxer(swarm.protocols, secure)
swtch.protocolMuxer(null)(secure)
})
})

swarm.crypto = {tag, encrypt}
swtch.crypto = {tag, encrypt}
}
}
}
Loading