Skip to content

Commit

Permalink
fix: dial in series until we have proper abort support (libp2p#306)
Browse files Browse the repository at this point in the history
refactor: simplify the circuit dial logic

chore: remove travis windows cache

refactor: clean up dial many error logic

test: explicitly set correct address

test(refactor): update order of echo logic and add after

refactor: cleanup per feedback
  • Loading branch information
jacobheun authored Mar 5, 2019
1 parent d504040 commit ddf622b
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 60 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ node_js:
os:
- linux
- osx
- windows

script: npx nyc -s npm run test:node -- --bail
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov

jobs:
include:
- os: windows
cache: false

- stage: check
script:
- npx aegir commitlint --travis
Expand Down
52 changes: 29 additions & 23 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ const withIs = require('class-is')
const BaseConnection = require('./base')

const observeConnection = require('../observe-connection')
const Errors = require('../errors')
const {
CONNECTION_FAILED,
DIAL_SELF,
INVALID_STATE_TRANSITION,
NO_TRANSPORTS_REGISTERED,
maybeUnexpectedEnd
} = require('../errors')

/**
* @typedef {Object} ConnectionOptions
Expand Down Expand Up @@ -136,7 +142,7 @@ class ConnectionFSM extends BaseConnection {
*/
dial () {
if (this.theirB58Id === this.ourPeerInfo.id.toB58String()) {
return this.emit('error', Errors.DIAL_SELF())
return this.emit('error', DIAL_SELF())
} else if (this.getState() === 'DIALING') {
return this.log('attempted to dial while already dialing, ignoring')
}
Expand Down Expand Up @@ -191,40 +197,40 @@ class ConnectionFSM extends BaseConnection {
this.log(`dialing ${this.theirB58Id}`)

if (!this.switch.hasTransports()) {
return this.close(Errors.NO_TRANSPORTS_REGISTERED())
return this.close(NO_TRANSPORTS_REGISTERED())
}

const tKeys = this.switch.availableTransports(this.theirPeerInfo)

const circuitEnabled = Boolean(this.switch.transports[Circuit.tag])
let circuitTried = false

if (circuitEnabled && !tKeys.includes(Circuit.tag)) {
tKeys.push(Circuit.tag)
}

const nextTransport = (key) => {
let transport = key
if (!transport) {
if (!circuitEnabled) {
return this.close(Errors.CONNECTION_FAILED(
new Error(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`)
))
return this.close(
CONNECTION_FAILED(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`)
)
}

if (circuitTried) {
return this.close(Errors.CONNECTION_FAILED(
new Error(`No available transports to dial peer ${this.theirB58Id}!`)
))
}
return this.close(
CONNECTION_FAILED(`No available transports to dial peer ${this.theirB58Id}!`)
)
}

this.log(`Falling back to dialing over circuit`)
this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/ipfs/${this.theirB58Id}`)
circuitTried = true
transport = Circuit.tag
if (transport === Circuit.tag) {
this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/p2p/${this.theirB58Id}`)
}

this.log(`dialing transport ${transport}`)
this.switch.transport.dial(transport, this.theirPeerInfo, (err, _conn) => {
if (err) {
this.emit('error:connection_attempt_failed', err.errors || [err])
this.log(err)
this.switch.transport.dial(transport, this.theirPeerInfo, (errors, _conn) => {
if (errors) {
this.emit('error:connection_attempt_failed', errors)
this.log(errors)
return nextTransport(tKeys.shift())
}

Expand Down Expand Up @@ -296,14 +302,14 @@ class ConnectionFSM extends BaseConnection {
const msDialer = new multistream.Dialer()
msDialer.handle(this.conn, (err) => {
if (err) {
return this.close(Errors.maybeUnexpectedEnd(err))
return this.close(maybeUnexpectedEnd(err))
}

this.log('selecting crypto %s to %s', this.switch.crypto.tag, this.theirB58Id)

msDialer.select(this.switch.crypto.tag, (err, _conn) => {
if (err) {
return this.close(Errors.maybeUnexpectedEnd(err))
return this.close(maybeUnexpectedEnd(err))
}

const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
Expand Down Expand Up @@ -444,7 +450,7 @@ class ConnectionFSM extends BaseConnection {
* @returns {void}
*/
_onStateError (err) {
this.emit('error', Errors.INVALID_STATE_TRANSITION(err))
this.emit('error', INVALID_STATE_TRANSITION(err))
this.log(err)
}
}
Expand Down
30 changes: 15 additions & 15 deletions src/limit-dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const map = require('async/map')
const tryEach = require('async/tryEach')
const debug = require('debug')
const once = require('once')

Expand Down Expand Up @@ -40,25 +40,25 @@ class LimitDialer {
const token = { cancel: false }
callback = once(callback) // only call callback once

map(addrs, (m, cb) => {
this.dialSingle(peer, transport, m, token, cb)
}, (err, results) => {
if (err) {
return callback(err)
}
let errors = []
const tasks = addrs.map((m) => {
return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => {
if (err) {
errors.push(err)
return cb(err)
}
return cb(null, result)
})
})

const success = results.filter((res) => res.conn)
if (success.length > 0) {
tryEach(tasks, (_, result) => {
if (result && result.conn) {
log('dialMany:success')
return callback(null, success[0])
return callback(null, result)
}

log('dialMany:error')
const error = new Error('Failed to dial any provided address')
error.errors = results
.filter((res) => res.error)
.map((res) => res.error)
return callback(error)
return callback(errors)
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DialQueue {
this._dialWithTimeout(transport, addr, (err, conn) => {
if (err) {
log.error(`${transport.constructor.name}:work`, err)
return callback(null, { error: err })
return callback(err)
}

if (token.cancel) {
Expand Down
6 changes: 3 additions & 3 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class TransportManager {
log('dialing %s', key, multiaddrs.map((m) => m.toString()))

// dial each of the multiaddrs with the given transport
this.dialer.dialMany(peerInfo.id, transport, multiaddrs, (err, success) => {
if (err) {
return callback(err)
this.dialer.dialMany(peerInfo.id, transport, multiaddrs, (errors, success) => {
if (errors) {
return callback(errors)
}

peerInfo.connect(success.multiaddr)
Expand Down
8 changes: 3 additions & 5 deletions test/connection.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,9 @@ describe('ConnectionFSM', () => {
peerInfo: listenerSwitch._peerInfo
})

const stub = sinon.stub(dialerSwitch.transport, 'dial').callsArgWith(2, {
errors: [
new Error('address in use')
]
})
const stub = sinon.stub(dialerSwitch.transport, 'dial').callsArgWith(2, [
new Error('address in use')
])

connection.once('error:connection_attempt_failed', (errors) => {
expect(errors).to.have.length(1).mark()
Expand Down
7 changes: 3 additions & 4 deletions test/limit-dialer.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,18 @@ describe('LimitDialer', () => {

it('all failing', (done) => {
const dialer = new LimitDialer(2, 10)

const error = new Error('fail')
// mock transport
const t1 = {
dial (addr, cb) {
setTimeout(() => cb(new Error('fail')), 1)
setTimeout(() => cb(error), 1)
return {}
}
}

dialer.dialMany(peers[0].id, t1, peers[0].multiaddrs.toArray(), (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(3)
expect(err.errors[0].message).to.eql('fail')
expect(err).to.eql([error, error, error])
expect(conn).to.not.exist()
done()
})
Expand Down
20 changes: 12 additions & 8 deletions test/transports.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ describe('transports', () => {
})
})

after(function (done) {
parallel([
(next) => switchA.stop(next),
(next) => switchB.stop(next)
], done)
})

it('.transport.remove', () => {
switchA.transport.add('test', new t.C())
expect(switchA.transports).to.have.any.keys(['test'])
Expand Down Expand Up @@ -91,28 +98,26 @@ describe('transports', () => {
const peer = morePeerInfo[0]
peer.multiaddrs.add(t.maGen(9999))

const conn = switchA.transport.dial(t.n, peer, (err, conn) => {
switchA.transport.dial(t.n, peer, (err, conn) => {
expect(err).to.not.exist()
tryEcho(conn, done)
})

tryEcho(conn, done)
})

it('.transport.dial to set of multiaddr, only one is available', (done) => {
const peer = morePeerInfo[1]
peer.multiaddrs.add(t.maGen(9359))
peer.multiaddrs.add(t.maGen(9329))
peer.multiaddrs.add(t.maGen(9910))
peer.multiaddrs.add(t.maGen(9999))
peer.multiaddrs.add(switchB._peerInfo.multiaddrs.toArray()[0]) // the valid address
peer.multiaddrs.add(t.maGen(9309))
// addr not supported added on purpose
peer.multiaddrs.add('/ip4/1.2.3.4/tcp/3456/ws/p2p-webrtc-star')

const conn = switchA.transport.dial(t.n, peer, (err, conn) => {
switchA.transport.dial(t.n, peer, (err, conn) => {
expect(err).to.not.exist()
tryEcho(conn, done)
})

tryEcho(conn, done)
})

it('.transport.dial to set of multiaddr, none is available', (done) => {
Expand All @@ -124,7 +129,6 @@ describe('transports', () => {

switchA.transport.dial(t.n, peer, (err, conn) => {
expect(err).to.exist()
expect(err.errors).to.have.length(2)
expect(conn).to.not.exist()
done()
})
Expand Down

0 comments on commit ddf622b

Please sign in to comment.