Skip to content
This repository has been archived by the owner on Feb 26, 2021. It is now read-only.

Commit

Permalink
fix: better support for interface closing
Browse files Browse the repository at this point in the history
chore: update deps
chore: remove test on pre-push
chore: remove non jenkins ci files
chore: fix aegir test commands
chore: fix linting
fix: make callback optional
fix: improve stability
  • Loading branch information
jacobheun committed Nov 13, 2018
1 parent f1c9fd5 commit 7bae7ee
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 134 deletions.
32 changes: 0 additions & 32 deletions .travis.yml

This file was deleted.

29 changes: 0 additions & 29 deletions appveyor.yml

This file was deleted.

15 changes: 0 additions & 15 deletions circle.yml

This file was deleted.

30 changes: 15 additions & 15 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test",
"test:node": "aegir -t node",
"test:browser": "aegir -t browser",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"coverage": "aegir coverage",
"coverage-publish": "aegir-coverage publish"
},
"pre-push": [
"lint",
"test"
"lint"
],
"repository": {
"type": "git",
Expand All @@ -37,26 +36,27 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^13.0.6",
"chai": "^4.1.2",
"aegir": "^17.0.1",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"gulp": "^3.9.1",
"interface-stream-muxer": "~0.5.9",
"libp2p-tcp": "~0.12.0",
"libp2p-websockets": "~0.11.0",
"multiaddr": "^4.0.0",
"interface-stream-muxer": "github:libp2p/interface-stream-muxer#test/increase-timeout",
"libp2p-tcp": "~0.13.0",
"libp2p-websockets": "~0.12.0",
"multiaddr": "^5.0.0",
"pull-file": "^1.1.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.6.7",
"run-parallel": "^1.1.8",
"run-parallel": "^1.1.9",
"sinon": "^7.1.0",
"tap-spec": "^4.1.1",
"tape": "^4.9.0"
},
"dependencies": {
"interface-connection": "~0.3.2",
"lodash.noop": "^3.0.1",
"pull-catch": "^1.0.0",
"pull-stream": "^3.6.9",
"pull-stream-to-stream": "^1.3.4",
"spdy-transport": "^2.1.0",
"spdy-transport": "^3.0.0",
"stream-to-pull-stream": "^1.7.2"
},
"contributors": [
Expand Down
10 changes: 1 addition & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@ const SPDY_CODEC = require('./spdy-codec')

function create (rawConn, isListener) {
const conn = toStream(rawConn)
// Let it flow, let it flooow
conn.resume()

conn.on('end', () => {
// Cleanup and destroy the connection when it ends
// as the converted stream doesn't emit 'close'
// but .destroy will trigger a 'close' event.
conn.destroy()
})
conn.on('end', () => conn.destroy())

const spdyMuxer = spdy.connection.create(conn, {
protocol: 'spdy',
Expand Down
65 changes: 53 additions & 12 deletions src/muxer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const noop = require('lodash.noop')
const Connection = require('interface-connection').Connection
const toPull = require('stream-to-pull-stream')
const pullCatch = require('pull-catch')
const pull = require('pull-stream')
const noop = () => {}

function catchError (stream) {
return {
source: pull(
stream.source,
pullCatch((err) => {
if (err.code === 'ECONNRESET' || err.code === 'EPIPE') {
return
}
return false
})
),
sink: stream.sink
}
}

const SPDY_CODEC = require('./spdy-codec')

Expand All @@ -18,19 +35,35 @@ module.exports = class Muxer extends EventEmitter {
spdy.start(3.1)

// The rest of the API comes by default with SPDY
spdy.on('close', () => {
this.emit('close')
spdy.on('close', (didError) => {
// If we get a fatal ok error, just close
if (didError && /ok/i.test(didError.message)) {
didError = false
}
spdy.destroyStreams(new Error('underlying socket has been closed'))
this.emit('close', didError)
})

spdy.on('error', (err) => {
this.emit('error', err)
if (!err) {
return noop()
}
// If the connection was severed, ensure the streams
// are destroyed
if (err.code === 'ECONNRESET' || err.code === 'EPIPE') {
spdy.destroyStreams()
} else {
this.emit('error', err)
}
})

// needed by other spdy impl that need the response headers
// in order to confirm the stream can be open
spdy.on('stream', (stream) => {
stream.respond(200, {})
const muxedConn = new Connection(toPull.duplex(stream), this.conn)
const muxedConn = new Connection(
catchError(toPull.duplex(stream)),
this.conn
)
this.emit('stream', muxedConn)
})
}
Expand All @@ -40,25 +73,33 @@ module.exports = class Muxer extends EventEmitter {
if (!callback) {
callback = noop
}
const conn = new Connection(null, this.conn)

this.spdy.request({
const stream = this.spdy.request({
method: 'POST',
path: '/',
headers: {}
}, (err, stream) => {
}, (err) => {
if (err) {
return callback(err)
}
conn.setInnerConn(toPull.duplex(stream), this.conn)
callback(null, conn)
})

const conn = new Connection(
catchError(toPull.duplex(stream)),
this.conn
)

return conn
}

end (cb) {
this.spdy.destroyStreams()
this.spdy.end(cb)
cb = cb || noop
this.spdy.end((err) => {
if (err && /ok/i.test(err.message)) {
return cb()
}
cb(err)
})
}
}
4 changes: 1 addition & 3 deletions test/conn-properties.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ describe('conn properties are propagated to each stream', () => {
})

after((done) => {
// TODO: fix listener close hanging
// listener.close(done)
done()
listener.close(done)
})

it('getObservedAddrs', (done) => {
Expand Down
59 changes: 59 additions & 0 deletions test/muxer.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect
const sinon = require('sinon')

const spdy = require('spdy-transport')
const pair = require('pull-pair/duplex')
const toStream = require('pull-stream-to-stream')

const Muxer = require('../src/muxer')

describe('multiplex-muxer', () => {
let muxer
let spdyMuxer

afterEach(() => {
sinon.restore()
})

it('can be created', () => {
const p = pair()
spdyMuxer = spdy.connection.create(toStream(p), {
protocol: 'spdy',
isServer: false
})
muxer = new Muxer(p, spdyMuxer)
})

it('catches newStream errors', (done) => {
sinon.stub(spdyMuxer, 'request').callsFake((_, cb) => {
cb(new Error('something bad happened'))
})

muxer.newStream((err) => {
expect(err).to.exist()
expect(err.message).to.equal('something bad happened')
done()
})
})

it('can get destroyed', (done) => {
const spy = sinon.spy(spdyMuxer, 'destroyStreams')
expect(2).checks(done)

muxer.end((err) => {
expect(err).to.not.exist().mark()

// End it again to test accidental duplicate close
muxer.end((err) => {
expect(spy.callCount).to.eql(2)
expect(err).to.not.exist().mark()
})
})
})
})
Loading

0 comments on commit 7bae7ee

Please sign in to comment.