From 98e1ef67b3bdd9c03f45c02e7b350bc92fe3e3ba Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 19 Jun 2016 06:35:31 +0100 Subject: [PATCH] follow new transport and connection spec --- README.md | 10 +- gulpfile.js | 17 +-- package.json | 14 ++- src/index.js | 140 ++++++++++++++++------ test/browser.js | 13 +- test/node.js | 308 +++++++++++++++++++++++++++++++++++++++--------- 6 files changed, 378 insertions(+), 124 deletions(-) diff --git a/README.md b/README.md index 90efb87..bf4ac86 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,10 @@ js-libp2p-websockets [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) ![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square) -[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/diasdavid/js-libp2p-websockets) +[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) -![](https://raw.githubusercontent.com/diasdavid/interace-connection/master/img/badge.png) -![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png) - -> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface - +![](https://raw.githubusercontent.com/libp2p/interace-connection/master/img/badge.png) +![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png) +> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface diff --git a/gulpfile.js b/gulpfile.js index 89103b8..28de380 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -2,20 +2,21 @@ const gulp = require('gulp') const multiaddr = require('multiaddr') -const WSlibp2p = require('./src') +const WS = require('./src') -let ws +let listener gulp.task('test:browser:before', (done) => { - ws = new WSlibp2p() - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - ws.createListener(mh, (socket) => { - socket.pipe(socket) - }, done) + const ws = new WS() + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + listener = ws.createListener((conn) => { + conn.pipe(conn) + }) + listener.listen(ma, done) }) gulp.task('test:browser:after', (done) => { - ws.close(done) + listener.close(done) }) require('aegir/gulp')(gulp) diff --git a/package.json b/package.json index a76a8e7..0d5821d 100644 --- a/package.json +++ b/package.json @@ -34,18 +34,20 @@ }, "homepage": "https://github.com/diasdavid/js-libp2p-websockets#readme", "dependencies": { + "detect-node": "^2.0.3", + "interface-connection": "^0.1.3", "lodash.contains": "^2.4.3", "mafmt": "^2.1.0", - "multiaddr": "^2.0.2", "run-parallel": "^1.1.6", - "simple-websocket": "github:diasdavid/simple-websocket#ec31437" + "simple-websocket": "^4.1.0", + "simple-websocket-server": "^0.1.4" }, "devDependencies": { + "aegir": "^3.2.0", + "multiaddr": "^2.0.2", "chai": "^3.5.0", - "aegir": "^3.0.1", "gulp": "^3.9.1", - "interface-connection": "0.0.3", - "interface-transport": "^0.1.1", + "interface-transport": "^0.2.0", "pre-commit": "^1.1.2" }, "contributors": [ @@ -53,4 +55,4 @@ "Francisco Baio Dias ", "Friedel Ziegelmayer " ] -} \ No newline at end of file +} diff --git a/src/index.js b/src/index.js index c808cf5..a663e6e 100644 --- a/src/index.js +++ b/src/index.js @@ -2,10 +2,20 @@ const debug = require('debug') const log = debug('libp2p:websockets') -const SWS = require('simple-websocket') +const SW = require('simple-websocket') +const isNode = require('detect-node') +let SWS +if (isNode) { + SWS = require('simple-websocket-server') +} else { + SWS = {} +} const mafmt = require('mafmt') -const parallel = require('run-parallel') const contains = require('lodash.contains') +const Connection = require('interface-connection').Connection + +const CLOSE_TIMEOUT = 2000 +// const IPFS_CODE = 421 exports = module.exports = WebSockets @@ -14,66 +24,118 @@ function WebSockets () { return new WebSockets() } - const listeners = [] - - this.dial = function (multiaddr, options) { - if (!options) { + this.dial = function (ma, options, callback) { + if (typeof options === 'function') { + callback = options options = {} } - options.ready = options.ready || function noop () {} - const maOpts = multiaddr.toOptions() - const conn = new SWS('ws://' + maOpts.host + ':' + maOpts.port) - conn.on('connect', options.ready) - conn.getObservedAddrs = () => { - return [multiaddr] + if (!callback) { + callback = function noop () {} } + + const maOpts = ma.toOptions() + + const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port) + + const conn = new Connection(socket) + + socket.on('timeout', () => { + conn.emit('timeout') + }) + + socket.on('error', (err) => { + callback(err) + conn.emit('error', err) + }) + + socket.on('connect', () => { + callback(null, conn) + conn.emit('connect') + }) + + conn.getObservedAddrs = (cb) => { + return cb(null, [ma]) + } + return conn } - this.createListener = (multiaddrs, options, handler, callback) => { + this.createListener = (options, handler) => { if (typeof options === 'function') { - callback = handler handler = options options = {} } - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + const listener = SWS.createServer((socket) => { + const conn = new Connection(socket) - var count = 0 + conn.getObservedAddrs = (cb) => { + // TODO research if we can reuse the address in anyway + return cb(null, []) + } + handler(conn) + }) + + let listeningMultiaddr - multiaddrs.forEach((m) => { - if (contains(m.protoNames(), 'ipfs')) { - m = m.decapsulate('ipfs') + listener._listen = listener.listen + listener.listen = (ma, callback) => { + if (!callback) { + callback = function noop () {} } - const listener = SWS.createServer((conn) => { - conn.getObservedAddrs = () => { - return [] // TODO think if it makes sense for WebSockets - } - handler(conn) - }) + listeningMultiaddr = ma + + if (contains(ma.protoNames(), 'ipfs')) { + ma = ma.decapsulate('ipfs') + } - listener.listen(m.toOptions().port, () => { - if (++count === multiaddrs.length) { - callback() + listener._listen(ma.toOptions(), callback) + } + + listener._close = listener.close + listener.close = (options, callback) => { + if (typeof options === 'function') { + callback = options + options = { timeout: CLOSE_TIMEOUT } + } + if (!callback) { callback = function noop () {} } + if (!options) { options = { timeout: CLOSE_TIMEOUT } } + + let closed = false + listener.once('close', () => { + closed = true + }) + listener._close(callback) + setTimeout(() => { + if (closed) { + return } + log('unable to close graciously, destroying conns') + Object.keys(listener.__connections).forEach((key) => { + log('destroying %s', key) + listener.__connections[key].destroy() + }) + }, options.timeout || CLOSE_TIMEOUT) + } + + // Keep track of open connections to destroy in case of timeout + listener.__connections = {} + listener.on('connection', (socket) => { + const key = (~~(Math.random() * 1e9)).toString(36) + Date.now() + listener.__connections[key] = socket + + socket.on('close', () => { + delete listener.__connections[key] }) - listeners.push(listener) }) - } - this.close = (callback) => { - if (listeners.length === 0) { - log('Called close with no active listeners') - return callback() + listener.getAddrs = (callback) => { + callback(null, [listeningMultiaddr]) } - parallel(listeners.map((listener) => { - return (cb) => listener.close(cb) - }), callback) + return listener } this.filter = (multiaddrs) => { diff --git a/test/browser.js b/test/browser.js index 58567cc..dd23f14 100644 --- a/test/browser.js +++ b/test/browser.js @@ -3,21 +3,20 @@ const expect = require('chai').expect const multiaddr = require('multiaddr') -const WSlibp2p = require('../src') +const WS = require('../src') -describe('libp2p-websockets', function () { - this.timeout(10000) - var ws +describe('libp2p-websockets', () => { + let ws it('create', (done) => { - ws = new WSlibp2p() + ws = new WS() expect(ws).to.exist done() }) it('echo', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const conn = ws.dial(mh) + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + const conn = ws.dial(ma) const message = 'Hello World!' conn.write(message) conn.on('data', (data) => { diff --git a/test/node.js b/test/node.js index a06dc77..6b7fd88 100644 --- a/test/node.js +++ b/test/node.js @@ -3,98 +3,290 @@ const expect = require('chai').expect const multiaddr = require('multiaddr') -const WSlibp2p = require('../src') - -describe('libp2p-websockets', function () { - this.timeout(10000) - var ws +const WS = require('../src') +describe('instantiate the transport', () => { it('create', (done) => { - ws = new WSlibp2p() + const ws = new WS() expect(ws).to.exist done() }) - it('listen and dial', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - ws.createListener(mh, (socket) => { - expect(socket).to.exist - socket.end() - ws.close(done) - }, () => { - const conn = ws.dial(mh) - conn.end() + it('create without new', (done) => { + const ws = WS() + expect(ws).to.exist + done() + }) +}) + +describe('listen', () => { + let ws + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => {}) + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => {}) + listener.on('listening', () => { + listener.close(done) }) + listener.listen(ma) }) - it('listen on several', (done) => { - const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - const mh2 = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - const ws = new WSlibp2p() + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => {}) + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + listener.listen(ma) + }) - ws.createListener([mh1, mh2], (socket) => {}, () => { - ws.close(done) + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const listener = ws.createListener((conn) => {}) + listener.listen(ma, () => { + listener.close(done) }) }) - it('get observed addrs', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - ws.createListener(mh, (socket) => { - expect(socket).to.exist - socket.end() - expect(socket.getObservedAddrs()).to.deep.equal([]) - ws.close(done) - }, () => { - const conn = ws.dial(mh) - conn.end() + it.skip('close listener with connections, through timeout', (done) => { + // TODO `ws` closes all anyway, we need to make it not close + // first - https://github.com/diasdavid/simple-websocket-server + }) + + it.skip('listen on port 0', (done) => { + // TODO port 0 not supported yet + }) + it.skip('listen on IPv6 addr', (done) => { + // TODO IPv6 not supported yet + }) + + it.skip('listen on any Interface', (done) => { + // TODO 0.0.0.0 not supported yet + }) + + it('getAddrs', (done) => { + const listener = ws.createListener((conn) => {}) + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + + it.skip('getAddrs on port 0 listen', (done) => { + // TODO port 0 not supported yet + }) + + it.skip('getAddrs from listening on 0.0.0.0', (done) => { + // TODO 0.0.0.0 not supported yet + }) + + it.skip('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + // TODO 0.0.0.0 or port 0 not supported yet + }) + + it('getAddrs preserves IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => {}) + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) }) }) +}) + +describe('dial', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + conn.pipe(conn) + }) + listener.listen(ma, done) + }) - it('filter', (done) => { + afterEach((done) => { + listener.close(done) + }) + + it('dial on IPv4', (done) => { + const conn = ws.dial(ma) + conn.write('hey') + conn.end() + conn.on('data', (chunk) => { + expect(chunk.toString()).to.equal('hey') + }) + conn.on('end', done) + }) + + it.skip('dial on IPv6', (done) => { + // TODO IPv6 not supported yet + }) + + it('dial on IPv4 with IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + conn.write('hey') + conn.end() + conn.on('data', (chunk) => { + expect(chunk.toString()).to.equal('hey') + }) + conn.on('end', done) + }) +}) + +describe('filter addrs', () => { + let ws + + before(() => { + ws = new WS() + }) + + it('filter valid addrs for this transport', (done) => { const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090') const mh2 = multiaddr('/ip4/127.0.0.1/udp/9090') const mh3 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + const mh4 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const valid = ws.filter([mh1, mh2, mh3]) - expect(valid.length).to.equal(1) + const valid = ws.filter([mh1, mh2, mh3, mh4]) + expect(valid.length).to.equal(2) expect(valid[0]).to.deep.equal(mh3) + expect(valid[1]).to.deep.equal(mh4) + done() + }) + + it('filter a single addr for this transport', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter(ma) + expect(valid.length).to.equal(1) + expect(valid[0]).to.deep.equal(ma) done() }) +}) + +describe('valid Connection', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + it('get observed addrs', (done) => { + let dialerObsAddrs + let listenerObsAddrs + + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist + + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist + dialerObsAddrs = addrs + }) - it('echo', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - ws.createListener(mh, (conn) => { conn.pipe(conn) - }, () => { - const conn = ws.dial(mh) - const message = 'Hello World!' - conn.write(message) - conn.on('data', (data) => { - expect(data.toString()).to.equal(message) - conn.end() - ws.close(done) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + conn.on('end', () => { + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist + listenerObsAddrs = addrs + + listener.close(() => { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) + done() + }) + }) }) + conn.resume() + conn.end() }) }) - it('echo with connect event and send', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') - ws.createListener(mh, (conn) => { + it('get Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist + }) + conn.pipe(conn) - }, () => { - const message = 'Hello World!' + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) - const conn = ws.dial(mh, { - ready: () => { - conn.send(message) - } + conn.on('end', () => { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exit + listener.close(done) + }) }) + conn.resume() + conn.end() + }) + }) - conn.on('data', (data) => { - expect(data.toString()).to.equal(message) - conn.end() - ws.close(done) + it('set Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist + conn.setPeerInfo('a') + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist + expect(peerInfo).to.equal('a') }) + + conn.pipe(conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + conn.setPeerInfo('b') + + conn.on('end', () => { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist + expect(peerInfo).to.equal('b') + listener.close(done) + }) + }) + conn.resume() + conn.end() }) }) }) + +describe.skip('turbolence', () => { + it('dialer - emits error on the other end is terminated abruptly', (done) => {}) + it('listener - emits error on the other end is terminated abruptly', (done) => {}) +})