Skip to content

Commit

Permalink
fix: identify on dial (libp2p#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored Mar 25, 2019
1 parent b318e3f commit 6437139
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 27 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"libp2p-mplex": "~0.8.4",
"libp2p-pnet": "~0.1.0",
"libp2p-secio": "~0.11.1",
"libp2p-spdy": "~0.13.1",
"libp2p-spdy": "~0.13.3",
"libp2p-tcp": "~0.13.0",
"libp2p-webrtc-star": "~0.15.8",
"libp2p-websockets": "~0.12.2",
Expand Down
1 change: 1 addition & 0 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class IncomingConnectionFSM extends BaseConnection {
}
})

this._state.on('DISCONNECTED', () => this._onDisconnected())
this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
Expand Down
39 changes: 39 additions & 0 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ const multistream = require('multistream-select')
const withIs = require('class-is')
const BaseConnection = require('./base')
const parallel = require('async/parallel')
const nextTick = require('async/nextTick')
const identify = require('libp2p-identify')
const errCode = require('err-code')
const { msHandle, msSelect, identifyDialer } = require('../utils')

const observeConnection = require('../observe-connection')
const {
Expand Down Expand Up @@ -390,13 +394,48 @@ class ConnectionFSM extends BaseConnection {

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this._didUpgrade(null)

// Run identify on the connection
if (this.switch.identify) {
this._identify((err, results) => {
if (err) {
return this.close(err)
}
this.theirPeerInfo = this.switch._peerBook.put(results.peerInfo)
})
}
})
}

nextMuxer(muxers.shift())
})
}

/**
* Runs the identify protocol on the connection
* @private
* @param {function(error, { PeerInfo })} callback
* @returns {void}
*/
_identify (callback) {
if (!this.muxer) {
return nextTick(callback, errCode('The connection was already closed', 'ERR_CONNECTION_CLOSED'))
}
this.muxer.newStream(async (err, conn) => {
if (err) return callback(err)
const ms = new multistream.Dialer()
let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, this.theirPeerInfo)
} catch (err) {
return callback(err)
}
callback(null, results)
})
}

/**
* Analyses the given error, if it exists, to determine where the state machine
* needs to go.
Expand Down
33 changes: 21 additions & 12 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,27 @@ describe('dialFSM', () => {

expect(switchA.connection.getAllById(peerBId)).to.have.length(0)

// 4 close checks (1 inc and 1 out for each node) and 1 hangup check
expect(5).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
done()
// Expect 4 `peer-mux-established` events
expect(4).checks(() => {
// Expect 4 `peer-mux-closed`, plus 1 hangup
expect(5).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
switchA.removeAllListeners('peer-mux-established')
switchB.removeAllListeners('peer-mux-established')
done()
})

switchA.hangUp(switchB._peerInfo, (err) => {
expect(err).to.not.exist().mark()
})
})

switchA.on('peer-mux-established', (peerInfo) => {
expect(peerInfo.id.toB58String()).to.eql(peerBId).mark()
})
switchB.on('peer-mux-established', (peerInfo) => {
expect(peerInfo.id.toB58String()).to.eql(peerAId).mark()
})

switchA.on('peer-mux-closed', (peerInfo) => {
Expand All @@ -243,13 +259,6 @@ describe('dialFSM', () => {
connB.on('muxed', cb)
})
})

connFSM.on('connection', () => {
// Hangup and verify the connections are closed
switchA.hangUp(switchB._peerInfo, (err) => {
expect(err).to.not.exist().mark()
})
})
})
})

Expand Down
41 changes: 27 additions & 14 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-mplex')
const pull = require('pull-stream')
const secio = require('libp2p-secio')
const PeerInfo = require('peer-info')
const PeerBook = require('peer-book')
const identify = require('libp2p-identify')
const lp = require('pull-length-prefixed')
Expand Down Expand Up @@ -102,23 +103,35 @@ describe('Identify', () => {
})

it('should get protocols for one another', (done) => {
// We need to reset the PeerInfo objects we use,
// since we share memory we can receive a false positive if not
let peerA = new PeerInfo(switchA._peerInfo.id)
switchA._peerInfo.multiaddrs.toArray().forEach((m) => {
peerA.multiaddrs.add(m)
})
switchB._peerBook.remove(switchA._peerInfo.id.toB58String())
switchA._peerBook.remove(switchB._peerInfo.id.toB58String())

switchA.handle('/id-test/1.0.0', (protocol, conn) => pull(conn, conn))
switchB.dial(switchA._peerInfo, '/id-test/1.0.0', (err, conn) => {
switchB.dial(peerA, '/id-test/1.0.0', (err) => {
expect(err).to.not.exist()

const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
// Give identify a moment to run
setTimeout(() => {
const peerB = switchA._peerBook.get(switchB._peerInfo.id.toB58String())
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
'/id-test/1.0.0'
])

done()
}, 500)
})
})

Expand Down

0 comments on commit 6437139

Please sign in to comment.