Skip to content

Commit

Permalink
feat: registrar (#471)
Browse files Browse the repository at this point in the history
* feat: peer-store v0

* feat: registrar

* chore: apply suggestions from code review

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>

* chore: address review

* chore: support multiple conns

* chore: address review

* fix: no remote peer from topology on disconnect
  • Loading branch information
vasco-santos authored and jacobheun committed Jan 24, 2020
1 parent 582094a commit 9d52b80
Show file tree
Hide file tree
Showing 8 changed files with 594 additions and 8 deletions.
108 changes: 108 additions & 0 deletions src/connection-manager/topology.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict'

const assert = require('assert')

class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
multicodecs,
handlers
}) {
assert(multicodecs, 'one or more multicodec should be provided')
assert(handlers, 'the handlers should be provided')
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
'the \'onConnect\' handler must be provided')
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
'the \'onDisconnect\' handler must be provided')

this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
this.min = min
this.max = max

// Handlers
this._onConnect = handlers.onConnect
this._onDisconnect = handlers.onDisconnect

this.peers = new Map()
this._registrar = undefined

this._onProtocolChange = this._onProtocolChange.bind(this)
}

set registrar (registrar) {
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)

// Update topology peers
this._updatePeers(this._registrar.peerStore.peers.values())
}

/**
* Update topology.
* @param {Array<PeerInfo>} peerInfoIterable
* @returns {void}
*/
_updatePeers (peerInfoIterable) {
for (const peerInfo of peerInfoIterable) {
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
// Add the peer regardless of whether or not there is currently a connection
this.peers.set(peerInfo.id.toB58String(), peerInfo)
// If there is a connection, call _onConnect
const connection = this._registrar.getConnection(peerInfo)
connection && this._onConnect(peerInfo, connection)
} else {
// Remove any peers we might be tracking that are no longer of value to us
this.peers.delete(peerInfo.id.toB58String())
}
}
}

/**
* Notify protocol of peer disconnected.
* @param {PeerInfo} peerInfo
* @param {Error} [error]
* @returns {void}
*/
disconnect (peerInfo, error) {
this._onDisconnect(peerInfo, error)
}

/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerInfo} props.peerInfo
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerInfo, protocols }) {
const existingPeer = this.peers.get(peerInfo.id.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))

// Not supporting the protocol anymore?
if (existingPeer && hasProtocol.length === 0) {
this._onDisconnect({
peerInfo
})
}

// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this._updatePeers([peerInfo])
return
}
}
}
}

module.exports = Topology
12 changes: 6 additions & 6 deletions src/get-peer-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ const errCode = require('err-code')

/**
* Converts the given `peer` to a `PeerInfo` instance.
* The `PeerBook` will be checked for the resulting peer, and
* the peer will be updated in the `PeerBook`.
* The `PeerStore` will be checked for the resulting peer, and
* the peer will be updated in the `PeerStore`.
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer
* @param {PeerBook} peerBook
* @param {PeerStore} peerStore
* @returns {PeerInfo}
*/
function getPeerInfo (peer, peerBook) {
function getPeerInfo (peer, peerStore) {
if (typeof peer === 'string') {
peer = multiaddr(peer)
}
Expand All @@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) {

addr && peer.multiaddrs.add(addr)

return peerBook ? peerBook.put(peer) : peer
return peerStore ? peerStore.put(peer) : peer
}

/**
Expand All @@ -54,7 +54,7 @@ function getPeerInfoRemote (peer, libp2p) {
let peerInfo

try {
peerInfo = getPeerInfo(peer, libp2p.peerBook)
peerInfo = getPeerInfo(peer, libp2p.peerStore)
} catch (err) {
return Promise.reject(errCode(
new Error(`${peer} is not a valid peer type`),
Expand Down
8 changes: 8 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const Registrar = require('./registrar')

const notStarted = (action, state) => {
return errCode(
Expand Down Expand Up @@ -71,10 +72,13 @@ class Libp2p extends EventEmitter {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.registrar.onDisconnect(peerInfo, connection)
this.emit('peer:disconnect', peerInfo)
}
})
Expand Down Expand Up @@ -108,6 +112,10 @@ class Libp2p extends EventEmitter {
transportManager: this.transportManager
})

this.registrar = new Registrar({ peerStore: this.peerStore })
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle

// Attach private network protector
if (this._modules.connProtector) {
this.upgrader.protector = this._modules.connProtector
Expand Down
139 changes: 139 additions & 0 deletions src/registrar.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
'use strict'

const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')

const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info')
const Toplogy = require('./connection-manager/topology')

/**
* Responsible for notifying registered protocols of events in the network.
*/
class Registrar {
/**
* @param {Object} props
* @param {PeerStore} props.peerStore
* @constructor
*/
constructor ({ peerStore }) {
this.peerStore = peerStore

/**
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()

/**
* Map of topologies
*
* @type {Map<string, object>}
*/
this.topologies = new Map()

this._handle = undefined
}

get handle () {
return this._handle
}

set handle (handle) {
this._handle = handle
}

/**
* Add a new connected peer to the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} conn
* @returns {void}
*/
onConnect (peerInfo, conn) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')

const id = peerInfo.id.toB58String()
const storedConn = this.connections.get(id)

if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
}

/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerInfo, connection, error) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

const id = peerInfo.id.toB58String()
let storedConn = this.connections.get(id)

if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id === connection.id)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)
}

this.connections.delete(peerInfo.id.toB58String())
}
}

/**
* Get a connection with a peer.
* @param {PeerInfo} peerInfo
* @returns {Connection}
*/
getConnection (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

// TODO: what should we return
return this.connections.get(peerInfo.id.toB58String())[0]
}

/**
* Register handlers for a set of multicodecs given
* @param {Object} topologyProps properties for topology
* @param {Array<string>|string} topologyProps.multicodecs
* @param {Object} topologyProps.handlers
* @param {function} topologyProps.handlers.onConnect
* @param {function} topologyProps.handlers.onDisconnect
* @return {string} registrar identifier
*/
register (topologyProps) {
// Create multicodec topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
const topology = new Toplogy(topologyProps)

this.topologies.set(id, topology)

// Set registrar
topology.registrar = this

return id
}

/**
* Unregister topology.
* @param {string} id registrar identifier
* @return {boolean} unregistered successfully
*/
unregister (id) {
return this.topologies.delete(id)
}
}

module.exports = Registrar
57 changes: 57 additions & 0 deletions test/registrar/registrar.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')

const mergeOptions = require('merge-options')

const multiaddr = require('multiaddr')
const Libp2p = require('../../src')

const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('registrar on dial', () => {
let peerInfo
let remotePeerInfo
let libp2p
let remoteLibp2p
let remoteAddr

before(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo: remotePeerInfo
}))

await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})

after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})

it('should inform registrar of a new connection', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))

sinon.spy(remoteLibp2p.registrar, 'onConnect')

await libp2p.dial(remoteAddr)
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1)

const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo)
expect(libp2pConn).to.exist()

const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
expect(remoteConn).to.exist()
})
})
Loading

0 comments on commit 9d52b80

Please sign in to comment.