diff --git a/.aegir.js b/.aegir.js index 18d6236a58..df4a41f5f8 100644 --- a/.aegir.js +++ b/.aegir.js @@ -45,7 +45,7 @@ const after = async () => { } module.exports = { - bundlesize: { maxSize: '200kB' }, + bundlesize: { maxSize: '202kB' }, hooks: { pre: before, post: after diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 6a0bb44e02..17174bb1ef 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -270,7 +270,7 @@ const node = await Libp2p.create({ }, config: { peerDiscovery: { - autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers) + autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections) // The `tag` property will be searched when creating the instance of your Peer Discovery service. // The associated object, will be passed to the service when it is instantiated. [MulticastDNS.tag]: { diff --git a/doc/GETTING_STARTED.md b/doc/GETTING_STARTED.md index 216110c4a6..8b50efd7e0 100644 --- a/doc/GETTING_STARTED.md +++ b/doc/GETTING_STARTED.md @@ -217,7 +217,7 @@ const node = await Libp2p.create({ }, config: { peerDiscovery: { - autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers) + autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minConnections) // The `tag` property will be searched when creating the instance of your Peer Discovery service. // The associated object, will be passed to the service when it is instantiated. [Bootstrap.tag]: { diff --git a/src/config.js b/src/config.js index f586c6dc87..7e3d630efe 100644 --- a/src/config.js +++ b/src/config.js @@ -12,7 +12,7 @@ const DefaultConfig = { noAnnounce: [] }, connectionManager: { - minPeers: 25 + minConnections: 25 }, transportManager: { faultTolerance: FaultTolerance.FATAL_ALL diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index e0e64e7cdd..969877d8fd 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -1,9 +1,12 @@ 'use strict' +const debug = require('debug') +const log = debug('libp2p:connection-manager') +log.error = debug('libp2p:connection-manager:error') + const errcode = require('err-code') const mergeOptions = require('merge-options') const LatencyMonitor = require('./latency-monitor') -const debug = require('debug')('libp2p:connection-manager') const retimer = require('retimer') const { EventEmitter } = require('events') @@ -22,6 +25,7 @@ const defaultOptions = { maxReceivedData: Infinity, maxEventLoopDelay: Infinity, pollInterval: 2000, + autoDialInterval: 10000, movingAverageInterval: 60000, defaultPeerValue: 1 } @@ -45,6 +49,8 @@ class ConnectionManager extends EventEmitter { * @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000 * @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000 * @param {Number} options.defaultPeerValue The value of the peer. Default=1 + * @param {boolean} options.autoDial Should preemptively guarantee connections are above the low watermark. Default=true + * @param {Number} options.autoDialInterval How often, in milliseconds, it should preemptively guarantee connections are above the low watermark. Default=10000 */ constructor (libp2p, options) { super() @@ -57,7 +63,7 @@ class ConnectionManager extends EventEmitter { throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS) } - debug('options: %j', this._options) + log('options: %j', this._options) this._libp2p = libp2p @@ -73,8 +79,11 @@ class ConnectionManager extends EventEmitter { */ this.connections = new Map() + this._started = false this._timer = null + this._autoDialTimeout = null this._checkMetrics = this._checkMetrics.bind(this) + this._autoDial = this._autoDial.bind(this) } /** @@ -101,7 +110,11 @@ class ConnectionManager extends EventEmitter { }) this._onLatencyMeasure = this._onLatencyMeasure.bind(this) this._latencyMonitor.on('data', this._onLatencyMeasure) - debug('started') + + this._started = true + log('started') + + this._options.autoDial && this._autoDial() } /** @@ -109,11 +122,13 @@ class ConnectionManager extends EventEmitter { * @async */ async stop () { + this._autoDialTimeout && this._autoDialTimeout.clear() this._timer && this._timer.clear() this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure) + this._started = false await this._close() - debug('stopped') + log('stopped') } /** @@ -157,12 +172,12 @@ class ConnectionManager extends EventEmitter { _checkMetrics () { const movingAverages = this._libp2p.metrics.global.movingAverages const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() - this._checkLimit('maxReceivedData', received) + this._checkMaxLimit('maxReceivedData', received) const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() - this._checkLimit('maxSentData', sent) + this._checkMaxLimit('maxSentData', sent) const total = received + sent - this._checkLimit('maxData', total) - debug('metrics update', total) + this._checkMaxLimit('maxData', total) + log('metrics update', total) this._timer.reschedule(this._options.pollInterval) } @@ -188,7 +203,7 @@ class ConnectionManager extends EventEmitter { this._peerValues.set(peerIdStr, this._options.defaultPeerValue) } - this._checkLimit('maxConnections', this.size) + this._checkMaxLimit('maxConnections', this.size) } /** @@ -248,7 +263,7 @@ class ConnectionManager extends EventEmitter { * @param {*} summary The LatencyMonitor summary */ _onLatencyMeasure (summary) { - this._checkLimit('maxEventLoopDelay', summary.avgMs) + this._checkMaxLimit('maxEventLoopDelay', summary.avgMs) } /** @@ -257,15 +272,69 @@ class ConnectionManager extends EventEmitter { * @param {string} name The name of the field to check limits for * @param {number} value The current value of the field */ - _checkLimit (name, value) { + _checkMaxLimit (name, value) { const limit = this._options[name] - debug('checking limit of %s. current value: %d of %d', name, value, limit) + log('checking limit of %s. current value: %d of %d', name, value, limit) if (value > limit) { - debug('%s: limit exceeded: %s, %d', this._peerId, name, value) + log('%s: limit exceeded: %s, %d', this._peerId, name, value) this._maybeDisconnectOne() } } + /** + * Proactively tries to connect to known peers stored in the PeerStore. + * It will keep the number of connections below the upper limit and sort + * the peers to connect based on wether we know their keys and protocols. + * @async + * @private + */ + async _autoDial () { + const minConnections = this._options.minConnections + + const recursiveTimeoutTrigger = () => { + if (this._autoDialTimeout) { + this._autoDialTimeout.reschedule(this._options.autoDialInterval) + } else { + this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval) + } + } + + // Already has enough connections + if (this.size >= minConnections) { + recursiveTimeoutTrigger() + return + } + + // Sort peers on wether we know protocols of public keys for them + const peers = Array.from(this._libp2p.peerStore.peers.values()) + .sort((a, b) => { + if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) { + return 1 + } else if (b.id.pubKey && !a.id.pubKey) { + return 1 + } + return -1 + }) + + for (let i = 0; i < peers.length && this.size < minConnections; i++) { + if (!this.get(peers[i].id)) { + log('connecting to a peerStore stored peer %s', peers[i].id.toB58String()) + try { + await this._libp2p.dialer.connectToPeer(peers[i].id) + + // Connection Manager was stopped + if (!this._started) { + return + } + } catch (err) { + log.error('could not connect to peerStore stored peer', err) + } + } + } + + recursiveTimeoutTrigger() + } + /** * If we have more connections than our maximum, close a connection * to the lowest valued peer. @@ -274,12 +343,12 @@ class ConnectionManager extends EventEmitter { _maybeDisconnectOne () { if (this._options.minConnections < this.connections.size) { const peerValues = Array.from(this._peerValues).sort(byPeerValue) - debug('%s: sorted peer values: %j', this._peerId, peerValues) + log('%s: sorted peer values: %j', this._peerId, peerValues) const disconnectPeer = peerValues[0] if (disconnectPeer) { const peerId = disconnectPeer[0] - debug('%s: lowest value peer is %s', this._peerId, peerId) - debug('%s: closing a connection to %j', this._peerId, peerId) + log('%s: lowest value peer is %s', this._peerId, peerId) + log('%s: closing a connection to %j', this._peerId, peerId) for (const connections of this.connections.values()) { if (connections[0].remotePeer.toB58String() === peerId) { connections[0].close() diff --git a/src/index.js b/src/index.js index ad7a78281c..0fc2ce55d5 100644 --- a/src/index.js +++ b/src/index.js @@ -65,7 +65,13 @@ class Libp2p extends EventEmitter { this._discovery = new Map() // Discovery service instances/references // Create the Connection Manager - this.connectionManager = new ConnectionManager(this, this._options.connectionManager) + if (this._options.connectionManager.minPeers) { // Remove in 0.29 + this._options.connectionManager.minConnections = this._options.connectionManager.minPeers + } + this.connectionManager = new ConnectionManager(this, { + autoDial: this._config.peerDiscovery.autoDial, + ...this._options.connectionManager + }) // Create Metrics if (this._options.metrics.enabled) { @@ -459,19 +465,19 @@ class Libp2p extends EventEmitter { async _onDidStart () { this._isStarted = true - this.connectionManager.start() - this.peerStore.on('peer', peerId => { this.emit('peer:discovery', peerId) this._maybeConnect(peerId) }) - // Once we start, emit and dial any peers we may have already discovered + // Once we start, emit any peers we may have already discovered + // TODO: this should be removed, as we already discovered these peers in the past for (const peer of this.peerStore.peers.values()) { this.emit('peer:discovery', peer.id) - this._maybeConnect(peer.id) } + this.connectionManager.start() + // Peer discovery await this._setupPeerDiscovery() } @@ -495,15 +501,15 @@ class Libp2p extends EventEmitter { /** * Will dial to the given `peerId` if the current number of * connected peers is less than the configured `ConnectionManager` - * minPeers. + * minConnections. * @private * @param {PeerId} peerId */ async _maybeConnect (peerId) { // If auto dialing is on and we have no connection to the peer, check if we should dial if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) { - const minPeers = this._options.connectionManager.minPeers || 0 - if (minPeers > this.connectionManager.size) { + const minConnections = this._options.connectionManager.minConnections || 0 + if (minConnections > this.connectionManager.size) { log('connecting to discovered peer %s', peerId.toB58String()) try { await this.dialer.connectToPeer(peerId) diff --git a/test/connection-manager/index.node.js b/test/connection-manager/index.node.js index 3a3d000297..efa09d7cc7 100644 --- a/test/connection-manager/index.node.js +++ b/test/connection-manager/index.node.js @@ -7,6 +7,9 @@ chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') +const delay = require('delay') +const pWaitFor = require('p-wait-for') + const peerUtils = require('../utils/creators/peer') const mockConnection = require('../utils/mockConnection') const baseOptions = require('../utils/base-options.browser') @@ -112,4 +115,148 @@ describe('libp2p.connections', () => { await libp2p.stop() await remoteLibp2p.stop() }) + + describe('proactive connections', () => { + let nodes = [] + + beforeEach(async () => { + nodes = await peerUtils.createPeer({ + number: 2, + config: { + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0/ws'] + } + } + }) + }) + + afterEach(async () => { + await Promise.all(nodes.map((node) => node.stop())) + sinon.reset() + }) + + it('should connect to all the peers stored in the PeerStore, if their number is below minConnections', async () => { + const [libp2p] = await peerUtils.createPeer({ + fixture: false, + started: false, + config: { + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0/ws'] + }, + connectionManager: { + minConnections: 3 + } + } + }) + + // Populate PeerStore before starting + libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) + libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) + + await libp2p.start() + + // Wait for peers to connect + await pWaitFor(() => libp2p.connectionManager.size === 2) + + await libp2p.stop() + }) + + it('should connect to all the peers stored in the PeerStore until reaching the minConnections', async () => { + const minConnections = 1 + const [libp2p] = await peerUtils.createPeer({ + fixture: false, + started: false, + config: { + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0/ws'] + }, + connectionManager: { + minConnections + } + } + }) + + // Populate PeerStore before starting + libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) + libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) + + await libp2p.start() + + // Wait for peer to connect + await pWaitFor(() => libp2p.connectionManager.size === minConnections) + + // Wait more time to guarantee no other connection happened + await delay(200) + expect(libp2p.connectionManager.size).to.eql(minConnections) + + await libp2p.stop() + }) + + it('should connect to all the peers stored in the PeerStore until reaching the minConnections sorted', async () => { + const minConnections = 1 + const [libp2p] = await peerUtils.createPeer({ + fixture: false, + started: false, + config: { + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0/ws'] + }, + connectionManager: { + minConnections + } + } + }) + + // Populate PeerStore before starting + libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) + libp2p.peerStore.addressBook.set(nodes[1].peerId, nodes[1].multiaddrs) + libp2p.peerStore.protoBook.set(nodes[1].peerId, ['/protocol-min-conns']) + + await libp2p.start() + + // Wait for peer to connect + await pWaitFor(() => libp2p.connectionManager.size === minConnections) + + // Should have connected to the peer with protocols + expect(libp2p.connectionManager.get(nodes[0].peerId)).to.not.exist() + expect(libp2p.connectionManager.get(nodes[1].peerId)).to.exist() + + await libp2p.stop() + }) + + it('should connect to peers in the PeerStore when a peer disconnected', async () => { + const minConnections = 1 + const autoDialInterval = 1000 + + const [libp2p] = await peerUtils.createPeer({ + fixture: false, + config: { + addresses: { + listen: ['/ip4/127.0.0.1/tcp/0/ws'] + }, + connectionManager: { + minConnections, + autoDialInterval + } + } + }) + + // Populate PeerStore after starting (discovery) + libp2p.peerStore.addressBook.set(nodes[0].peerId, nodes[0].multiaddrs) + + // Wait for peer to connect + const conn = await libp2p.dial(nodes[0].peerId) + expect(libp2p.connectionManager.get(nodes[0].peerId)).to.exist() + + await conn.close() + // Closed + await pWaitFor(() => libp2p.connectionManager.size === 0) + // Connected + await pWaitFor(() => libp2p.connectionManager.size === 1) + + expect(libp2p.connectionManager.get(nodes[0].peerId)).to.exist() + + await libp2p.stop() + }) + }) }) diff --git a/test/connection-manager/index.spec.js b/test/connection-manager/index.spec.js index 8ffb0ee19c..caf6becb8a 100644 --- a/test/connection-manager/index.spec.js +++ b/test/connection-manager/index.spec.js @@ -58,7 +58,8 @@ describe('Connection Manager', () => { config: { modules: baseOptions.modules, connectionManager: { - maxConnections: max + maxConnections: max, + minConnections: 2 } }, started: false @@ -96,7 +97,8 @@ describe('Connection Manager', () => { config: { modules: baseOptions.modules, connectionManager: { - maxConnections: max + maxConnections: max, + minConnections: 0 } }, started: false diff --git a/test/peer-discovery/index.spec.js b/test/peer-discovery/index.spec.js index b6e355eed1..fcada30ca9 100644 --- a/test/peer-discovery/index.spec.js +++ b/test/peer-discovery/index.spec.js @@ -31,10 +31,13 @@ describe('peer discovery', () => { sinon.reset() }) - it('should dial know peers on startup', async () => { + it('should dial know peers on startup below the minConnections watermark', async () => { libp2p = new Libp2p({ ...baseOptions, - peerId + peerId, + connectionManager: { + minConnections: 2 + } }) libp2p.peerStore.addressBook.set(remotePeerId, [multiaddr('/ip4/165.1.1.1/tcp/80')])