Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

observe traffic and expose statistics #243

Merged
merged 26 commits into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
494d7fd
starting to observe
pgte Feb 16, 2018
758cf2a
WIP: stats: observer to stats
pgte Feb 19, 2018
0afd916
stats have transport now
pgte Feb 19, 2018
0f5bfaa
stats: peer list and transport list
pgte Feb 19, 2018
20d6934
stats: stop
pgte Feb 19, 2018
7210efc
trying to set peer info
pgte Feb 19, 2018
0eb8c26
propagating peer info
pgte Feb 19, 2018
84e766b
stats: first tests
pgte Feb 20, 2018
fa6e377
stats: transports, protocols and peers
pgte Feb 20, 2018
a1d9014
stats: transport-specific stats
pgte Feb 20, 2018
3d0ad42
stats: not relying on the order / existence of tests
pgte Feb 20, 2018
7ee4fc5
stats: peer-specific stats
pgte Feb 20, 2018
6ea2061
setPeerInfo only if there's a peerInfo
pgte Feb 20, 2018
e692fa2
stats: tests now are isolated
pgte Feb 20, 2018
905680a
stats: tests: removed console.log
pgte Feb 20, 2018
f6db42d
stats: tests not hardcoding the port
pgte Feb 20, 2018
ae88175
stats: documented API
pgte Feb 20, 2018
39f01af
removed console log
pgte Feb 20, 2018
84b00f9
stats: emit update event every time stats get updated
pgte Feb 21, 2018
eb2cf59
docs: moved stats api place in readme
pgte Feb 21, 2018
fb0d209
docs: fixed typo
pgte Feb 21, 2018
a2cd482
setting the peer info directly without having to cache the method
pgte Feb 21, 2018
2ed0cf8
docs: consistency for list markdown
pgte Feb 21, 2018
5b3cb3b
docs: dcumented options
pgte Feb 21, 2018
6c0ff32
stats: retaning old peers in case they reconnect
pgte Feb 21, 2018
efc7c92
stats: fix: now accounts for transport, and separates protocol-specif…
pgte Feb 22, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@
},
"dependencies": {
"async": "^2.6.0",
"big.js": "^5.0.3",
"debug": "^3.1.0",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
"libp2p-circuit": "~0.1.4",
"libp2p-identify": "~0.6.3",
"lodash.includes": "^4.3.0",
"moving-average": "^1.0.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm glad you created this module :)

"multiaddr": "^3.0.2",
"multistream-select": "~0.14.1",
"once": "^1.4.0",
Expand Down
13 changes: 7 additions & 6 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const setImmediate = require('async/setImmediate')

const Circuit = require('libp2p-circuit')

const protocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')

module.exports = function connection (swarm) {
Expand All @@ -25,16 +24,15 @@ module.exports = function connection (swarm) {
swarm.handle(muxer.multicodec, (protocol, conn) => {
const muxedConn = muxer.listener(conn)

muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
})
muxedConn.on('stream', swarm.protocolMuxer(protocol))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad for not renaming things here. Mind s/swarm/switch/g all the occurrences?

Copy link
Contributor Author

@pgte pgte Feb 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would, but switch is a reserved JS keyword.. ;)
Switching to swtch, in lack of a better replacement…


// If identify is enabled
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (swarm.identify) {
// overload peerInfo to use Identify instead
const setPeerInfo = conn.setPeerInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there an overload of setPeerInfo? I'm afraid that this might cause issues in the future. Is it needed?

conn.getPeerInfo = (cb) => {
const conn = muxedConn.newStream()
const ms = new multistream.Dialer()
Expand All @@ -50,7 +48,10 @@ module.exports = function connection (swarm) {
})
cb(null, peerInfo)
}
], cb)
], (err, pi) => {
setPeerInfo.call(conn, pi)
cb(err, pi)
})
}

