From 0357bf2f32a75fe77a8ada3cf966221052198aef Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 10 Apr 2019 19:22:44 +0200 Subject: [PATCH] feat: support a priority queue for dials (#325) --- README.md | 22 +++++++++++++++++++++- src/connection/base.js | 1 + src/connection/index.js | 2 -- src/connection/manager.js | 16 +++++++++++++--- src/constants.js | 4 +++- src/dialer/index.js | 32 +++++++++++++++++++++++++++----- src/dialer/queue.js | 4 +++- src/dialer/queueManager.js | 27 ++++++++++++++++----------- test/dialer.spec.js | 33 ++++++++++++++++++++++++++++----- 9 files changed, 112 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index f54af01002..728adde233 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,16 @@ works like dial, but calls back with a [Connection State Machine](#connection-st Connection state machines emit a number of events that can be used to determine the current state of the connection and to received the underlying connection that can be used to transfer data. +### `switch.dialer.connect(peer, options, callback)` + +a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue. + +- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][] +- `options`: Optional +- `options.priority`: Number of the priority of the dial, defaults to 20. +- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine) +- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine) + ##### Events - `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted. - `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal. @@ -187,7 +197,17 @@ Emitted when the switch encounters an error. ### `switch.on('peer-mux-closed', (peer) => {})` -- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection. +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with. + +### `switch.on('connection:start', (peer) => {})` +This will be triggered anytime a new connection is created. + +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with. + +### `switch.on('connection:end', (peer) => {})` +This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking. + +- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with. ### `switch.on('start', () => {})` diff --git a/src/connection/base.js b/src/connection/base.js index cb3d5a59bb..36f7842815 100644 --- a/src/connection/base.js +++ b/src/connection/base.js @@ -80,6 +80,7 @@ class BaseConnection extends EventEmitter { * @returns {void} */ _onDisconnected () { + this.switch.connection.remove(this) this.log('disconnected from %s', this.theirB58Id) this.emit('close') this.removeAllListeners() diff --git a/src/connection/index.js b/src/connection/index.js index 1bfe13990d..a2df251a2d 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -269,8 +269,6 @@ class ConnectionFSM extends BaseConnection { _onDisconnecting () { this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer)) - this.switch.connection.remove(this) - delete this.switch.conns[this.theirB58Id] let tasks = [] diff --git a/src/connection/manager.js b/src/connection/manager.js index 1f17873914..b6abd362e3 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -33,7 +33,12 @@ class ConnectionManager { // Only add it if it's not there if (!this.get(connection)) { this.connections[connection.theirB58Id].push(connection) - this.switch.emit('peer-mux-established', connection.theirPeerInfo) + this.switch.emit('connection:start', connection.theirPeerInfo) + if (connection.getState() === 'MUXED') { + this.switch.emit('peer-mux-established', connection.theirPeerInfo) + } else { + connection.once('muxed', () => this.switch.emit('peer-mux-established', connection.theirPeerInfo)) + } } } @@ -81,8 +86,10 @@ class ConnectionManager { remove (connection) { // No record of the peer, disconnect it if (!this.connections[connection.theirB58Id]) { - connection.theirPeerInfo.disconnect() - this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + if (connection.theirPeerInfo) { + connection.theirPeerInfo.disconnect() + this.switch.emit('peer-mux-closed', connection.theirPeerInfo) + } return } @@ -99,6 +106,9 @@ class ConnectionManager { connection.theirPeerInfo.disconnect() this.switch.emit('peer-mux-closed', connection.theirPeerInfo) } + + // A tracked connection was closed, let the world know + this.switch.emit('connection:end', connection.theirPeerInfo) } /** diff --git a/src/constants.js b/src/constants.js index fded895e50..f0b6496ded 100644 --- a/src/constants.js +++ b/src/constants.js @@ -6,5 +6,7 @@ module.exports = { DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials - QUARTER_HOUR: 15 * 60e3 + QUARTER_HOUR: 15 * 60e3, + PRIORITY_HIGH: 10, + PRIORITY_LOW: 20 } diff --git a/src/dialer/index.js b/src/dialer/index.js index 00675a5fbb..8ee1ace71c 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -6,7 +6,9 @@ const { BLACK_LIST_ATTEMPTS, BLACK_LIST_TTL, MAX_COLD_CALLS, - MAX_PARALLEL_DIALS + MAX_PARALLEL_DIALS, + PRIORITY_HIGH, + PRIORITY_LOW } = require('../constants') module.exports = function (_switch) { @@ -19,7 +21,7 @@ module.exports = function (_switch) { * @param {DialRequest} dialRequest * @returns {void} */ - function _dial ({ peerInfo, protocol, useFSM, callback }) { + function _dial ({ peerInfo, protocol, options, callback }) { if (typeof protocol === 'function') { callback = protocol protocol = null @@ -32,7 +34,7 @@ module.exports = function (_switch) { } // Add it to the queue, it will automatically get executed - dialQueueManager.add({ peerInfo, protocol, useFSM, callback }) + dialQueueManager.add({ peerInfo, protocol, options, callback }) } /** @@ -64,14 +66,33 @@ module.exports = function (_switch) { dialQueueManager.clearBlacklist(peerInfo) } + /** + * Attempts to establish a connection to the given `peerInfo` at + * a lower priority than a standard dial. + * @param {PeerInfo} peerInfo + * @param {object} options + * @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false. + * @param {number} options.priority Lowest priority goes first. Defaults to 20. + * @param {function(Error, Connection)} callback + */ + function connect (peerInfo, options, callback) { + if (typeof options === 'function') { + callback = options + options = null + } + options = { useFSM: false, priority: PRIORITY_LOW, ...options } + _dial({ peerInfo, protocol: null, options, callback }) + } + /** * Adds the dial request to the queue for the given `peerInfo` + * The request will be added with a high priority (10). * @param {PeerInfo} peerInfo * @param {string} protocol * @param {function(Error, Connection)} callback */ function dial (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, useFSM: false, callback }) + _dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback }) } /** @@ -82,10 +103,11 @@ module.exports = function (_switch) { * @param {function(Error, ConnectionFSM)} callback */ function dialFSM (peerInfo, protocol, callback) { - _dial({ peerInfo, protocol, useFSM: true, callback }) + _dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback }) } return { + connect, dial, dialFSM, clearBlacklist, diff --git a/src/dialer/queue.js b/src/dialer/queue.js index 6df9557f77..c4e0f871d5 100644 --- a/src/dialer/queue.js +++ b/src/dialer/queue.js @@ -13,7 +13,9 @@ log.error = debug('libp2p:switch:dial:error') * @typedef {Object} DialRequest * @property {PeerInfo} peerInfo - The peer to dial to * @property {string} [protocol] - The protocol to create a stream for - * @property {boolean} useFSM - If `callback` should return a ConnectionFSM + * @property {object} options + * @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM + * @property {number} options.priority - The priority of the dial * @property {function(Error, Connection|ConnectionFSM)} callback */ diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index 6199e0c972..52355f6b82 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -5,7 +5,7 @@ const Queue = require('./queue') const { DIAL_ABORTED } = require('../errors') const nextTick = require('async/nextTick') const retimer = require('retimer') -const { QUARTER_HOUR } = require('../constants') +const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants') const debug = require('debug') const log = debug('libp2p:switch:dial:manager') const noop = () => {} @@ -103,17 +103,25 @@ class DialQueueManager { * @param {DialRequest} dialRequest * @returns {void} */ - add ({ peerInfo, protocol, useFSM, callback }) { + add ({ peerInfo, protocol, options, callback }) { callback = callback ? once(callback) : noop // Add the dial to its respective queue const targetQueue = this.getQueue(peerInfo) - // If we have too many cold calls, abort the dial immediately - if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) { - return nextTick(callback, DIAL_ABORTED()) + + // Cold Call + if (options.priority > PRIORITY_HIGH) { + // If we have too many cold calls, abort the dial immediately + if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) { + return nextTick(callback, DIAL_ABORTED()) + } + + if (this._queue.has(targetQueue.id)) { + return nextTick(callback, DIAL_ABORTED()) + } } - targetQueue.add(protocol, useFSM, callback) + targetQueue.add(protocol, options.useFSM, callback) // If we're already connected to the peer, start the queue now // While it might cause queues to go over the max parallel amount, @@ -130,15 +138,12 @@ class DialQueueManager { // Add the id to its respective queue set if the queue isn't running if (!targetQueue.isRunning) { - if (protocol) { + if (options.priority <= PRIORITY_HIGH) { this._queue.add(targetQueue.id) this._coldCallQueue.delete(targetQueue.id) // Only add it to the cold queue if it's not in the normal queue - } else if (!this._queue.has(targetQueue.id)) { - this._coldCallQueue.add(targetQueue.id) - // The peer is already in the normal queue, abort the cold call } else { - return nextTick(callback, DIAL_ABORTED()) + this._coldCallQueue.add(targetQueue.id) } } diff --git a/test/dialer.spec.js b/test/dialer.spec.js index 277af906e2..83ca43b02f 100644 --- a/test/dialer.spec.js +++ b/test/dialer.spec.js @@ -12,17 +12,20 @@ const PeerBook = require('peer-book') const Queue = require('../src/dialer/queue') const QueueManager = require('../src/dialer/queueManager') const Switch = require('../src') +const { PRIORITY_HIGH, PRIORITY_LOW } = require('../src/constants') const utils = require('./utils') const createInfos = utils.createInfos describe('dialer', () => { let switchA + let switchB - before((done) => createInfos(1, (err, infos) => { + before((done) => createInfos(2, (err, infos) => { expect(err).to.not.exist() switchA = new Switch(infos[0], new PeerBook()) + switchB = new Switch(infos[1], new PeerBook()) done() })) @@ -31,6 +34,26 @@ describe('dialer', () => { sinon.restore() }) + describe('connect', () => { + afterEach(() => { + switchA.dialer.clearBlacklist(switchB._peerInfo) + }) + + it('should use default options', (done) => { + switchA.dialer.connect(switchB._peerInfo, (err) => { + expect(err).to.exist() + done() + }) + }) + + it('should be able to use custom options', (done) => { + switchA.dialer.connect(switchB._peerInfo, { useFSM: true, priority: PRIORITY_HIGH }, (err) => { + expect(err).to.exist() + done() + }) + }) + }) + describe('queue', () => { it('should blacklist forever after 5 blacklists', () => { const queue = new Queue('QM', switchA) @@ -58,7 +81,7 @@ describe('dialer', () => { id: { toB58String: () => 'QmA' } }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: (err) => { expect(err.code).to.eql('DIAL_ABORTED') done() @@ -75,7 +98,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: '/echo/1.0.0', - useFSM: true, + options: { useFSM: true, priority: PRIORITY_HIGH }, callback: () => {} } @@ -99,7 +122,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: () => {} } @@ -120,7 +143,7 @@ describe('dialer', () => { isConnected: () => null }, protocol: null, - useFSM: true, + options: { useFSM: true, priority: PRIORITY_LOW }, callback: (err) => { expect(runSpy.called).to.eql(false) expect(hasSpy.called).to.eql(true)