diff --git a/README.md b/README.md index af40da3..f51b097 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,8 @@ const sw = new switch(peerInfo , peerBook [, options]) If defined, `options` should be an object with the following keys and respective values: - `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds) -- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50` +- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50` +- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds) - `stats`: an object with the following keys and respective values: - `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`. - `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`. diff --git a/src/constants.js b/src/constants.js index fac81c8..ddf66a9 100644 --- a/src/constants.js +++ b/src/constants.js @@ -2,5 +2,6 @@ module.exports = { BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again + DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials } diff --git a/src/dialer/queue.js b/src/dialer/queue.js index 8520039..08871d4 100644 --- a/src/dialer/queue.js +++ b/src/dialer/queue.js @@ -63,7 +63,7 @@ class Queue { * @constructor * @param {string} peerId * @param {Switch} _switch - * @param {function} onStopped Called when the queue stops + * @param {function(string)} onStopped Called when the queue stops */ constructor (peerId, _switch, onStopped) { this.id = peerId @@ -78,20 +78,16 @@ class Queue { } /** - * Adds the dial request to the queue and starts the - * queue if it is stopped + * Adds the dial request to the queue. The queue is not automatically started * @param {string} protocol * @param {boolean} useFSM If callback should use a ConnectionFSM instead * @param {function(Error, Connection)} callback - * @returns {boolean} whether or not the queue has been started */ add (protocol, useFSM, callback) { if (!this.isDialAllowed()) { nextTick(callback, ERR_BLACKLISTED()) - return false } this._queue.push({ protocol, useFSM, callback }) - return this.start() } /** @@ -133,7 +129,7 @@ class Queue { if (this.isRunning) { log('stopping dial queue to %s', this.id) this.isRunning = false - this.onStopped() + this.onStopped(this.id) } } diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index 4d418da..96ab36f 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -2,7 +2,6 @@ const once = require('once') const Queue = require('./queue') -const { DIAL_ABORTED } = require('../errors') const noop = () => {} class DialQueueManager { @@ -11,10 +10,10 @@ class DialQueueManager { * @param {Switch} _switch */ constructor (_switch) { - this._queue = [] + this._queue = new Set() + this._dialingQueues = new Set() this._queues = {} this.switch = _switch - this.dials = 0 } /** @@ -24,11 +23,8 @@ class DialQueueManager { * This causes the entire DialerQueue to be drained */ abort () { - // Abort items in the general queue - while (this._queue.length > 0) { - let dial = this._queue.shift() - dial.callback(DIAL_ABORTED()) - } + // Clear the general queue + this._queue.clear() // Abort the individual peer queues const queues = Object.values(this._queues) @@ -46,16 +42,24 @@ class DialQueueManager { add ({ peerInfo, protocol, useFSM, callback }) { callback = callback ? once(callback) : noop - // If the target queue is currently running, just add the dial - // directly to it. This acts as a crude priority lane for multiple - // calls to a peer. + // Add the dial to its respective queue const targetQueue = this.getQueue(peerInfo) - if (targetQueue.isRunning) { - targetQueue.add(protocol, useFSM, callback) + targetQueue.add(protocol, useFSM, callback) + + // If we're already connected to the peer, start the queue now + // While it might cause queues to go over the max parallel amount, + // it avoids blocking peers we're already connected to + if (peerInfo.isConnected()) { + targetQueue.start() return } - this._queue.push({ peerInfo, protocol, useFSM, callback }) + // Add the id to the general queue set if the queue isn't running + // and if the queue is allowed to dial + if (!targetQueue.isRunning && targetQueue.isDialAllowed()) { + this._queue.add(targetQueue.id) + } + this.run() } @@ -63,12 +67,14 @@ class DialQueueManager { * Will execute up to `MAX_PARALLEL_DIALS` dials */ run () { - if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) { - let { peerInfo, protocol, useFSM, callback } = this._queue.shift() - let dialQueue = this.getQueue(peerInfo) - if (dialQueue.add(protocol, useFSM, callback)) { - this.dials++ - } + if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) { + let nextQueue = this._queue.values().next() + if (nextQueue.done) return + + this._queue.delete(nextQueue.value) + let targetQueue = this._queues[nextQueue.value] + this._dialingQueues.add(targetQueue.id) + targetQueue.start() } } @@ -84,9 +90,10 @@ class DialQueueManager { * A handler for when dialing queues stop. This will trigger * `run()` in order to keep the queue processing. * @private + * @param {string} id peer id of the queue that stopped */ - _onQueueStopped () { - this.dials-- + _onQueueStopped (id) { + this._dialingQueues.delete(id) this.run() } diff --git a/src/limit-dialer/index.js b/src/limit-dialer/index.js index b7dbdc2..9b47389 100644 --- a/src/limit-dialer/index.js +++ b/src/limit-dialer/index.js @@ -1,12 +1,13 @@ 'use strict' -const tryEach = require('async/tryEach') +const race = require('async/race') const debug = require('debug') const once = require('once') const log = debug('libp2p:switch:dialer') const DialQueue = require('./queue') +const { CONNECTION_FAILED } = require('../errors') /** * Track dials per peer and limited them. @@ -42,19 +43,21 @@ class LimitDialer { let errors = [] const tasks = addrs.map((m) => { - return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => { - if (err) { - errors.push(err) - return cb(err) + return (cb) => this.dialSingle(peer, transport, m, token, (err, res) => { + if (res) return cb(null, res) + + errors.push(err || CONNECTION_FAILED()) + + if (errors.length === tasks.length) { + cb(errors) } - return cb(null, result) }) }) - tryEach(tasks, (_, result) => { - if (result && result.conn) { + race(tasks, (_, successfulDial) => { + if (successfulDial) { log('dialMany:success') - return callback(null, result) + return callback(null, successfulDial) } log('dialMany:error') diff --git a/src/limit-dialer/queue.js b/src/limit-dialer/queue.js index 8a3cc9d..344997a 100644 --- a/src/limit-dialer/queue.js +++ b/src/limit-dialer/queue.js @@ -54,9 +54,9 @@ class DialQueue { pull(empty(), conn) // If we can close the connection, do it if (typeof conn.close === 'function') { - return conn.close((_) => callback(null, { cancel: true })) + return conn.close((_) => callback(null)) } - return callback(null, { cancel: true }) + return callback(null) } // one is enough diff --git a/src/transport.js b/src/transport.js index b0aac97..14bb282 100644 --- a/src/transport.js +++ b/src/transport.js @@ -8,14 +8,11 @@ const debug = require('debug') const log = debug('libp2p:switch:transport') const LimitDialer = require('./limit-dialer') +const { DIAL_TIMEOUT } = require('./constants') // number of concurrent outbound dials to make per peer, same as go-libp2p-swtch const defaultPerPeerRateLimit = 8 -// the amount of time a single dial has to succeed -// TODO this should be exposed as a option -const dialTimeout = 30 * 1000 - /** * Manages the transports for the switch. This simplifies dialing and listening across * multiple transports. @@ -23,7 +20,7 @@ const dialTimeout = 30 * 1000 class TransportManager { constructor (_switch) { this.switch = _switch - this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) + this.dialer = new LimitDialer(defaultPerPeerRateLimit, this.switch._options.dialTimeout || DIAL_TIMEOUT) } /** diff --git a/test/limit-dialer.node.js b/test/limit-dialer.node.js index 985c59f..24cf8b4 100644 --- a/test/limit-dialer.node.js +++ b/test/limit-dialer.node.js @@ -3,6 +3,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-checkmark')) const expect = chai.expect const multiaddr = require('multiaddr') const pull = require('pull-stream') @@ -52,13 +53,15 @@ describe('LimitDialer', () => { it('two success', (done) => { const dialer = new LimitDialer(2, 10) + expect(2).checks(done) + // mock transport const t1 = { dial (addr, cb) { const as = addr.toString() if (as.match(/191/)) { setImmediate(() => cb(new Error('fail'))) - return {} + return null } else if (as.match(/192/)) { setTimeout(cb, 2) return { @@ -69,7 +72,10 @@ describe('LimitDialer', () => { setTimeout(cb, 8) return { source: pull.values([2]), - sink: pull.drain() + sink: pull.onEnd((err) => { + // Verify the unused connection gets closed + expect(err).to.not.exist().mark() + }) } } } @@ -83,8 +89,7 @@ describe('LimitDialer', () => { conn, pull.collect((err, res) => { expect(err).to.not.exist() - expect(res).to.be.eql([1]) - done() + expect(res).to.be.eql([1]).mark() }) ) })