From 980f750d14fa8663d8df90687609b272602e50af Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 29 Mar 2019 07:53:08 +0000 Subject: [PATCH 01/16] refactor: wip switch to it-ws and async iterators License: MIT Signed-off-by: Alan Shaw --- package.json | 4 +- src/index.js | 39 +++------------ src/listener.js | 32 +++++------- test/node.js | 129 +++++++++++++++++++----------------------------- 4 files changed, 73 insertions(+), 131 deletions(-) diff --git a/package.json b/package.json index b074561..66f043a 100644 --- a/package.json +++ b/package.json @@ -43,9 +43,9 @@ "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.2", + "it-ws": "^1.0.0", "mafmt": "^6.0.4", - "multiaddr-to-uri": "^4.0.1", - "pull-ws": "hugomrdias/pull-ws#fix/bundle-size" + "multiaddr-to-uri": "^4.0.1" }, "devDependencies": { "aegir": "^18.0.3", diff --git a/src/index.js b/src/index.js index b3053b0..e6b2947 100644 --- a/src/index.js +++ b/src/index.js @@ -1,47 +1,24 @@ 'use strict' -const connect = require('pull-ws/client') +const connect = require('it-ws/client') const mafmt = require('mafmt') const withIs = require('class-is') -const Connection = require('interface-connection').Connection - const toUri = require('multiaddr-to-uri') -const debug = require('debug') -const log = debug('libp2p:websockets:dialer') +const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || function () { } - + async dial (ma, options) { + log('dialing %s', ma) const url = toUri(ma) - log('dialing %s', url) - const socket = connect(url, { - binary: true, - onConnect: (err) => { - callback(err) - } - }) - - const conn = new Connection(socket) - conn.getObservedAddrs = (cb) => cb(null, [ma]) - conn.close = (cb) => socket.close(cb) - - return conn + const socket = connect(url, { binary: true }) + socket.getObservedAddrs = () => [ma] + log('connected %s', ma) + return socket } createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = {} - } - return createListener(options, handler) } diff --git a/src/listener.js b/src/listener.js index 2149cb3..3c30365 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,43 +1,35 @@ 'use strict' -const Connection = require('interface-connection').Connection const multiaddr = require('multiaddr') const os = require('os') -function noop () {} - -const createServer = require('pull-ws/server') || noop +const createServer = require('it-ws/server') module.exports = (options, handler) => { - const listener = createServer(options, (socket) => { - socket.getObservedAddrs = (callback) => { - // TODO research if we can reuse the address in anyway - return callback(null, []) - } - - handler(new Connection(socket)) + const server = createServer(options, socket => { + socket.getObservedAddrs = () => [] + handler(socket) }) let listeningMultiaddr - listener._listen = listener.listen - listener.listen = (ma, callback) => { - callback = callback || noop + const listen = server.listen + server.listen = ma => { listeningMultiaddr = ma if (ma.protoNames().includes('ipfs')) { ma = ma.decapsulate('ipfs') } - listener._listen(ma.toOptions(), callback) + return listen(ma.toOptions()) } - listener.getAddrs = (callback) => { + server.getAddrs = async () => { const multiaddrs = [] - const address = listener.address() + const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } let ipfsId = listeningMultiaddr.getPeerId() @@ -65,8 +57,8 @@ module.exports = (options, handler) => { } } - callback(null, multiaddrs) + return multiaddrs } - return listener + return server } diff --git a/test/node.js b/test/node.js index f6ba702..d67852e 100644 --- a/test/node.js +++ b/test/node.js @@ -30,19 +30,18 @@ describe('listen', () => { ws = new WS() }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -59,14 +58,12 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it.skip('close listener with connections, through timeout', (done) => { @@ -82,73 +79,53 @@ describe('listen', () => { // TODO 0.0.0.0 not supported yet }) - it('getAddrs', (done) => { - const listener = ws.createListener((conn) => { - }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + it('getAddrs', async () => { + const listener = ws.createListener((conn) => { }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) - it('getAddrs on port 0 listen', (done) => { + it('getAddrs on port 0 listen', async () => { const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0', (done) => { + it('getAddrs from listening on 0.0.0.0', async () => { const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs preserves IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) }) @@ -160,19 +137,18 @@ describe('listen', () => { ws = new WS() }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -189,14 +165,11 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) }) }) From 4321b7e89a2c7da55af1a1ec3bac46643a68613d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 1 Apr 2019 14:27:25 +0100 Subject: [PATCH 02/16] feat: adaper with passing tests --- package.json | 3 +- src/adapter.js | 73 +++++ src/index.js | 17 +- test/adapter/compliance.node.js | 24 ++ test/adapter/node.js | 563 ++++++++++++++++++++++++++++++++ 5 files changed, 670 insertions(+), 10 deletions(-) create mode 100644 src/adapter.js create mode 100644 test/adapter/compliance.node.js create mode 100644 test/adapter/node.js diff --git a/package.json b/package.json index 66f043a..8badf4b 100644 --- a/package.json +++ b/package.json @@ -40,10 +40,11 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.2", - "it-ws": "^1.0.0", + "it-ws": "^2.1.0", "mafmt": "^6.0.4", "multiaddr-to-uri": "^4.0.1" }, diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..15015b9 --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,73 @@ +'use strict' + +const { Connection } = require('interface-connection') +const withIs = require('class-is') +const toPull = require('async-iterator-to-pull-stream') +const WebSockets = require('./') +const noop = () => {} + +function callbackify (fn) { + return async function (...args) { + let cb = args.pop() + if (typeof cb !== 'function') { + args.push(cb) + cb = noop + } + let res + try { + res = await fn(...args) + } catch (err) { + return cb(err) + } + cb(null, res) + } +} + +// Legacy adapter to old transport & connection interface +class WebSocketsAdapter extends WebSockets { + dial (ma, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + callback = callback || noop + + const socket = super.dial(ma, options) + const conn = new Connection(toPull.duplex(socket)) + + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + + socket.connected().then(callback).catch(callback) + + return conn + } + + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + const server = super.createListener(options, socket => { + const conn = new Connection(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + handler(conn) + }) + + const proxy = { + listen: callbackify(server.listen.bind(server)), + close: callbackify(server.close.bind(server)), + getAddrs: callbackify(server.getAddrs.bind(server)), + getObservedAddrs: callbackify(() => server.getObservedAddrs()) + } + + return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) + } +} + +module.exports = withIs(WebSocketsAdapter, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/src/index.js b/src/index.js index e6b2947..0237dc6 100644 --- a/src/index.js +++ b/src/index.js @@ -9,10 +9,9 @@ const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - async dial (ma, options) { + dial (ma, options) { log('dialing %s', ma) - const url = toUri(ma) - const socket = connect(url, { binary: true }) + const socket = connect(toUri(ma), { binary: true }) socket.getObservedAddrs = () => [ma] log('connected %s', ma) return socket @@ -23,9 +22,7 @@ class WebSockets { } filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { if (ma.protoNames().includes('p2p-circuit')) { @@ -36,10 +33,12 @@ class WebSockets { ma = ma.decapsulate('ipfs') } - return mafmt.WebSockets.matches(ma) || - mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma) }) } } -module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' }) +module.exports = withIs(WebSockets, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js new file mode 100644 index 0000000..e940090 --- /dev/null +++ b/test/adapter/compliance.node.js @@ -0,0 +1,24 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') +const WS = require('../../src/adapter') + +describe('compliance', () => { + tests({ + setup (callback) { + let ws = new WS() + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/dns4/ipfs.io/tcp/9092/ws'), + multiaddr('/dns4/ipfs.io/tcp/9092/wss') + ] + callback(null, ws, addrs) + }, + teardown (callback) { + callback() + } + }) +}) diff --git a/test/adapter/node.js b/test/adapter/node.js new file mode 100644 index 0000000..7537ba9 --- /dev/null +++ b/test/adapter/node.js @@ -0,0 +1,563 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + +const WS = require('../../src/adapter') + +require('./compliance.node') + +describe('instantiate the transport', () => { + it('create', () => { + const ws = new WS() + expect(ws).to.exist() + }) +}) + +describe('listen', () => { + describe('ip4', () => { + let ws + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it.skip('close listener with connections, through timeout', (done) => { + // TODO `ws` closes all anyway, we need to make it not close + // first - https://github.com/diasdavid/simple-websocket-server + }) + + it.skip('listen on port 0', (done) => { + // TODO port 0 not supported yet + }) + + it.skip('listen on any Interface', (done) => { + // TODO 0.0.0.0 not supported yet + }) + + it('getAddrs', (done) => { + const listener = ws.createListener((conn) => { + }) + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + + it('getAddrs on port 0 listen', (done) => { + const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs preserves IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + }) + + describe('ip6', () => { + let ws + const ma = multiaddr('/ip6/::1/tcp/9091/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + }) +}) + +describe('dial', () => { + describe('ip4', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) + + describe('ip6', () => { + let ws + let listener + const ma = multiaddr('/ip6/::1/tcp/9091') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) +}) + +describe('filter addrs', () => { + let ws + + before(() => { + ws = new WS() + }) + + describe('filter valid addrs for this transport', function () { + it('should fail invalid WS addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma2 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma3 = multiaddr('/ip6/::1/tcp/80') + const ma4 = multiaddr('/dnsaddr/ipfs.io/tcp/80') + + const valid = ws.filter([ma1, ma2, ma3, ma4]) + expect(valid.length).to.equal(0) + }) + + it('should filter correct ipv4 addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv4 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/80/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 address', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns address', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/ws') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws') + const ma3 = multiaddr('/dnsaddr/ipfs.io/tcp/80/wss') + + const valid = ws.filter([ma1, ma2, ma3]) + expect(valid.length).to.equal(3) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + expect(valid[2]).to.deep.equal(ma3) + }) + + it('should filter correct dns address with ipfs id', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns4 address', function () { + const ma1 = multiaddr('/dns4/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns4/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address with ipfs id', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter mixed addresses', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma3 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma4 = multiaddr('/dns6/ipfs.io/ws') + const mh5 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + + '/p2p-circuit/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2, ma3, ma4, mh5]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma4) + }) + }) + + it('filter a single addr for this transport', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter(ma) + expect(valid.length).to.equal(1) + expect(valid[0]).to.deep.equal(ma) + done() + }) +}) + +describe('valid Connection', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') + + it('get observed addrs', (done) => { + let dialerObsAddrs + let listenerObsAddrs + + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + dialerObsAddrs = addrs + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + listenerObsAddrs = addrs + + listener.close(onClose) + + function onClose () { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) + done() + } + }) + } + }) + }) + + it('get Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + listener.close(done) + }) + } + }) + }) + + it('set Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + conn.setPeerInfo('a') + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('a') + }) + + pull(conn, conn) + }) + + listener.listen(ma, onListen) + + function onListen () { + const conn = ws.dial(ma) + conn.setPeerInfo('b') + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('b') + listener.close(done) + }) + } + } + }) +}) + +describe.skip('turbolence', () => { + it('dialer - emits error on the other end is terminated abruptly', (done) => { + }) + it('listener - emits error on the other end is terminated abruptly', (done) => { + }) +}) From 616fac9d213ad74ff3bf1648c98e7143056edc6d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 2 Apr 2019 10:05:45 +0100 Subject: [PATCH 03/16] refactor: async dial that resolves on connection open License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- src/adapter.js | 20 ++++++++++++++------ src/index.js | 3 ++- src/listener.js | 1 - 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/package.json b/package.json index 8badf4b..64e8ea7 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "interface-transport": "~0.3.6", "multiaddr": "^6.0.3", "pull-goodbye": "0.0.2", - "pull-stream": "^3.6.7" + "pull-stream": "^3.6.9" }, "contributors": [ "Chris Campbell ", diff --git a/src/adapter.js b/src/adapter.js index 15015b9..76453bf 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -3,6 +3,8 @@ const { Connection } = require('interface-connection') const withIs = require('class-is') const toPull = require('async-iterator-to-pull-stream') +const error = require('pull-stream/sources/error') +const drain = require('pull-stream/sinks/drain') const WebSockets = require('./') const noop = () => {} @@ -33,13 +35,19 @@ class WebSocketsAdapter extends WebSockets { callback = callback || noop - const socket = super.dial(ma, options) - const conn = new Connection(toPull.duplex(socket)) + const conn = new Connection() - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - conn.close = callbackify(socket.close.bind(socket)) - - socket.connected().then(callback).catch(callback) + super.dial(ma, options) + .then(socket => { + conn.setInnerConn(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + callback(null, conn) + }) + .catch(err => { + conn.setInnerConn({ sink: drain(), source: error(err) }) + callback(err) + }) return conn } diff --git a/src/index.js b/src/index.js index 0237dc6..c1f620d 100644 --- a/src/index.js +++ b/src/index.js @@ -9,9 +9,10 @@ const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - dial (ma, options) { + async dial (ma, options) { log('dialing %s', ma) const socket = connect(toUri(ma), { binary: true }) + await socket.connected() socket.getObservedAddrs = () => [ma] log('connected %s', ma) return socket diff --git a/src/listener.js b/src/listener.js index 3c30365..8e23760 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,7 +2,6 @@ const multiaddr = require('multiaddr') const os = require('os') - const createServer = require('it-ws/server') module.exports = (options, handler) => { From 1b9c591da6c468ac65deae02ba9355e4639f325f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 2 Apr 2019 15:18:30 +0100 Subject: [PATCH 04/16] fix: listener params and adapter tests License: MIT Signed-off-by: Alan Shaw --- package.json | 6 +- src/listener.js | 7 + test/adapter/compliance.node.js | 4 +- test/adapter/compliance/dial-test.js | 73 +++++++++ test/adapter/compliance/index.js | 12 ++ test/adapter/compliance/listen-test.js | 124 ++++++++++++++ test/adapter/index.js | 2 + test/adapter/node.js | 17 +- test/node.js | 213 +++++-------------------- 9 files changed, 274 insertions(+), 184 deletions(-) create mode 100644 test/adapter/compliance/dial-test.js create mode 100644 test/adapter/compliance/index.js create mode 100644 test/adapter/compliance/listen-test.js create mode 100644 test/adapter/index.js diff --git a/package.json b/package.json index 64e8ea7..c09b80d 100644 --- a/package.json +++ b/package.json @@ -53,9 +53,13 @@ "chai": "^4.1.2", "dirty-chai": "^2.0.1", "interface-transport": "~0.3.6", + "it-goodbye": "^1.0.0", + "it-pipe": "^1.0.0", "multiaddr": "^6.0.3", "pull-goodbye": "0.0.2", - "pull-stream": "^3.6.9" + "pull-serializer": "~0.3.2", + "pull-stream": "^3.6.9", + "streaming-iterables": "^4.0.2" }, "contributors": [ "Chris Campbell ", diff --git a/src/listener.js b/src/listener.js index 8e23760..7b3ed25 100644 --- a/src/listener.js +++ b/src/listener.js @@ -5,6 +5,13 @@ const os = require('os') const createServer = require('it-ws/server') module.exports = (options, handler) => { + if (typeof options === 'function') { + handler = options + options = {} + } + + options = options || {} + const server = createServer(options, socket => { socket.getObservedAddrs = () => [] handler(socket) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js index e940090..b309240 100644 --- a/test/adapter/compliance.node.js +++ b/test/adapter/compliance.node.js @@ -1,11 +1,11 @@ /* eslint-env mocha */ 'use strict' -const tests = require('interface-transport') +const tests = require('./compliance') const multiaddr = require('multiaddr') const WS = require('../../src/adapter') -describe('compliance', () => { +describe('adapter compliance', () => { tests({ setup (callback) { let ws = new WS() diff --git a/test/adapter/compliance/dial-test.js b/test/adapter/compliance/dial-test.js new file mode 100644 index 0000000..85c1a6c --- /dev/null +++ b/test/adapter/compliance/dial-test.js @@ -0,0 +1,73 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') +const serializer = require('pull-serializer') + +module.exports = (common) => { + describe('dial', () => { + let addrs + let transport + let listener + + before((done) => { + common.setup((err, _transport, _addrs) => { + if (err) return done(err) + transport = _transport + addrs = _addrs + done() + }) + }) + + after((done) => { + common.teardown(done) + }) + + beforeEach((done) => { + listener = transport.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(addrs[0], done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('simple', (done) => { + const s = serializer(goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, values) => { + expect(err).to.not.exist() + expect( + values + ).to.be.eql( + ['hey'] + ) + done() + }) + })) + + pull( + s, + transport.dial(addrs[0]), + s + ) + }) + + it('to non existent listener', (done) => { + pull( + transport.dial(addrs[1]), + pull.onEnd((err) => { + expect(err).to.exist() + done() + }) + ) + }) + }) +} diff --git a/test/adapter/compliance/index.js b/test/adapter/compliance/index.js new file mode 100644 index 0000000..e8173e2 --- /dev/null +++ b/test/adapter/compliance/index.js @@ -0,0 +1,12 @@ +/* eslint-env mocha */ +'use strict' + +const dial = require('./dial-test') +const listen = require('./listen-test') + +module.exports = (common) => { + describe('interface-transport', () => { + dial(common) + listen(common) + }) +} diff --git a/test/adapter/compliance/listen-test.js b/test/adapter/compliance/listen-test.js new file mode 100644 index 0000000..082361a --- /dev/null +++ b/test/adapter/compliance/listen-test.js @@ -0,0 +1,124 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const pull = require('pull-stream') + +module.exports = (common) => { + describe('listen', () => { + let addrs + let transport + + before((done) => { + common.setup((err, _transport, _addrs) => { + if (err) return done(err) + transport = _transport + addrs = _addrs + done() + }) + }) + + after((done) => { + common.teardown(done) + }) + + it('simple', (done) => { + const listener = transport.createListener((conn) => {}) + listener.listen(addrs[0], () => { + listener.close(done) + }) + }) + + it('close listener with connections, through timeout', (done) => { + const finish = plan(3, done) + const listener = transport.createListener((conn) => { + pull(conn, conn) + }) + + listener.listen(addrs[0], () => { + const socket1 = transport.dial(addrs[0], () => { + listener.close(finish) + }) + + pull( + transport.dial(addrs[0]), + pull.onEnd(() => { + finish() + }) + ) + + pull( + pull.values([Buffer.from('Some data that is never handled')]), + socket1, + pull.onEnd(() => { + finish() + }) + ) + }) + }) + + describe('events', () => { + // eslint-disable-next-line + // TODO: figure out why it fails in the full test suite + it.skip('connection', (done) => { + const finish = plan(2, done) + + const listener = transport.createListener() + + listener.on('connection', (conn) => { + expect(conn).to.exist() + finish() + }) + + listener.listen(addrs[0], () => { + transport.dial(addrs[0], () => { + listener.close(finish) + }) + }) + }) + + it('listening', (done) => { + const listener = transport.createListener() + listener.on('listening', () => { + listener.close(done) + }) + listener.listen(addrs[0]) + }) + + // eslint-disable-next-line + // TODO: how to get the listener to emit an error? + it.skip('error', (done) => { + const listener = transport.createListener() + listener.on('error', (err) => { + expect(err).to.exist() + listener.close(done) + }) + }) + + it('close', (done) => { + const finish = plan(2, done) + const listener = transport.createListener() + listener.on('close', finish) + + listener.listen(addrs[0], () => { + listener.close(finish) + }) + }) + }) + }) +} + +function plan (n, done) { + let i = 0 + return (err) => { + if (err) return done(err) + i++ + + if (i === n) done() + } +} diff --git a/test/adapter/index.js b/test/adapter/index.js new file mode 100644 index 0000000..24be09b --- /dev/null +++ b/test/adapter/index.js @@ -0,0 +1,2 @@ +require('./compliance.node') +require('./node') diff --git a/test/adapter/node.js b/test/adapter/node.js index 7537ba9..9441cbd 100644 --- a/test/adapter/node.js +++ b/test/adapter/node.js @@ -14,14 +14,14 @@ const WS = require('../../src/adapter') require('./compliance.node') -describe('instantiate the transport', () => { +describe('adapter instantiate the transport', () => { it('create', () => { const ws = new WS() expect(ws).to.exist() }) }) -describe('listen', () => { +describe('adapter listen', () => { describe('ip4', () => { let ws const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') @@ -201,7 +201,7 @@ describe('listen', () => { }) }) -describe('dial', () => { +describe('adapter dial', () => { describe('ip4', () => { let ws let listener @@ -305,7 +305,7 @@ describe('dial', () => { }) }) -describe('filter addrs', () => { +describe('adapter filter addrs', () => { let ws before(() => { @@ -440,7 +440,7 @@ describe('filter addrs', () => { }) }) -describe('valid Connection', () => { +describe('adapter valid Connection', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') it('get observed addrs', (done) => { @@ -554,10 +554,3 @@ describe('valid Connection', () => { } }) }) - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - }) - it('listener - emits error on the other end is terminated abruptly', (done) => { - }) -}) diff --git a/test/node.js b/test/node.js index d67852e..6ff4d48 100644 --- a/test/node.js +++ b/test/node.js @@ -7,12 +7,13 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const goodbye = require('it-goodbye') +const { collect, consume } = require('streaming-iterables') +const pipe = require('it-pipe') const WS = require('../src') -require('./compliance.node') +// require('./compliance.node') describe('instantiate the transport', () => { it('create', () => { @@ -180,49 +181,31 @@ describe('dial', () => { let listener const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - beforeEach((done) => { + beforeEach(() => { ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) - it('dial with IPFS Id', (done) => { + it('dial with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) }) @@ -231,49 +214,34 @@ describe('dial', () => { let listener const ma = multiaddr('/ip6/::1/tcp/9091') - beforeEach((done) => { + beforeEach(() => { ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql(['hey']) }) - it('dial with IPFS Id', (done) => { + it('dial with IPFS Id', async () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) + const conn = await ws.dial(ma) const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) + source: ['hey'], + sink: collect }) - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.be.eql(['hey']) }) }) }) @@ -416,121 +384,28 @@ describe('filter addrs', () => { describe('valid Connection', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') - it('get observed addrs', (done) => { + it('get observed addrs', async () => { let dialerObsAddrs let listenerObsAddrs const ws = new WS() - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - listenerObsAddrs = addrs - - listener.close(onClose) - - function onClose () { - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) - done() - } - }) - } - }) - }) - - it('get Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { + const listener = ws.createListener(async conn => { expect(conn).to.exist() - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - - pull(conn, conn) + dialerObsAddrs = await conn.getObservedAddrs() + pipe(conn, conn) }) - listener.listen(ma, () => { - const conn = ws.dial(ma) + await listener.listen(ma) + const conn = await ws.dial(ma) - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) + await pipe([], conn, consume) - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - listener.close(done) - }) - } - }) - }) + listenerObsAddrs = await conn.getObservedAddrs() - it('set Peer Info', (done) => { - const ws = new WS() + await listener.close() - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('a') - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('a') - }) - - pull(conn, conn) - }) - - listener.listen(ma, onListen) - - function onListen () { - const conn = ws.dial(ma) - conn.setPeerInfo('b') - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('b') - listener.close(done) - }) - } - } - }) -}) - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - }) - it('listener - emits error on the other end is terminated abruptly', (done) => { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) }) }) From f674122b6f413545a41e50c455a2e2cc00b331b2 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 3 Apr 2019 10:38:35 +0100 Subject: [PATCH 05/16] test: add interface tests License: MIT Signed-off-by: Alan Shaw --- .aegir.js | 11 ++++-- package.json | 2 +- src/listener.js | 4 +- test/adapter/browser.js | 81 +++++++++++++++++++++++++++++++++++++++++ test/adapter/index.js | 2 + test/browser.js | 68 ++++++++++++++-------------------- test/compliance.node.js | 12 +++--- test/node.js | 2 +- 8 files changed, 126 insertions(+), 56 deletions(-) create mode 100644 test/adapter/browser.js diff --git a/.aegir.js b/.aegir.js index f43cfb2..3f672dc 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,21 +1,24 @@ 'use strict' const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') const WS = require('./src') let listener function boot (done) { + console.log('boot!') const ws = new WS() const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - listener = ws.createListener((conn) => pull(conn, conn)) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + listener.listen(ma).then(() => done()).catch(done) + listener.on('error', console.error) } function shutdown (done) { - listener.close(done) + console.log('shutdown') + listener.close().then(done).catch(done) } module.exports = { diff --git a/package.json b/package.json index c09b80d..0ba2833 100644 --- a/package.json +++ b/package.json @@ -52,7 +52,7 @@ "aegir": "^18.0.3", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.6", + "interface-transport": "github:libp2p/interface-transport#feat/async-await", "it-goodbye": "^1.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.3", diff --git a/src/listener.js b/src/listener.js index 7b3ed25..5c5422e 100644 --- a/src/listener.js +++ b/src/listener.js @@ -12,10 +12,10 @@ module.exports = (options, handler) => { options = options || {} - const server = createServer(options, socket => { + const server = createServer(options, handler ? socket => { socket.getObservedAddrs = () => [] handler(socket) - }) + } : null) let listeningMultiaddr diff --git a/test/adapter/browser.js b/test/adapter/browser.js new file mode 100644 index 0000000..39cc612 --- /dev/null +++ b/test/adapter/browser.js @@ -0,0 +1,81 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + +const WS = require('../../src/adapter') + +describe('adapter libp2p-websockets', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') + let ws + let conn + + beforeEach((done) => { + ws = new WS() + expect(ws).to.exist() + conn = ws.dial(ma, (err, res) => { + expect(err).to.not.exist() + done() + }) + }) + + it('echo', (done) => { + const message = 'Hello World!' + + const s = goodbye({ + source: pull.values([message]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist() + expect(results).to.eql([message]) + done() + }) + }) + + pull(s, conn, s) + }) + + describe('stress', () => { + it('one big write', (done) => { + const rawMessage = Buffer.allocUnsafe(1000000).fill('a') + + const s = goodbye({ + source: pull.values([rawMessage]), + sink: pull.collect((err, results) => { + expect(err).to.not.exist() + expect(results).to.eql([rawMessage]) + done() + }) + }) + pull(s, conn, s) + }) + + it('many writes', function (done) { + this.timeout(100000) + const s = goodbye({ + source: pull( + pull.infinite(), + pull.take(20000), + pull.map((val) => Buffer.from(val.toString())) + ), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + expect(result).to.have.length(20000) + done() + }) + }) + + pull(s, conn, s) + }) + }) +}) + +it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() +}) diff --git a/test/adapter/index.js b/test/adapter/index.js index 24be09b..a09dc02 100644 --- a/test/adapter/index.js +++ b/test/adapter/index.js @@ -1,2 +1,4 @@ +'use strict' + require('./compliance.node') require('./node') diff --git a/test/browser.js b/test/browser.js index bc4e9db..f627fb3 100644 --- a/test/browser.js +++ b/test/browser.js @@ -7,71 +7,57 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const pipe = require('it-pipe') +const goodbye = require('it-goodbye') +const { collect, take } = require('streaming-iterables') const WS = require('../src') +// require('./adapter/browser') + describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws let conn - beforeEach((done) => { + beforeEach(async () => { ws = new WS() - expect(ws).to.exist() - conn = ws.dial(ma, (err, res) => { - expect(err).to.not.exist() - done() - }) + conn = await ws.dial(ma) }) - it('echo', (done) => { + it('echo', async () => { const message = 'Hello World!' + const s = goodbye({ source: [message], sink: collect }) - const s = goodbye({ - source: pull.values([message]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([message]) - done() - }) - }) - - pull(s, conn, s) + const results = await pipe(s, conn, s) + expect(results).to.eql([message]) }) describe('stress', () => { - it('one big write', (done) => { + it('one big write', async () => { const rawMessage = Buffer.allocUnsafe(1000000).fill('a') - const s = goodbye({ - source: pull.values([rawMessage]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([rawMessage]) - done() - }) - }) - pull(s, conn, s) + const s = goodbye({ source: [rawMessage], sink: collect }) + + const results = await pipe(s, conn, s) + expect(results).to.eql([rawMessage]) }) - it('many writes', function (done) { - this.timeout(10000) + it('many writes', async function () { + this.timeout(100000) const s = goodbye({ - source: pull( - pull.infinite(), - pull.take(1000), - pull.map((val) => Buffer.from(val.toString())) + source: pipe( + { + [Symbol.iterator] () { return this }, + next: () => ({ done: false, value: Buffer.from(Math.random().toString()) }) + }, + take(20000) ), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - expect(result).to.have.length(1000) - done() - }) + sink: collect }) - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.have.length(20000) }) }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js index 3f31f97..e625142 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -5,20 +5,18 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') const WS = require('../src') -describe('compliance', () => { +describe('adapter compliance', () => { tests({ - setup (callback) { - let ws = new WS() + async setup () { + const ws = new WS() const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] - callback(null, ws, addrs) + return { transport: ws, addrs } }, - teardown (callback) { - callback() - } + async teardown () {} }) }) diff --git a/test/node.js b/test/node.js index 6ff4d48..95add5c 100644 --- a/test/node.js +++ b/test/node.js @@ -13,7 +13,7 @@ const pipe = require('it-pipe') const WS = require('../src') -// require('./compliance.node') +require('./compliance.node') describe('instantiate the transport', () => { it('create', () => { From 24d0a6132a994ab5631f8c39c6912b395172e785 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 17 Apr 2019 12:34:56 +0100 Subject: [PATCH 06/16] fix: tests License: MIT Signed-off-by: Alan Shaw --- .aegir.js | 3 --- package.json | 2 +- src/index.js | 2 +- test/adapter/browser.js | 6 +++--- test/browser.js | 10 +++++----- test/node.js | 8 ++++---- 6 files changed, 14 insertions(+), 17 deletions(-) diff --git a/.aegir.js b/.aegir.js index 3f672dc..ddf424a 100644 --- a/.aegir.js +++ b/.aegir.js @@ -2,13 +2,11 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') - const WS = require('./src') let listener function boot (done) { - console.log('boot!') const ws = new WS() const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener(conn => pipe(conn, conn)) @@ -17,7 +15,6 @@ function boot (done) { } function shutdown (done) { - console.log('shutdown') listener.close().then(done).catch(done) } diff --git a/package.json b/package.json index 0ba2833..4c25573 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "chai": "^4.1.2", "dirty-chai": "^2.0.1", "interface-transport": "github:libp2p/interface-transport#feat/async-await", - "it-goodbye": "^1.0.0", + "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.3", "pull-goodbye": "0.0.2", diff --git a/src/index.js b/src/index.js index c1f620d..ab4bbf9 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,7 @@ const createListener = require('./listener') class WebSockets { async dial (ma, options) { log('dialing %s', ma) - const socket = connect(toUri(ma), { binary: true }) + const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) await socket.connected() socket.getObservedAddrs = () => [ma] log('connected %s', ma) diff --git a/test/adapter/browser.js b/test/adapter/browser.js index 39cc612..52a1513 100644 --- a/test/adapter/browser.js +++ b/test/adapter/browser.js @@ -74,8 +74,8 @@ describe('adapter libp2p-websockets', () => { pull(s, conn, s) }) }) -}) -it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() + }) }) diff --git a/test/browser.js b/test/browser.js index f627fb3..78134c7 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,7 +13,7 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') -// require('./adapter/browser') +require('./adapter/browser') describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') @@ -26,7 +26,7 @@ describe('libp2p-websockets', () => { }) it('echo', async () => { - const message = 'Hello World!' + const message = Buffer.from('Hello World!') const s = goodbye({ source: [message], sink: collect }) const results = await pipe(s, conn, s) @@ -60,8 +60,8 @@ describe('libp2p-websockets', () => { expect(result).to.have.length(20000) }) }) -}) -it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + it('.createServer throws in browser', () => { + expect(new WS().createListener).to.throw() + }) }) diff --git a/test/node.js b/test/node.js index 95add5c..b7fe695 100644 --- a/test/node.js +++ b/test/node.js @@ -195,7 +195,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) it('dial with IPFS Id', async () => { @@ -205,7 +205,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) }) @@ -228,7 +228,7 @@ describe('dial', () => { const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) it('dial with IPFS Id', async () => { @@ -241,7 +241,7 @@ describe('dial', () => { }) const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) + expect(result).to.be.eql([Buffer.from('hey')]) }) }) }) From 16a6b55960846a3e78eb4666f1aee41e8647909c Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 09:52:49 +0100 Subject: [PATCH 07/16] feat: abortable dials License: MIT Signed-off-by: Alan Shaw --- package.json | 2 ++ src/index.js | 50 ++++++++++++++++++++++++++++++++++++++++++++++---- test/node.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 4c25573..17b5fb7 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "abortable-iterator": "^1.0.4", "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", @@ -49,6 +50,7 @@ "multiaddr-to-uri": "^4.0.1" }, "devDependencies": { + "abort-controller": "^3.0.0", "aegir": "^18.0.3", "chai": "^4.1.2", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index ab4bbf9..16110e3 100644 --- a/src/index.js +++ b/src/index.js @@ -5,17 +5,59 @@ const mafmt = require('mafmt') const withIs = require('class-is') const toUri = require('multiaddr-to-uri') const log = require('debug')('libp2p:websockets:transport') - +const abortable = require('abortable-iterator') const createListener = require('./listener') +const { AbortError } = abortable class WebSockets { async dial (ma, options) { + options = options || {} log('dialing %s', ma) + const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) - await socket.connected() - socket.getObservedAddrs = () => [ma] + const getObservedAddrs = () => [ma] + + if (!options.signal) { + socket.getObservedAddrs = getObservedAddrs + await socket.connected() + log('connected %s', ma) + return socket + } + + // Allow abort via signal during connect + let onAbort + const abort = new Promise((resolve, reject) => { + onAbort = () => { + socket.close() + reject(new AbortError('connection aborted')) + } + + // Already aborted? + if (options.signal.aborted) return onAbort() + options.signal.addEventListener('abort', onAbort) + }) + + try { + await Promise.race([abort, socket.connected()]) + } finally { + options.signal.removeEventListener('abort', onAbort) + } + log('connected %s', ma) - return socket + return { + sink: async source => { + try { + await socket.sink(abortable(source, options.signal)) + } catch (err) { + // Re-throw non-aborted errors + if (err.type !== 'aborted') throw err + // Otherwise, this is fine... + await socket.close() + } + }, + source: abortable(socket.source, options.signal), + getObservedAddrs + } } createListener (options, handler) { diff --git a/test/node.js b/test/node.js index b7fe695..14574a0 100644 --- a/test/node.js +++ b/test/node.js @@ -10,6 +10,7 @@ const multiaddr = require('multiaddr') const goodbye = require('it-goodbye') const { collect, consume } = require('streaming-iterables') const pipe = require('it-pipe') +const AbortController = require('abort-controller') const WS = require('../src') @@ -207,6 +208,49 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) + + it('should be abortable after connect', async () => { + const controller = new AbortController() + const conn = await ws.dial(ma, { signal: controller.signal }) + const s = goodbye({ + source: { + [Symbol.asyncIterator] () { + return this + }, + next () { + return new Promise(resolve => { + setTimeout(() => resolve(Math.random()), 1000) + }) + } + }, + sink: consume + }) + + setTimeout(() => controller.abort(), 500) + + try { + await pipe(s, conn, s) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) + + it('should be abortable before connect', async () => { + const controller = new AbortController() + controller.abort() // Abort before connect + + try { + await ws.dial(ma, { signal: controller.signal }) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) }) describe('ip6', () => { From bdfcaafb1367ecde9526f84ee61cfc86b0496a0f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 10:22:24 +0100 Subject: [PATCH 08/16] chore: update interface-transport dependency Async await changes merged but not released :( License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index fe9dcd1..ba97487 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "aegir": "^18.2.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "github:libp2p/interface-transport#feat/async-await", + "interface-transport": "github:libp2p/interface-transport#master", "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", From 6451c390e75be01d3783c44a786c993e11a72d6e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 10:53:59 +0100 Subject: [PATCH 09/16] fix: describe name License: MIT Signed-off-by: Alan Shaw --- test/compliance.node.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/compliance.node.js b/test/compliance.node.js index e625142..18ef713 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -5,7 +5,7 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') const WS = require('../src') -describe('adapter compliance', () => { +describe('compliance', () => { tests({ async setup () { const ws = new WS() From 89ae00a0b8b04c1233e26edcfd34aa40a5b169ba Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 13:31:26 +0100 Subject: [PATCH 10/16] fix: tests License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- src/index.js | 2 +- src/listener.js | 2 +- test/compliance.node.js | 43 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index ba97487..cabb62f 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { - "abortable-iterator": "^1.0.4", + "abortable-iterator": "^2.0.0", "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", diff --git a/src/index.js b/src/index.js index 16110e3..bf93f81 100644 --- a/src/index.js +++ b/src/index.js @@ -28,8 +28,8 @@ class WebSockets { let onAbort const abort = new Promise((resolve, reject) => { onAbort = () => { + reject(new AbortError()) socket.close() - reject(new AbortError('connection aborted')) } // Already aborted? diff --git a/src/listener.js b/src/listener.js index 5c5422e..6ffdda6 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,7 +2,7 @@ const multiaddr = require('multiaddr') const os = require('os') -const createServer = require('it-ws/server') +const { createServer } = require('it-ws') module.exports = (options, handler) => { if (typeof options === 'function') { diff --git a/test/compliance.node.js b/test/compliance.node.js index 18ef713..13e4605 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -3,6 +3,7 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') +const http = require('http') const WS = require('../src') describe('compliance', () => { @@ -15,7 +16,47 @@ describe('compliance', () => { multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] - return { transport: ws, addrs } + + let delayMs = 0 + const delayedCreateListener = (options, handler) => { + if (typeof options === 'function') { + handler = options + options = {} + } + + options = options || {} + + // A server that will delay the upgrade event by delayMs + options.server = new Proxy(http.createServer(), { + get (server, prop) { + if (prop === 'on') { + return (event, handler) => { + server.on(event, (...args) => { + if (event !== 'upgrade' || !delayMs) { + return handler(...args) + } + setTimeout(() => handler(...args), delayMs) + }) + } + } + return server[prop] + } + }) + + return ws.createListener(options, handler) + } + + const wsProxy = new Proxy(ws, { + get: (_, prop) => prop === 'createListener' ? delayedCreateListener : ws[prop] + }) + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (ms) { delayMs = ms }, + restore () { delayMs = 0 } + } + + return { transport: wsProxy, addrs, connector } }, async teardown () {} }) From e48fee16981315f57be1b5cfdd6fb880cbb2d0ef Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:03:10 +0100 Subject: [PATCH 11/16] refactor: use adapter class in interface-transport License: MIT Signed-off-by: Alan Shaw --- package.json | 1 - src/adapter.js | 72 +++----------------------------------------------- 2 files changed, 4 insertions(+), 69 deletions(-) diff --git a/package.json b/package.json index cabb62f..614c52e 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,6 @@ "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { "abortable-iterator": "^2.0.0", - "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", diff --git a/src/adapter.js b/src/adapter.js index 76453bf..c27d009 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -1,77 +1,13 @@ 'use strict' -const { Connection } = require('interface-connection') +const { Adapter } = require('interface-transport') const withIs = require('class-is') -const toPull = require('async-iterator-to-pull-stream') -const error = require('pull-stream/sources/error') -const drain = require('pull-stream/sinks/drain') const WebSockets = require('./') -const noop = () => {} - -function callbackify (fn) { - return async function (...args) { - let cb = args.pop() - if (typeof cb !== 'function') { - args.push(cb) - cb = noop - } - let res - try { - res = await fn(...args) - } catch (err) { - return cb(err) - } - cb(null, res) - } -} // Legacy adapter to old transport & connection interface -class WebSocketsAdapter extends WebSockets { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || noop - - const conn = new Connection() - - super.dial(ma, options) - .then(socket => { - conn.setInnerConn(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - conn.close = callbackify(socket.close.bind(socket)) - callback(null, conn) - }) - .catch(err => { - conn.setInnerConn({ sink: drain(), source: error(err) }) - callback(err) - }) - - return conn - } - - createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = {} - } - - const server = super.createListener(options, socket => { - const conn = new Connection(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - handler(conn) - }) - - const proxy = { - listen: callbackify(server.listen.bind(server)), - close: callbackify(server.close.bind(server)), - getAddrs: callbackify(server.getAddrs.bind(server)), - getObservedAddrs: callbackify(() => server.getObservedAddrs()) - } - - return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) +class WebSocketsAdapter extends Adapter { + constructor () { + super(new WebSockets()) } } From dbcd1ca723a2f883a6e0fe5e561d5c6282921236 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:08:17 +0100 Subject: [PATCH 12/16] fix: remove async from non-async function License: MIT Signed-off-by: Alan Shaw --- src/listener.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/listener.js b/src/listener.js index 6ffdda6..5e01ac8 100644 --- a/src/listener.js +++ b/src/listener.js @@ -30,7 +30,7 @@ module.exports = (options, handler) => { return listen(ma.toOptions()) } - server.getAddrs = async () => { + server.getAddrs = () => { const multiaddrs = [] const address = server.address() From 4d499935aa87c87e02a304227ddcb7353ef46c7f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:27:15 +0100 Subject: [PATCH 13/16] chore: cleanup License: MIT Signed-off-by: Alan Shaw --- .gitignore | 2 +- ci/Jenkinsfile | 2 -- package.json | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) delete mode 100644 ci/Jenkinsfile diff --git a/.gitignore b/.gitignore index c2b6311..f338286 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ lib-cov # Coverage directory used by tools like istanbul coverage +.nyc_output # Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) .grunt @@ -40,4 +41,3 @@ node_modules *.swp dist - diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile deleted file mode 100644 index a7da2e5..0000000 --- a/ci/Jenkinsfile +++ /dev/null @@ -1,2 +0,0 @@ -// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -javascript() diff --git a/package.json b/package.json index 614c52e..e142827 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,7 @@ "release": "aegir release -t node -t browser ", "release-minor": "aegir release --type minor -t node -t browser", "release-major": "aegir release --type major -t node -t browser", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=lcov --reporter=text npm run test:node" }, "browser": { "src/listener": "./src/listener.browser.js" From 01cd1fd9ea4e33fe0c83db5b9babc0160e148037 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:28:46 +0100 Subject: [PATCH 14/16] chore: remove old interface-transport tests License: MIT Signed-off-by: Alan Shaw --- package.json | 3 - test/adapter/browser.js | 81 ---- test/adapter/compliance.node.js | 24 -- test/adapter/compliance/dial-test.js | 73 ---- test/adapter/compliance/index.js | 12 - test/adapter/compliance/listen-test.js | 124 ------ test/adapter/index.js | 4 - test/adapter/node.js | 556 ------------------------- test/browser.js | 2 - 9 files changed, 879 deletions(-) delete mode 100644 test/adapter/browser.js delete mode 100644 test/adapter/compliance.node.js delete mode 100644 test/adapter/compliance/dial-test.js delete mode 100644 test/adapter/compliance/index.js delete mode 100644 test/adapter/compliance/listen-test.js delete mode 100644 test/adapter/index.js delete mode 100644 test/adapter/node.js diff --git a/package.json b/package.json index e142827..912c8d9 100644 --- a/package.json +++ b/package.json @@ -56,9 +56,6 @@ "it-goodbye": "^2.0.0", "it-pipe": "^1.0.0", "multiaddr": "^6.0.6", - "pull-goodbye": "0.0.2", - "pull-serializer": "~0.3.2", - "pull-stream": "^3.6.9", "streaming-iterables": "^4.0.2" }, "contributors": [ diff --git a/test/adapter/browser.js b/test/adapter/browser.js deleted file mode 100644 index 52a1513..0000000 --- a/test/adapter/browser.js +++ /dev/null @@ -1,81 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') - -const WS = require('../../src/adapter') - -describe('adapter libp2p-websockets', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - let ws - let conn - - beforeEach((done) => { - ws = new WS() - expect(ws).to.exist() - conn = ws.dial(ma, (err, res) => { - expect(err).to.not.exist() - done() - }) - }) - - it('echo', (done) => { - const message = 'Hello World!' - - const s = goodbye({ - source: pull.values([message]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([message]) - done() - }) - }) - - pull(s, conn, s) - }) - - describe('stress', () => { - it('one big write', (done) => { - const rawMessage = Buffer.allocUnsafe(1000000).fill('a') - - const s = goodbye({ - source: pull.values([rawMessage]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([rawMessage]) - done() - }) - }) - pull(s, conn, s) - }) - - it('many writes', function (done) { - this.timeout(100000) - const s = goodbye({ - source: pull( - pull.infinite(), - pull.take(20000), - pull.map((val) => Buffer.from(val.toString())) - ), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - expect(result).to.have.length(20000) - done() - }) - }) - - pull(s, conn, s) - }) - }) - - it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() - }) -}) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js deleted file mode 100644 index b309240..0000000 --- a/test/adapter/compliance.node.js +++ /dev/null @@ -1,24 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const tests = require('./compliance') -const multiaddr = require('multiaddr') -const WS = require('../../src/adapter') - -describe('adapter compliance', () => { - tests({ - setup (callback) { - let ws = new WS() - const addrs = [ - multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), - multiaddr('/dns4/ipfs.io/tcp/9092/ws'), - multiaddr('/dns4/ipfs.io/tcp/9092/wss') - ] - callback(null, ws, addrs) - }, - teardown (callback) { - callback() - } - }) -}) diff --git a/test/adapter/compliance/dial-test.js b/test/adapter/compliance/dial-test.js deleted file mode 100644 index 85c1a6c..0000000 --- a/test/adapter/compliance/dial-test.js +++ /dev/null @@ -1,73 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') -const serializer = require('pull-serializer') - -module.exports = (common) => { - describe('dial', () => { - let addrs - let transport - let listener - - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) - }) - - after((done) => { - common.teardown(done) - }) - - beforeEach((done) => { - listener = transport.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(addrs[0], done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('simple', (done) => { - const s = serializer(goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, values) => { - expect(err).to.not.exist() - expect( - values - ).to.be.eql( - ['hey'] - ) - done() - }) - })) - - pull( - s, - transport.dial(addrs[0]), - s - ) - }) - - it('to non existent listener', (done) => { - pull( - transport.dial(addrs[1]), - pull.onEnd((err) => { - expect(err).to.exist() - done() - }) - ) - }) - }) -} diff --git a/test/adapter/compliance/index.js b/test/adapter/compliance/index.js deleted file mode 100644 index e8173e2..0000000 --- a/test/adapter/compliance/index.js +++ /dev/null @@ -1,12 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const dial = require('./dial-test') -const listen = require('./listen-test') - -module.exports = (common) => { - describe('interface-transport', () => { - dial(common) - listen(common) - }) -} diff --git a/test/adapter/compliance/listen-test.js b/test/adapter/compliance/listen-test.js deleted file mode 100644 index 082361a..0000000 --- a/test/adapter/compliance/listen-test.js +++ /dev/null @@ -1,124 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const pull = require('pull-stream') - -module.exports = (common) => { - describe('listen', () => { - let addrs - let transport - - before((done) => { - common.setup((err, _transport, _addrs) => { - if (err) return done(err) - transport = _transport - addrs = _addrs - done() - }) - }) - - after((done) => { - common.teardown(done) - }) - - it('simple', (done) => { - const listener = transport.createListener((conn) => {}) - listener.listen(addrs[0], () => { - listener.close(done) - }) - }) - - it('close listener with connections, through timeout', (done) => { - const finish = plan(3, done) - const listener = transport.createListener((conn) => { - pull(conn, conn) - }) - - listener.listen(addrs[0], () => { - const socket1 = transport.dial(addrs[0], () => { - listener.close(finish) - }) - - pull( - transport.dial(addrs[0]), - pull.onEnd(() => { - finish() - }) - ) - - pull( - pull.values([Buffer.from('Some data that is never handled')]), - socket1, - pull.onEnd(() => { - finish() - }) - ) - }) - }) - - describe('events', () => { - // eslint-disable-next-line - // TODO: figure out why it fails in the full test suite - it.skip('connection', (done) => { - const finish = plan(2, done) - - const listener = transport.createListener() - - listener.on('connection', (conn) => { - expect(conn).to.exist() - finish() - }) - - listener.listen(addrs[0], () => { - transport.dial(addrs[0], () => { - listener.close(finish) - }) - }) - }) - - it('listening', (done) => { - const listener = transport.createListener() - listener.on('listening', () => { - listener.close(done) - }) - listener.listen(addrs[0]) - }) - - // eslint-disable-next-line - // TODO: how to get the listener to emit an error? - it.skip('error', (done) => { - const listener = transport.createListener() - listener.on('error', (err) => { - expect(err).to.exist() - listener.close(done) - }) - }) - - it('close', (done) => { - const finish = plan(2, done) - const listener = transport.createListener() - listener.on('close', finish) - - listener.listen(addrs[0], () => { - listener.close(finish) - }) - }) - }) - }) -} - -function plan (n, done) { - let i = 0 - return (err) => { - if (err) return done(err) - i++ - - if (i === n) done() - } -} diff --git a/test/adapter/index.js b/test/adapter/index.js deleted file mode 100644 index a09dc02..0000000 --- a/test/adapter/index.js +++ /dev/null @@ -1,4 +0,0 @@ -'use strict' - -require('./compliance.node') -require('./node') diff --git a/test/adapter/node.js b/test/adapter/node.js deleted file mode 100644 index 9441cbd..0000000 --- a/test/adapter/node.js +++ /dev/null @@ -1,556 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 6] */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') - -const WS = require('../../src/adapter') - -require('./compliance.node') - -describe('adapter instantiate the transport', () => { - it('create', () => { - const ws = new WS() - expect(ws).to.exist() - }) -}) - -describe('adapter listen', () => { - describe('ip4', () => { - let ws - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - - beforeEach(() => { - ws = new WS() - }) - - it('listen, check for callback', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it('listen, check for listening event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.close(done) - }) - - listener.listen(ma) - }) - - it('listen, check for the close event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.on('close', done) - listener.close() - }) - - listener.listen(ma) - }) - - it('listen on addr with /ipfs/QmHASH', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it.skip('close listener with connections, through timeout', (done) => { - // TODO `ws` closes all anyway, we need to make it not close - // first - https://github.com/diasdavid/simple-websocket-server - }) - - it.skip('listen on port 0', (done) => { - // TODO port 0 not supported yet - }) - - it.skip('listen on any Interface', (done) => { - // TODO 0.0.0.0 not supported yet - }) - - it('getAddrs', (done) => { - const listener = ws.createListener((conn) => { - }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) - }) - - it('getAddrs on port 0 listen', (done) => { - const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) - }) - - it('getAddrs from listening on 0.0.0.0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - listener.close(done) - }) - }) - }) - - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) - }) - - it('getAddrs preserves IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) - }) - }) - - describe('ip6', () => { - let ws - const ma = multiaddr('/ip6/::1/tcp/9091/ws') - - beforeEach(() => { - ws = new WS() - }) - - it('listen, check for callback', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - - it('listen, check for listening event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.close(done) - }) - - listener.listen(ma) - }) - - it('listen, check for the close event', (done) => { - const listener = ws.createListener((conn) => { }) - - listener.on('listening', () => { - listener.on('close', done) - listener.close() - }) - - listener.listen(ma) - }) - - it('listen on addr with /ipfs/QmHASH', (done) => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) - }) - }) -}) - -describe('adapter dial', () => { - describe('ip4', () => { - let ws - let listener - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('dial', (done) => { - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - }) - - describe('ip6', () => { - let ws - let listener - const ma = multiaddr('/ip6/::1/tcp/9091') - - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('dial', (done) => { - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) - }) - }) -}) - -describe('adapter filter addrs', () => { - let ws - - before(() => { - ws = new WS() - }) - - describe('filter valid addrs for this transport', function () { - it('should fail invalid WS addresses', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/9090') - const ma2 = multiaddr('/ip4/127.0.0.1/udp/9090') - const ma3 = multiaddr('/ip6/::1/tcp/80') - const ma4 = multiaddr('/dnsaddr/ipfs.io/tcp/80') - - const valid = ws.filter([ma1, ma2, ma3, ma4]) - expect(valid.length).to.equal(0) - }) - - it('should filter correct ipv4 addresses', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv4 addresses with ipfs id', function () { - const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/80/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv6 address', function () { - const ma1 = multiaddr('/ip6/::1/tcp/80/ws') - const ma2 = multiaddr('/ip6/::1/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct ipv6 addresses with ipfs id', function () { - const ma1 = multiaddr('/ip6/::1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip6/::1/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns address', function () { - const ma1 = multiaddr('/dnsaddr/ipfs.io/ws') - const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws') - const ma3 = multiaddr('/dnsaddr/ipfs.io/tcp/80/wss') - - const valid = ws.filter([ma1, ma2, ma3]) - expect(valid.length).to.equal(3) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - expect(valid[2]).to.deep.equal(ma3) - }) - - it('should filter correct dns address with ipfs id', function () { - const ma1 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns4 address', function () { - const ma1 = multiaddr('/dns4/ipfs.io/tcp/80/ws') - const ma2 = multiaddr('/dns4/ipfs.io/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns6 address', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws') - const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter correct dns6 address with ipfs id', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma2) - }) - - it('should filter mixed addresses', function () { - const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const ma2 = multiaddr('/ip4/127.0.0.1/tcp/9090') - const ma3 = multiaddr('/ip4/127.0.0.1/udp/9090') - const ma4 = multiaddr('/dns6/ipfs.io/ws') - const mh5 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + - '/p2p-circuit/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter([ma1, ma2, ma3, ma4, mh5]) - expect(valid.length).to.equal(2) - expect(valid[0]).to.deep.equal(ma1) - expect(valid[1]).to.deep.equal(ma4) - }) - }) - - it('filter a single addr for this transport', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - - const valid = ws.filter(ma) - expect(valid.length).to.equal(1) - expect(valid[0]).to.deep.equal(ma) - done() - }) -}) - -describe('adapter valid Connection', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') - - it('get observed addrs', (done) => { - let dialerObsAddrs - let listenerObsAddrs - - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - listenerObsAddrs = addrs - - listener.close(onClose) - - function onClose () { - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) - done() - } - }) - } - }) - }) - - it('get Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - listener.close(done) - }) - } - }) - }) - - it('set Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('a') - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('a') - }) - - pull(conn, conn) - }) - - listener.listen(ma, onListen) - - function onListen () { - const conn = ws.dial(ma) - conn.setPeerInfo('b') - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('b') - listener.close(done) - }) - } - } - }) -}) diff --git a/test/browser.js b/test/browser.js index 78134c7..c3f1ff5 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,8 +13,6 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') -require('./adapter/browser') - describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws From 22a75f299bd62beb0367a54733b0d36038c7b086 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:30:53 +0100 Subject: [PATCH 15/16] refactor: remove test covered by interface-transport tests License: MIT Signed-off-by: Alan Shaw --- test/node.js | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/test/node.js b/test/node.js index 48b892a..abd1f15 100644 --- a/test/node.js +++ b/test/node.js @@ -237,20 +237,6 @@ describe('dial', () => { throw new Error('connection was not aborted') }) - - it('should be abortable before connect', async () => { - const controller = new AbortController() - controller.abort() // Abort before connect - - try { - await ws.dial(ma, { signal: controller.signal }) - } catch (err) { - expect(err.type).to.equal('aborted') - return - } - - throw new Error('connection was not aborted') - }) }) // TODO: https://github.com/libp2p/js-libp2p-websockets/issues/84 From a0e886c8f4cf48c3def4725ab4ec6bbb4722de05 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 15:42:17 +0100 Subject: [PATCH 16/16] refactor: use abort error from interface-transport License: MIT Signed-off-by: Alan Shaw --- src/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.js b/src/index.js index bf93f81..26de44a 100644 --- a/src/index.js +++ b/src/index.js @@ -6,8 +6,8 @@ const withIs = require('class-is') const toUri = require('multiaddr-to-uri') const log = require('debug')('libp2p:websockets:transport') const abortable = require('abortable-iterator') +const { AbortError } = require('interface-transport') const createListener = require('./listener') -const { AbortError } = abortable class WebSockets { async dial (ma, options) {