Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

observe traffic and expose statistics #243

Merged
merged 26 commits into from
Feb 27, 2018
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
494d7fd
starting to observe
pgte Feb 16, 2018
758cf2a
WIP: stats: observer to stats
pgte Feb 19, 2018
0afd916
stats have transport now
pgte Feb 19, 2018
0f5bfaa
stats: peer list and transport list
pgte Feb 19, 2018
20d6934
stats: stop
pgte Feb 19, 2018
7210efc
trying to set peer info
pgte Feb 19, 2018
0eb8c26
propagating peer info
pgte Feb 19, 2018
84e766b
stats: first tests
pgte Feb 20, 2018
fa6e377
stats: transports, protocols and peers
pgte Feb 20, 2018
a1d9014
stats: transport-specific stats
pgte Feb 20, 2018
3d0ad42
stats: not relying on the order / existence of tests
pgte Feb 20, 2018
7ee4fc5
stats: peer-specific stats
pgte Feb 20, 2018
6ea2061
setPeerInfo only if there's a peerInfo
pgte Feb 20, 2018
e692fa2
stats: tests now are isolated
pgte Feb 20, 2018
905680a
stats: tests: removed console.log
pgte Feb 20, 2018
f6db42d
stats: tests not hardcoding the port
pgte Feb 20, 2018
ae88175
stats: documented API
pgte Feb 20, 2018
39f01af
removed console log
pgte Feb 20, 2018
84b00f9
stats: emit update event every time stats get updated
pgte Feb 21, 2018
eb2cf59
docs: moved stats api place in readme
pgte Feb 21, 2018
fb0d209
docs: fixed typo
pgte Feb 21, 2018
a2cd482
setting the peer info directly without having to cache the method
pgte Feb 21, 2018
2ed0cf8
docs: consistency for list markdown
pgte Feb 21, 2018
5b3cb3b
docs: dcumented options
pgte Feb 21, 2018
6c0ff32
stats: retaning old peers in case they reconnect
pgte Feb 21, 2018
efc7c92
stats: fix: now accounts for transport, and separates protocol-specif…
pgte Feb 22, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 158 additions & 100 deletions test/stats.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const parallel = require('async/parallel')
const map = require('async/map')
const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-multiplex')
const pull = require('pull-stream')
Expand All @@ -18,131 +19,188 @@ const tryEcho = utils.tryEcho
const Switch = require('../src')

describe('Stats', () => {
let switchA
let switchB

before((done) => createInfos(2, (err, infos) => {
expect(err).to.not.exist()
const setup = (cb) => {
createInfos(2, (err, infos) => {
expect(err).to.not.exist()

const options = {
stats: {
computeThrottleTimeout: 100
const options = {
stats: {
computeThrottleTimeout: 100
}
}
}

const peerA = infos[0]
const peerB = infos[1]

peerA.multiaddrs.add('/ip4/127.0.0.1/tcp/9001')
peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002')

switchA = new Switch(peerA, new PeerBook(), options)
switchB = new Switch(peerB, new PeerBook(), options)

switchA.transport.add('tcp', new TCP())
switchB.transport.add('tcp', new TCP())

switchA.connection.crypto(secio.tag, secio.encrypt)
switchB.connection.crypto(secio.tag, secio.encrypt)

switchA.connection.addStreamMuxer(multiplex)
switchB.connection.addStreamMuxer(multiplex)
const peerA = infos[0]
const peerB = infos[1]

peerA.multiaddrs.add('/ip4/127.0.0.1/tcp/9001')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is gonna conflict if we run two test cases at the same time, is it possible to set the port to 0 and then later get it (if you need it) instead? Think you're not actually using the port in the test, so should work fine to be set to 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@victorbjelkholm good call, fixed!

peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002')

const switchA = new Switch(peerA, new PeerBook(), options)
const switchB = new Switch(peerB, new PeerBook(), options)

switchA.transport.add('tcp', new TCP())
switchB.transport.add('tcp', new TCP())

switchA.connection.crypto(secio.tag, secio.encrypt)
switchB.connection.crypto(secio.tag, secio.encrypt)

switchA.connection.addStreamMuxer(multiplex)
switchB.connection.addStreamMuxer(multiplex)

parallel([
(cb) => switchA.transport.listen('tcp', {}, null, cb),
(cb) => switchB.transport.listen('tcp', {}, null, cb)
], (err) => {
if (err) {
cb(err)
return
}
const echo = (protocol, conn) => pull(conn, conn)
switchB.handle('/echo/1.0.0', echo)
switchA.handle('/echo/1.0.0', echo)

parallel([
(cb) => {
switchA.dial(switchB._peerInfo, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist()
tryEcho(conn, cb)
})
},
(cb) => {
switchB.dial(switchA._peerInfo, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist()
tryEcho(conn, cb)
})
}
], (err) => {
if (err) {
cb(err)
return
}

// wait until stats are processed
setTimeout(() => cb(null, [switchA, switchB]), 500)
})
})
})
}

parallel([
(cb) => switchA.transport.listen('tcp', {}, null, cb),
(cb) => switchB.transport.listen('tcp', {}, null, cb)
], done)
}))
const teardown = (switches, cb) => {
map(switches, (swtch, cb) => swtch.stop(cb), cb)
}

after(function (done) {
this.timeout(3 * 1000)
parallel([
(cb) => switchA.stop(cb),
(cb) => switchB.stop(cb)
], done)
})
it('both nodes have some global stats', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()

before(() => {
const echo = (protocol, conn) => pull(conn, conn)
switchB.handle('/echo/1.0.0', echo)
switchA.handle('/echo/1.0.0', echo)
})
switches.forEach((swtch) => {
let snapshot = swtch.stats.global.snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
})

before((done) => {
// dial A -> B
switchA.dial(switchB._peerInfo, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist()
tryEcho(conn, done)
teardown(switches, done)
})
})

before((done) => {
// dial B -> A
switchB.dial(switchA._peerInfo, '/echo/1.0.0', (err, conn) => {
it('both nodes know the transports', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
tryEcho(conn, done)
const expectedTransports = [
'/mplex/6.7.0',
'/secio/1.0.0'
]

switches.forEach(
(swtch) => expect(swtch.stats.transports().sort()).to.deep.equal(expectedTransports))
teardown(switches, done)
})
})

before((done) => setTimeout(done, 1000))
it('both nodes know the protocols', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
const expectedProtocols = [
'/echo/1.0.0',
'/mplex/6.7.0'
]

it('both nodes have some global stats', () => {
let snapshot = switchA.stats.global.snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
switches.forEach((swtch) => {
expect(swtch.stats.protocols().sort()).to.deep.equal(expectedProtocols)
})

snapshot = switchB.stats.global.snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
})

it('both nodes know some transports', () => {
const expectedTransports = [
'/mplex/6.7.0',
'/secio/1.0.0'
]
expect(switchA.stats.transports().sort()).to.deep.equal(expectedTransports)
expect(switchB.stats.transports().sort()).to.deep.equal(expectedTransports)
teardown(switches, done)
})
})

it('both nodes know some protocols', () => {
const expectedProtocols = [
'/echo/1.0.0',
'/mplex/6.7.0'
]
expect(switchA.stats.protocols().sort()).to.deep.equal(expectedProtocols)
expect(switchB.stats.protocols().sort()).to.deep.equal(expectedProtocols)
it('both nodes know about each other', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
switches.forEach(
(swtch, index) => {
const otherSwitch = selectOther(switches, index)
expect(swtch.stats.peers().sort()).to.deep.equal([otherSwitch._peerInfo.id.toB58String()])
})
teardown(switches, done)
})
})