conn.getPeerInfo((err, peerInfo) => {
Expand Down Expand Up @@ -120,7 +121,7 @@ module.exports = function connection (swarm) {
swarm.handle(tag, (protocol, conn) => {
const myId = swarm._peerInfo.id
const secure = encrypt(myId, conn, undefined, () => {
protocolMuxer(swarm.protocols, secure)
swarm.protocolMuxer(tag)(secure)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is not correct here. Doing swarm.protocolMuxer(tag) is saying "After you get the conn encrypted, you can encrypt again". It should be for all the Protocols being handled and not just for one.

Note: I'm still reading through the PR and you might have changed something else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the exact same behaviour as before, with the exception we're tracking the protocol tag for accounting purposes..

})
})

Expand Down
8 changes: 5 additions & 3 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const Circuit = require('libp2p-circuit')
const debug = require('debug')
const log = debug('libp2p:swarm:dial')

const protocolMuxer = require('./protocol-muxer')

function dial (swarm) {
return (peer, protocol, callback) => {
if (typeof protocol === 'function') {
Expand All @@ -22,6 +20,7 @@ function dial (swarm) {
const pi = getPeerInfo(peer, swarm._peerBook)

const proxyConn = new Connection()
proxyConn.setPeerInfo(pi)

const b58Id = pi.id.toB58String()
log('dialing %s', b58Id)
Expand Down Expand Up @@ -133,6 +132,7 @@ function dial (swarm) {
if (err) {
return cb(err)
}
wrapped.setPeerInfo(pi)
cb(null, wrapped)
})
})
Expand Down Expand Up @@ -189,7 +189,8 @@ function dial (swarm) {

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
conn.setPeerInfo(pi)
swarm.protocolMuxer(key)(conn)
})

setImmediate(() => swarm.emit('peer-mux-established', pi))
Expand All @@ -213,6 +214,7 @@ function dial (swarm) {
if (err) {
return cb(err)
}
proxyConn.setPeerInfo(pi)
proxyConn.setInnerConn(conn)
cb(null, proxyConn)
})
Expand Down
14 changes: 11 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ const transport = require('./transport')
const connection = require('./connection')
const getPeerInfo = require('./get-peer-info')
const dial = require('./dial')
const protocolMuxer = require('./protocol-muxer')
const ProtocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')
const Observer = require('./observer')
const Stats = require('./stats')
const assert = require('assert')

class Switch extends EE {
constructor (peerInfo, peerBook) {
constructor (peerInfo, peerBook, options) {
super()
assert(peerInfo, 'You must provide a `peerInfo`')
assert(peerBook, 'You must provide a `peerBook`')

this._peerInfo = peerInfo
this._peerBook = peerBook
this._options = options || {}

this.setMaxListeners(Infinity)
// transports --
Expand Down Expand Up @@ -69,10 +72,14 @@ class Switch extends EE {
})
}

const observer = Observer(this)
this.stats = Stats(observer, this._options.stats)
this.protocolMuxer = ProtocolMuxer(this.protocols, observer)

this.handle(this.crypto.tag, (protocol, conn) => {
const peerId = this._peerInfo.id
const wrapped = this.crypto.encrypt(peerId, conn, undefined, () => {})
return protocolMuxer(this.protocols, wrapped)
return this.protocolMuxer(this.crypto.tag)(wrapped)
})

// higher level (public) API
Expand All @@ -88,6 +95,7 @@ class Switch extends EE {
}

stop (callback) {
this.stats.stop()
series([
(cb) => each(this.muxedConns, (conn, cb) => {
conn.muxer.end((err) => {
Expand Down
31 changes: 31 additions & 0 deletions src/observe-connection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict'

const Connection = require('interface-connection').Connection
const pull = require('pull-stream')

module.exports = (transport, protocol, _conn, observer) => {
const peerInfo = new Promise((resolve, reject) => {
_conn.getPeerInfo((err, peerInfo) => {
if (!err && peerInfo) {
resolve(peerInfo)
return
}

const setPeerInfo = _conn.setPeerInfo
_conn.setPeerInfo = (pi) => {
setPeerInfo.call(_conn, pi)
resolve(pi)
}
})
})

const stream = {
source: pull(
_conn,
observer.incoming(transport, protocol, peerInfo)),
sink: pull(
observer.outgoing(transport, protocol, peerInfo),
_conn)
}
return new Connection(stream, _conn)
}
37 changes: 37 additions & 0 deletions src/observer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict'

const pull = require('pull-stream')
const EventEmitter = require('events')

module.exports = (swarm) => {
const observer = Object.assign(new EventEmitter(), {
incoming: observe('in'),
outgoing: observe('out')
})

swarm.on('peer-mux-established', (peerInfo) => {
observer.emit('peer:connected', peerInfo.id.toB58String())
})

swarm.on('peer-mux-closed', (peerInfo) => {
observer.emit('peer:closed', peerInfo.id.toB58String())
})

return observer

function observe (direction) {
return (transport, protocol, peerInfo) => {
return pull.map((buffer) => {
willObserve(peerInfo, transport, protocol, direction, buffer.length)
return buffer
})
}
}

function willObserve (peerInfo, transport, protocol, direction, bufferLength) {
peerInfo.then((pi) => {
const peerId = pi.id.toB58String()
setImmediate(() => observer.emit('message', peerId, transport, protocol, direction, bufferLength))
})
}
}
34 changes: 21 additions & 13 deletions src/protocol-muxer.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
'use strict'

const multistream = require('multistream-select')
const observeConn = require('./observe-connection')

module.exports = function protocolMuxer (protocols, conn) {
const ms = new multistream.Listener()
module.exports = function protocolMuxer (protocols, observer) {
return (transport) => (parentConn) => {
const ms = new multistream.Listener()

Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still digesting the code but it seems that there is an assumption that all the switch.handle happen in the beginning and so you are locking the list of protocols we support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The protocols is an object, which can get mutated later as the user adds more protocols.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not get https://github.com/libp2p/js-libp2p-switch/issues/239 even harder to fix :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid again, this was the same behaviour as before: switch.protocols also contained a mutable object with the protocol handlers... I'm just storing the protocols on that closure, but it's still a JS object that will get mutated, same as before..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Makes sense now thanks for explaining :)


ms.addHandler(protocol, protocols[protocol].handlerFunc, protocols[protocol].matchFunc)
})
const handler = (protocol, _conn) => {
const conn = observeConn(transport, protocol, _conn, observer)
protocols[protocol].handlerFunc.call(null, protocol, conn)
}

ms.handle(conn, (err) => {
if (err) {
// the multistream handshake failed
}
})
ms.addHandler(protocol, handler, protocols[protocol].matchFunc)
})

ms.handle(parentConn, (err) => {
if (err) {
// the multistream handshake failed
}
})
}
}
92 changes: 92 additions & 0 deletions src/stats/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'

const Stat = require('./stat')

const defaultOptions = {
enabled: true,
computeThrottleMaxQueueSize: 1000,
computeThrottleTimeout: 2000,
movingAverageIntervals: [
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
]
}

const initialCounters = [
'dataReceived',
'dataSent'
]

const directionToEvent = {
in: 'dataReceived',
out: 'dataSent'
}

module.exports = (observer, _options) => {
const options = Object.assign({}, defaultOptions, _options)
const globalStats = new Stat(initialCounters, options)
const peerStats = new Map()
const transportStats = new Map()
const protocolStats = new Map()

observer.on('message', (peerId, transportTag, protocolTag, direction, bufferLength) => {
// if (!peerId) {
// throw new Error('message should have peer id')
// }
console.log('m', peerId, transportTag, protocolTag, direction, bufferLength)
const event = directionToEvent[direction]

// global stats
globalStats.push(event, bufferLength)

// peer stats
let peer = peerStats.get(peerId)
if (!peer) {
peer = new Stat(initialCounters, options)
peerStats.set(peerId, peer)
}
peer.push(event, bufferLength)

// transport stats
let transport = transportStats.get(transportTag)
if (!transport) {
transport = new Stat(initialCounters, options)
transportStats.set(transportTag, transport)
}
transport.push(event, bufferLength)

// protocol stats
let protocol = protocolStats.get(protocolTag)
if (!protocol) {
protocol = new Stat(initialCounters, options)
protocolStats.set(protocolTag, transport)
}
protocol.push(event, bufferLength)
})

observer.on('peer:closed', (peerId) => {
peerStats.delete(peerId)
})

return {
stop: stop,
global: globalStats,
peers: () => Array.from(peerStats.keys()),
forPeer: (peerId) => peerStats.get(peerId),
transports: () => Array.from(transportStats.keys()),
forTransport: (transport) => transportStats.get(transport),
protocols: () => Array.from(protocolStats.keys()),
forProtocol: (protocol) => protocolStats.get(protocol)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does one "clear" previous stats from a peer?


function stop () {
globalStats.stop()
for (let peerStat of peerStats.values()) {
peerStat.stop()
}
for (let transportStat of transportStats.values()) {
transportStat.stop()
}
}
}
Loading