Skip to content

Commit

Permalink
fix: not started yet (#297)
Browse files Browse the repository at this point in the history
* fix: callback when not started rather than throwing asserts

* fix: dont remove transports until the switch has stopped

* test: update connection check logic

* test: fix variable reference

* chore: update switch dep

* chore: update switch dep
  • Loading branch information
jacobheun authored Dec 14, 2018
1 parent 15bdb79 commit fdfb7b4
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 67 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"libp2p-connection-manager": "~0.0.2",
"libp2p-floodsub": "~0.15.1",
"libp2p-ping": "~0.8.3",
"libp2p-switch": "~0.41.2",
"libp2p-switch": "~0.41.3",
"libp2p-websockets": "~0.12.0",
"mafmt": "^6.0.2",
"multiaddr": "^5.0.2",
Expand Down
31 changes: 18 additions & 13 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

const FSM = require('fsm-event')
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p')
log.error = debug('libp2p:error')
const errCode = require('err-code')

const each = require('async/each')
const series = require('async/series')
Expand All @@ -24,7 +24,12 @@ const pubsub = require('./pubsub')
const getPeerInfo = require('./get-peer-info')
const validateConfig = require('./config').validate

const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet'
const notStarted = (action, state) => {
return errCode(
new Error(`libp2p cannot ${action} when not started; state is ${state}`),
'ERR_NODE_NOT_STARTED'
)
}

/**
* @fires Node#error Emitted when an error occurs
Expand Down Expand Up @@ -217,8 +222,6 @@ class Node extends EventEmitter {
* @returns {void}
*/
dial (peer, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

this.dialProtocol(peer, null, callback)
}

Expand All @@ -233,7 +236,9 @@ class Node extends EventEmitter {
* @returns {void}
*/
dialProtocol (peer, protocol, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
}

if (typeof protocol === 'function') {
callback = protocol
Expand Down Expand Up @@ -261,7 +266,9 @@ class Node extends EventEmitter {
* @returns {void}
*/
dialFSM (peer, protocol, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
}

if (typeof protocol === 'function') {
callback = protocol
Expand All @@ -282,8 +289,6 @@ class Node extends EventEmitter {
}

hangUp (peer, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)

this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }

Expand All @@ -293,7 +298,7 @@ class Node extends EventEmitter {

ping (peer, callback) {
if (!this.isStarted()) {
return callback(new Error(NOT_STARTED_ERROR_MESSAGE))
return callback(notStarted('ping', this.state._state))
}

this._getPeerInfo(peer, (err, peerInfo) => {
Expand Down Expand Up @@ -463,13 +468,13 @@ class Node extends EventEmitter {
}
cb()
},
(cb) => {
// Ensures idempotency for restarts
this._switch.transport.removeAll(cb)
},
(cb) => {
this.connectionManager.stop()
this._switch.stop(cb)
},
(cb) => {
// Ensures idempotent restarts
this._switch.transport.removeAll(cb)
}
], (err) => {
if (err) {
Expand Down
24 changes: 24 additions & 0 deletions test/fsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,29 @@ describe('libp2p state machine (fsm)', () => {

node.start()
})

it('should not dial when the node is stopped', (done) => {
node.on('stop', () => {
node.dial(null, (err) => {
expect(err).to.exist()
expect(err.code).to.eql('ERR_NODE_NOT_STARTED')
done()
})
})

node.stop()
})

it('should not dial (fsm) when the node is stopped', (done) => {
node.on('stop', () => {
node.dialFSM(null, null, (err) => {
expect(err).to.exist()
expect(err.code).to.eql('ERR_NODE_NOT_STARTED')
done()
})
})

node.stop()
})
})
})
2 changes: 1 addition & 1 deletion test/stream-muxing.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ describe('stream muxing', () => {

nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
expect(Object.keys(nodeA._switch.muxedConns)).to.have.length(0)
expect(nodeA._switch.connection.getAll()).to.have.length(0)
cb()
})
},
Expand Down
27 changes: 14 additions & 13 deletions test/transports.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe('transports', () => {
function check () {
const peers = nodeA.peerBook.getAll()
expect(Object.keys(peers)).to.have.length(1)
expect(Object.keys(nodeA._switch.muxedConns)).to.have.length(0)
expect(nodeA._switch.connection.getAll()).to.have.length(0)
done()
}
})
Expand Down Expand Up @@ -142,7 +142,7 @@ describe('transports', () => {
const peers = nodeA.peerBook.getAll()
expect(err).to.not.exist()
expect(Object.keys(peers)).to.have.length(1)
expect(Object.keys(nodeA._switch.muxedConns)).to.have.length(0)
expect(nodeA._switch.connection.getAll()).to.have.length(0)
done()
}
})
Expand All @@ -153,16 +153,17 @@ describe('transports', () => {
expect(err).to.not.exist()

connFSM.once('muxed', () => {
expect(nodeA._switch.muxedConns).to.have.any.keys(
peerB.id.toB58String()
)
expect(
nodeA._switch.connection.getAllById(peerB.id.toB58String())
).to.have.length(1)

connFSM.once('error', done)
connFSM.once('close', () => {
// ensure the connection is closed
expect(nodeA._switch.muxedConns).to.not.have.any.keys([
peerB.id.toB58String()
])
expect(
nodeA._switch.connection.getAllById(peerB.id.toB58String())
).to.have.length(0)

done()
})

Expand Down Expand Up @@ -312,7 +313,7 @@ describe('transports', () => {
function check () {
const peers = node1.peerBook.getAll()
expect(Object.keys(peers)).to.have.length(1)
expect(Object.keys(node1._switch.muxedConns)).to.have.length(0)
expect(node1._switch.connection.getAll()).to.have.length(0)
done()
}
})
Expand All @@ -326,7 +327,7 @@ describe('transports', () => {

function check () {
// Verify both nodes are connected to node 3
if (node1._switch.muxedConns[b58Id] && node2._switch.muxedConns[b58Id]) {
if (node1._switch.connection.getAllById(b58Id) && node2._switch.connection.getAllById(b58Id)) {
done()
}
}
Expand Down Expand Up @@ -417,7 +418,7 @@ describe('transports', () => {
function check () {
const peers = node1.peerBook.getAll()
expect(Object.keys(peers)).to.have.length(1)
expect(Object.keys(node1._switch.muxedConns)).to.have.length(0)
expect(node1._switch.connection.getAll()).to.have.length(0)
done()
}
})
Expand All @@ -430,8 +431,8 @@ describe('transports', () => {

function check () {
if (++counter === 3) {
expect(Object.keys(node1._switch.muxedConns).length).to.equal(1)
expect(Object.keys(node2._switch.muxedConns).length).to.equal(1)
expect(node1._switch.connection.getAll()).to.have.length(1)
expect(node2._switch.connection.getAll()).to.have.length(1)
done()
}
}
Expand Down
Loading

0 comments on commit fdfb7b4

Please sign in to comment.