it('both nodes know about each other', () => {
expect(switchA.stats.peers().sort()).to.deep.equal([switchB._peerInfo.id.toB58String()])
expect(switchB.stats.peers().sort()).to.deep.equal([switchA._peerInfo.id.toB58String()])
it('both have transport-specific stats', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
switches.forEach((swtch) => {
let snapshot = swtch.stats.forTransport('/secio/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('92')
expect(snapshot.dataSent.toFixed()).to.equal('90')
})
teardown(switches, done)
})
})

it('both have transport-specific stats', () => {
let snapshot = switchA.stats.forTransport('/secio/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('92')
expect(snapshot.dataSent.toFixed()).to.equal('90')
snapshot = switchB.stats.forTransport('/secio/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('92')
expect(snapshot.dataSent.toFixed()).to.equal('90')
it('both have protocol-specific stats', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
switches.forEach((swtch) => {
let snapshot = swtch.stats.forProtocol('/echo/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('4')
expect(snapshot.dataSent.toFixed()).to.equal('8')
})
teardown(switches, done)
})
})

it('both have protocol-specific stats', () => {
let snapshot = switchA.stats.forProtocol('/echo/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('4')
expect(snapshot.dataSent.toFixed()).to.equal('8')
snapshot = switchB.stats.forProtocol('/echo/1.0.0').snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('4')
expect(snapshot.dataSent.toFixed()).to.equal('8')
it('both have peer-specific stats', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
switches.forEach((swtch, index) => {
const other = selectOther(switches, index)
let snapshot = swtch.stats.forPeer(other._peerInfo.id.toB58String()).snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
})
teardown(switches, done)
})
})

it('both have peer-specific stats', () => {
let snapshot = switchA.stats.forPeer(switchB._peerInfo.id.toB58String()).snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
snapshot = switchB.stats.forPeer(switchA._peerInfo.id.toB58String()).snapshot
expect(snapshot.dataReceived.toFixed()).to.equal('51')
expect(snapshot.dataSent.toFixed()).to.equal('49')
it('both have moving average stats for peer', (done) => {
setup((err, switches) => {
expect(err).to.not.exist()
switches.forEach((swtch, index) => {
const other = selectOther(switches, index)
let ma = swtch.stats.forPeer(other._peerInfo.id.toB58String()).movingAverages
const intervals = [60000, 300000, 900000]
intervals.forEach((interval) => {
const average = ma.dataReceived[interval].movingAverage()
console.log(average)
expect(average).to.be.above(0).below(1)
})
})
teardown(switches, done)
})
})
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need tests for:

  • Clearing out previous stats from a peer (Rep mechanisms will need a reset)
  • Check what happens when a peer disconnects and connects again
  • Check when a peer disconnects and connects again from a different transport

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid Currently, peer disconnections imply losing the stats for that peer.
If you think that it's important to keep them around, we should impose a time limit so that I eventually becomes garbage collected...
@diasdavid thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps tracking the sessions and having them in a LRU cache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid good call, but I'm not going to put all of the sessions there, only once a peer disconnects..


function selectOther (array, index) {
const useIndex = (index + 1) % array.length
return array[useIndex]
}