diff --git a/.gitignore b/.gitignore index f427b41..e1c82bd 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ logs *.log coverage +.nyc_output # Runtime data pids diff --git a/.travis.yml b/.travis.yml index e8d642b..c415b39 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,7 +27,6 @@ jobs: os: linux script: - npx aegir build --bundlesize - - npx aegir commitlint --travis - npx aegir dep-check - npm run lint diff --git a/README.md b/README.md index cff65f9..7c47fa6 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ [![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection) -> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other different transports. +> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other transports. ## Lead Maintainer @@ -41,37 +41,33 @@ ```js const TCP = require('libp2p-tcp') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') +const addr = multiaddr('/ip4/127.0.0.1/tcp/9090') const tcp = new TCP() const listener = tcp.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - pull( - tcp.dial(mh), - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) -}) +await listener.listen(addr) +console.log('listening') + +const socket = await tcp.dial(addr) +const values = await pipe( + socket, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` Outputs: @@ -88,12 +84,12 @@ Value: hello [![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) -`libp2p-tcp` accepts TCP addresses both IPFS and non IPFS encapsulated addresses, i.e: +`libp2p-tcp` accepts TCP addresses as both IPFS and non IPFS encapsulated addresses, i.e: `/ip4/127.0.0.1/tcp/4001` `/ip4/127.0.0.1/tcp/4001/ipfs/QmHash` -Both for dialing and listening. +(both for dialing and listening) ### Connection diff --git a/package.json b/package.json index ddc480e..bcdbebe 100644 --- a/package.json +++ b/package.json @@ -8,15 +8,15 @@ "lint": "aegir lint", "test": "aegir test -t node", "test:node": "aegir test -t node", + "build": "aegir build", + "docs": "aegir docs", "release": "aegir release -t node --no-build", "release-minor": "aegir release -t node --type minor --no-build", "release-major": "aegir-release -t node --type major --no-build", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -38,20 +38,23 @@ "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.6", - "pull-stream": "^3.6.14" + "pull-stream": "^3.6.9", + "sinon": "^7.4.1" }, "dependencies": { + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "debug": "^4.1.1", + "err-code": "^1.1.2", "interface-connection": "~0.3.3", + "interface-transport": "~0.5.2", "ip-address": "^6.1.0", + "it-pipe": "^1.0.1", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.9", - "mafmt": "^6.0.7", + "mafmt": "^6.0.8", "multiaddr": "^6.1.0", - "once": "^1.4.0", - "stream-to-pull-stream": "^1.7.3" + "streaming-iterables": "^4.1.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..e206a8b --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,17 @@ +'use strict' + +const { Adapter } = require('interface-transport') +const withIs = require('class-is') +const TCP = require('.') + +// Legacy adapter to old transport & connection interface +class TcpAdapter extends Adapter { + constructor () { + super(new TCP()) + } +} + +module.exports = withIs(TcpAdapter, { + className: 'TCP', + symbolName: '@libp2p/js-libp2p-tcp/tcp' +}) diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..3e5cb6e --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// IPFS multi-address code +module.exports.IPFS_MA_CODE = 421 + +// Time to wait for a connection to close gracefully before destroying it +// manually +module.exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index b04c223..000ac4d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,55 +1,76 @@ 'use strict' const net = require('net') -const toPull = require('stream-to-pull-stream') const mafmt = require('mafmt') const withIs = require('class-is') const includes = require('lodash.includes') const isFunction = require('lodash.isfunction') -const Connection = require('interface-connection').Connection -const once = require('once') +const errcode = require('err-code') const debug = require('debug') const log = debug('libp2p:tcp:dial') +const Libp2pSocket = require('./socket') const createListener = require('./listener') +const { AbortError } = require('interface-transport') function noop () {} class TCP { - dial (ma, options, callback) { - if (isFunction(options)) { - callback = options - options = {} - } + async dial (ma, options) { + const cOpts = ma.toOptions() + log('Dialing %s:%s', cOpts.host, cOpts.port) - callback = once(callback || noop) + const rawSocket = await this._connect(cOpts, options) + return new Libp2pSocket(rawSocket, ma, options) + } - const cOpts = ma.toOptions() - log('Connecting to %s %s', cOpts.port, cOpts.host) + _connect (cOpts, options = {}) { + return new Promise((resolve, reject) => { + if ((options.signal || {}).aborted) { + return reject(new AbortError()) + } - const rawSocket = net.connect(cOpts) + const start = Date.now() + const rawSocket = net.connect(cOpts) - rawSocket.once('timeout', () => { - log('timeout') - rawSocket.emit('error', new Error('Timeout')) - }) + const onError = (err) => { + const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}` + done(errcode(msg, err.code)) + } - rawSocket.once('error', callback) + const onTimeout = () => { + log('Timeout dialing %s:%s', cOpts.host, cOpts.port) + const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT') + // Note: this will result in onError() being called + rawSocket.emit('error', err) + } - rawSocket.once('connect', () => { - rawSocket.removeListener('error', callback) - callback() - }) + const onConnect = () => { + log('Connected to %s:%s', cOpts.host, cOpts.port) + done(null, rawSocket) + } - const socket = toPull.duplex(rawSocket) + const onAbort = () => { + log('Dial to %s:%s aborted', cOpts.host, cOpts.port) + rawSocket.destroy() + done(new AbortError()) + } - const conn = new Connection(socket) + const done = (err, res) => { + rawSocket.removeListener('error', onError) + rawSocket.removeListener('timeout', onTimeout) + rawSocket.removeListener('connect', onConnect) - conn.getObservedAddrs = (callback) => { - return callback(null, [ma]) - } + options.signal && options.signal.removeEventListener('abort', onAbort) - return conn + err ? reject(err) : resolve(res) + } + + rawSocket.once('error', onError) + rawSocket.once('timeout', onTimeout) + rawSocket.once('connect', onConnect) + options.signal && options.signal.addEventListener('abort', onAbort) + }) } createListener (options, handler) { @@ -59,7 +80,6 @@ class TCP { } handler = handler || noop - return createListener(handler) } diff --git a/src/listener.js b/src/listener.js index 30b3076..604b55d 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,28 +1,26 @@ 'use strict' const multiaddr = require('multiaddr') -const Connection = require('interface-connection').Connection const os = require('os') const includes = require('lodash.includes') const net = require('net') -const toPull = require('stream-to-pull-stream') const EventEmitter = require('events').EventEmitter const debug = require('debug') const log = debug('libp2p:tcp:listen') +const logError = debug('libp2p:tcp:listen:error') +const Libp2pSocket = require('./socket') const getMultiaddr = require('./get-multiaddr') - -const IPFS_CODE = 421 -const CLOSE_TIMEOUT = 2000 - -function noop () {} +const c = require('./constants') module.exports = (handler) => { const listener = new EventEmitter() const server = net.createServer((socket) => { - // Avoid uncaught errors cause by unstable connections - socket.on('error', noop) + // Avoid uncaught errors caused by unstable connections + socket.on('error', (err) => { + logError('Error emitted by server handler socket: ' + err.message) + }) const addr = getMultiaddr(socket) if (!addr) { @@ -36,17 +34,11 @@ module.exports = (handler) => { log('new connection', addr.toString()) - const s = toPull.duplex(socket) - - s.getObservedAddrs = (cb) => { - cb(null, [addr]) - } - + const s = new Libp2pSocket(socket, addr) trackSocket(server, socket) - const conn = new Connection(s) - handler(conn) - listener.emit('connection', conn) + handler && handler(s) + listener.emit('connection', s) }) server.on('listening', () => listener.emit('listening')) @@ -56,33 +48,35 @@ module.exports = (handler) => { // Keep track of open connections to destroy in case of timeout server.__connections = {} - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} + listener.close = (options = {}) => { + if (!server.listening) { + return } - callback = callback || noop - options = options || {} - - const timeout = setTimeout(() => { - log('unable to close graciously, destroying conns') - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() - }) - }, options.timeout || CLOSE_TIMEOUT) - server.close(callback) + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to stop the server. If it takes longer than the timeout, + // destroy all the underlying sockets manually. + const timeout = setTimeout(() => { + log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) + Object.keys(server.__connections).forEach((key) => { + log('destroying %s', key) + server.__connections[key].destroy() + }) + resolve() + }, options.timeout || c.CLOSE_TIMEOUT) + + server.once('close', () => clearTimeout(timeout)) - server.once('close', () => { - clearTimeout(timeout) + server.close((err) => err ? reject(err) : resolve()) }) } let ipfsId let listeningAddr - listener.listen = (ma, callback) => { + listener.listen = (ma) => { listeningAddr = ma if (includes(ma.protoNames(), 'ipfs')) { ipfsId = getIpfsId(ma) @@ -90,16 +84,24 @@ module.exports = (handler) => { } const lOpts = listeningAddr.toOptions() - log('Listening on %s %s', lOpts.port, lOpts.host) - return server.listen(lOpts.port, lOpts.host, callback) + return new Promise((resolve, reject) => { + server.listen(lOpts.port, lOpts.host, (err) => { + if (err) { + return reject(err) + } + + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) + }) } - listener.getAddrs = (callback) => { + listener.getAddrs = () => { const multiaddrs = [] const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } // Because TCP will only return the IPv6 version @@ -134,7 +136,7 @@ module.exports = (handler) => { multiaddrs.push(ma) } - callback(null, multiaddrs) + return multiaddrs } return listener @@ -142,7 +144,7 @@ module.exports = (handler) => { function getIpfsId (ma) { return ma.stringTuples().filter((tuple) => { - return tuple[0] === IPFS_CODE + return tuple[0] === c.IPFS_MA_CODE })[0][1] } @@ -150,7 +152,7 @@ function trackSocket (server, socket) { const key = `${socket.remoteAddress}:${socket.remotePort}` server.__connections[key] = socket - socket.on('close', () => { + socket.once('close', () => { delete server.__connections[key] }) } diff --git a/src/socket.js b/src/socket.js new file mode 100644 index 0000000..19a4128 --- /dev/null +++ b/src/socket.js @@ -0,0 +1,85 @@ +'use strict' + +const abortable = require('abortable-iterator') +const debug = require('debug') +const log = debug('libp2p:tcp:socket') + +const c = require('./constants') + +class Libp2pSocket { + constructor (rawSocket, ma, opts = {}) { + this._rawSocket = rawSocket + this._ma = ma + + this.sink = this._sink(opts) + this.source = opts.signal ? abortable(rawSocket, opts.signal) : rawSocket + } + + _sink (opts) { + // By default, close when the source is exhausted + const closeOnEnd = opts.closeOnEnd !== false + + return async (source) => { + try { + const src = opts.signal ? abortable(source, opts.signal) : source + await this._write(src, closeOnEnd) + } catch (err) { + // If the connection is aborted just close the socket + if (err.type === 'aborted') { + return this.close() + } + + throw err + } + } + } + + async _write (source, closeOnEnd) { + for await (const data of source) { + if (this._rawSocket.destroyed) { + const cOpts = this._ma.toOptions() + throw new Error('Cannot write %d bytes to destroyed socket %s:%s', + data.length, cOpts.host, cOpts.port) + } + + const flushed = this._rawSocket.write(data) + if (!flushed) { + await new Promise((resolve) => this._rawSocket.once('drain', resolve)) + } + } + + if (closeOnEnd) { + await this.close() + } + } + + close (opts = {}) { + if (this._rawSocket.pending || this._rawSocket.destroyed) { + return + } + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const cOpts = this._ma.toOptions() + log('Timeout closing socket to %s:%s after %dms, destroying it manually', + cOpts.host, cOpts.port, Date.now() - start) + this._rawSocket.destroy() + resolve() + }, opts.timeout || c.CLOSE_TIMEOUT) + + this._rawSocket.once('close', () => clearTimeout(timeout)) + + this._rawSocket.end((err) => err ? reject(err) : resolve()) + }) + } + + getObservedAddrs () { + return [this._ma] + } +} + +module.exports = Libp2pSocket diff --git a/test/connection-wrap.spec.js b/test/adapter/connection-wrap.spec.js similarity index 98% rename from test/connection-wrap.spec.js rename to test/adapter/connection-wrap.spec.js index ff141fb..0077417 100644 --- a/test/connection-wrap.spec.js +++ b/test/adapter/connection-wrap.spec.js @@ -6,7 +6,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const TCP = require('../src') +const TCP = require('../../src/adapter') const multiaddr = require('multiaddr') const Connection = require('interface-connection').Connection diff --git a/test/adapter/connection.spec.js b/test/adapter/connection.spec.js new file mode 100644 index 0000000..559143d --- /dev/null +++ b/test/adapter/connection.spec.js @@ -0,0 +1,111 @@ +/* eslint-env mocha */ +'use strict' + +const pull = require('pull-stream') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const TCP = require('../../src/adapter') +const multiaddr = require('multiaddr') + +describe('valid Connection', () => { + let tcp + + beforeEach(() => { + tcp = new TCP() + }) + + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') + + it('get observed addrs', (done) => { + let dialerObsAddrs + + const listener = tcp.createListener((conn) => { + expect(conn).to.exist() + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + dialerObsAddrs = addrs + pull(pull.empty(), conn) + }) + }) + + listener.listen(ma, () => { + const conn = tcp.dial(ma) + pull( + conn, + pull.onEnd(endHandler) + ) + + function endHandler () { + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + pull(pull.empty(), conn) + closeAndAssert(listener, addrs) + }) + } + + function closeAndAssert (listener, addrs) { + listener.close(() => { + expect(addrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(1) + done() + }) + } + }) + }) + + it('get Peer Info', (done) => { + const listener = tcp.createListener((conn) => { + expect(conn).to.exist() + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + expect(peerInfo).to.not.exist() + pull(pull.empty(), conn) + }) + }) + + listener.listen(ma, () => { + const conn = tcp.dial(ma) + + pull(conn, pull.onEnd(endHandler)) + + function endHandler () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + expect(peerInfo).to.not.exist() + + listener.close(done) + }) + } + }) + }) + + it('set Peer Info', (done) => { + const listener = tcp.createListener((conn) => { + expect(conn).to.exist() + conn.setPeerInfo('batatas') + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('batatas') + pull(pull.empty(), conn) + }) + }) + + listener.listen(ma, () => { + const conn = tcp.dial(ma) + + pull(conn, pull.onEnd(endHandler)) + + function endHandler () { + conn.setPeerInfo('arroz') + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('arroz') + + listener.close(done) + }) + } + }) + }) +}) diff --git a/test/adapter/listen-dial.spec.js b/test/adapter/listen-dial.spec.js new file mode 100644 index 0000000..97e6980 --- /dev/null +++ b/test/adapter/listen-dial.spec.js @@ -0,0 +1,259 @@ +/* eslint-env mocha */ +'use strict' + +const pull = require('pull-stream') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const TCP = require('../../src/adapter') +const net = require('net') +const multiaddr = require('multiaddr') +const isCI = process.env.CI + +describe('listen', () => { + let tcp + + beforeEach(() => { + tcp = new TCP() + }) + + it('close listener with connections, through timeout', (done) => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const listener = tcp.createListener((conn) => { + pull(conn, conn) + }) + + listener.listen(mh, () => { + const socket1 = net.connect(9090) + const socket2 = net.connect(9090) + + socket1.write('Some data that is never handled') + socket1.end() + socket1.on('error', () => {}) + socket2.on('error', () => {}) + socket1.on('connect', () => { + listener.close(done) + }) + }) + }) + + it('listen on port 0', (done) => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/0') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.close(done) + }) + }) + + it('listen on IPv6 addr', (done) => { + if (isCI) { return done() } + const mh = multiaddr('/ip6/::/tcp/9090') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.close(done) + }) + }) + + it('listen on any Interface', (done) => { + const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.close(done) + }) + }) + + it('getAddrs', (done) => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.getAddrs((err, multiaddrs) => { + expect(err).to.not.exist() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + listener.close(done) + }) + }) + }) + + it('getAddrs on port 0 listen', (done) => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/0') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.getAddrs((err, multiaddrs) => { + expect(err).to.not.exist() + expect(multiaddrs.length).to.equal(1) + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0', (done) => { + const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.getAddrs((err, multiaddrs) => { + expect(err).to.not.exist() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + const mh = multiaddr('/ip4/0.0.0.0/tcp/0') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.getAddrs((err, multiaddrs) => { + expect(err).to.not.exist() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + listener.close(done) + }) + }) + }) + + it('getAddrs preserves IPFS Id', (done) => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const listener = tcp.createListener((conn) => {}) + listener.listen(mh, () => { + listener.getAddrs((err, multiaddrs) => { + expect(err).to.not.exist() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + listener.close(done) + }) + }) + }) +}) + +describe('dial', () => { + let tcp + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') + + beforeEach((done) => { + tcp = new TCP() + listener = tcp.createListener((conn) => { + pull( + conn, + pull.map((x) => Buffer.from(x.toString() + '!')), + conn + ) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial on IPv4', (done) => { + pull( + pull.values(['hey']), + tcp.dial(ma), + pull.collect((err, values) => { + expect(err).to.not.exist() + expect(values).to.eql([Buffer.from('hey!')]) + done() + }) + ) + }) + + it('dial to non existent listener', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/8989') + pull( + tcp.dial(ma), + pull.onEnd((err) => { + expect(err).to.exist() + done() + }) + ) + }) + + it('dial on IPv6', (done) => { + if (isCI) { return done() } + + const ma = multiaddr('/ip6/::/tcp/9066') + const listener = tcp.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, () => { + pull( + pull.values(['hey']), + tcp.dial(ma), + pull.collect((err, values) => { + expect(err).to.not.exist() + + expect(values).to.be.eql([Buffer.from('hey')]) + + listener.close(done) + }) + ) + }) + }) + + it('dial and destroy on listener', (done) => { + let count = 0 + const closed = () => ++count === 2 ? finish() : null + + const ma = multiaddr('/ip6/::/tcp/9067') + + const listener = tcp.createListener((conn) => { + pull( + pull.empty(), + conn, + pull.onEnd(closed) + ) + }) + + listener.listen(ma, () => { + pull(tcp.dial(ma), pull.onEnd(closed)) + }) + + function finish () { + listener.close(done) + } + }) + + it('dial and destroy on dialer', (done) => { + if (isCI) { return done() } + + let count = 0 + const destroyed = () => ++count === 2 ? finish() : null + + const ma = multiaddr('/ip6/::/tcp/9068') + + const listener = tcp.createListener((conn) => { + pull(conn, pull.onEnd(destroyed)) + }) + + listener.listen(ma, () => { + pull( + pull.empty(), + tcp.dial(ma), + pull.onEnd(destroyed) + ) + }) + + function finish () { + listener.close(done) + } + }) + + it('dial on IPv4 with IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = tcp.dial(ma) + + pull( + pull.values(['hey']), + conn, + pull.collect((err, res) => { + expect(err).to.not.exist() + expect(res).to.be.eql([Buffer.from('hey!')]) + done() + }) + ) + }) +}) diff --git a/test/compliance.spec.js b/test/compliance.spec.js index 1339086..11fd244 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -1,13 +1,15 @@ /* eslint-env mocha */ 'use strict' +const sinon = require('sinon') const tests = require('interface-transport') const multiaddr = require('multiaddr') +const net = require('net') const TCP = require('../src') describe('interface-transport compliance', () => { tests({ - setup (cb) { + setup () { const tcp = new TCP() const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091'), @@ -15,10 +17,27 @@ describe('interface-transport compliance', () => { multiaddr('/ip4/127.0.0.1/tcp/9093'), multiaddr('/dns4/ipfs.io') ] - cb(null, tcp, addrs) - }, - teardown (cb) { - cb() + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (delayMs) { + const netConnect = net.connect + sinon.replace(net, 'connect', (opts) => { + const socket = netConnect(opts) + const socketEmit = socket.emit.bind(socket) + sinon.replace(socket, 'emit', (...args) => { + const time = args[0] === 'connect' ? delayMs : 0 + setTimeout(() => socketEmit(...args), time) + }) + return socket + }) + }, + restore () { + sinon.restore() + } + } + + return { transport: tcp, addrs, connector } } }) }) diff --git a/test/connection.spec.js b/test/connection.spec.js index 5100cb1..4d8f385 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -1,7 +1,6 @@ /* eslint-env mocha */ 'use strict' -const pull = require('pull-stream') const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect @@ -18,94 +17,39 @@ describe('valid Connection', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - it('get observed addrs', (done) => { - let dialerObsAddrs - - const listener = tcp.createListener((conn) => { - expect(conn).to.exist() - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - pull(pull.empty(), conn) - }) - }) - - listener.listen(ma, () => { - const conn = tcp.dial(ma) - pull( - conn, - pull.onEnd(endHandler) - ) - - function endHandler () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - pull(pull.empty(), conn) - closeAndAssert(listener, addrs) - }) - } - - function closeAndAssert (listener, addrs) { - listener.close(() => { - expect(addrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(1) - done() - }) - } + it('get observed addrs', async () => { + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise((resolve) => { + handled = resolve }) - }) - it('get Peer Info', (done) => { - const listener = tcp.createListener((conn) => { + const handler = async (conn) => { expect(conn).to.exist() - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - expect(peerInfo).to.not.exist() - pull(pull.empty(), conn) - }) - }) + const dialerObsAddrs = await conn.getObservedAddrs() + handled(dialerObsAddrs) + } - listener.listen(ma, () => { - const conn = tcp.dial(ma) + // Create a listener with the handler + const listener = tcp.createListener(handler) - pull(conn, pull.onEnd(endHandler)) + // Listen on the multi-address + await listener.listen(ma) - function endHandler () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - expect(peerInfo).to.not.exist() + // Dial to that same address + const conn = await tcp.dial(ma) + const addrs = await conn.getObservedAddrs() - listener.close(done) - }) - } - }) - }) - - it('set Peer Info', (done) => { - const listener = tcp.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('batatas') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('batatas') - pull(pull.empty(), conn) - }) - }) - - listener.listen(ma, () => { - const conn = tcp.dial(ma) + // Wait for the incoming dial to be handled + const dialerObsAddrs = await handlerPromise - pull(conn, pull.onEnd(endHandler)) + // Close the listener + await listener.close() - function endHandler () { - conn.setPeerInfo('arroz') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('arroz') - - listener.close(done) - }) - } - }) + // The addresses should match + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(1) + expect(dialerObsAddrs[0]).to.exist() }) }) diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index 7d8f119..ad414fa 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -1,14 +1,16 @@ /* eslint-env mocha */ 'use strict' -const pull = require('pull-stream') const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) + const TCP = require('../src') const net = require('net') const multiaddr = require('multiaddr') +const pipe = require('it-pipe') +const { collect, map } = require('streaming-iterables') const isCI = process.env.CI describe('listen', () => { @@ -18,113 +20,111 @@ describe('listen', () => { tcp = new TCP() }) - it('close listener with connections, through timeout', (done) => { + it('close listener with connections, through timeout', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => { - pull(conn, conn) + pipe(conn, conn) }) - listener.listen(mh, () => { - const socket1 = net.connect(9090) - const socket2 = net.connect(9090) + await listener.listen(mh) + + const socket1 = net.connect(9090) + const socket2 = net.connect(9090) - socket1.write('Some data that is never handled') - socket1.end() - socket1.on('error', () => {}) - socket2.on('error', () => {}) - socket1.on('connect', () => { - listener.close(done) + socket1.write('Some data that is never handled') + socket1.end() + socket1.on('error', () => {}) + socket2.on('error', () => {}) + + await new Promise((resolve) => { + socket1.on('connect', async () => { + await listener.close({ timeout: 100 }) + resolve() }) }) }) - it('listen on port 0', (done) => { + it('listen on port 0', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('listen on IPv6 addr', (done) => { - if (isCI) { return done() } + it('listen on IPv6 addr', async () => { + if (isCI) { + return + } const mh = multiaddr('/ip6/::/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('listen on any Interface', (done) => { + it('listen on any Interface', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('getAddrs', (done) => { + it('getAddrs', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + + await listener.close() }) - it('getAddrs on port 0 listen', (done) => { + it('getAddrs on port 0 listen', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + + await listener.close() }) - it('getAddrs from listening on 0.0.0.0', (done) => { + it('getAddrs from listening on 0.0.0.0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length > 0).to.equal(true) - expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + + await listener.close() }) - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length > 0).to.equal(true) - expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + + await listener.close() }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs preserves IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + + await listener.close() }) }) @@ -133,127 +133,109 @@ describe('dial', () => { let listener const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - beforeEach((done) => { + beforeEach(async () => { tcp = new TCP() listener = tcp.createListener((conn) => { - pull( + pipe( conn, - pull.map((x) => Buffer.from(x.toString() + '!')), + map((x) => Buffer.from(x.toString() + '!')), conn ) }) - listener.listen(ma, done) + await listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) + afterEach(() => listener.close()) - it('dial on IPv4', (done) => { - pull( - pull.values(['hey']), - tcp.dial(ma), - pull.collect((err, values) => { - expect(err).to.not.exist() - expect(values).to.eql([Buffer.from('hey!')]) - done() - }) + it('dial on IPv4', async () => { + const values = await pipe( + ['hey'], + await tcp.dial(ma), + collect ) + expect(values).to.eql([Buffer.from('hey!')]) }) - it('dial to non existent listener', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/8989') - pull( - tcp.dial(ma), - pull.onEnd((err) => { - expect(err).to.exist() - done() - }) - ) - }) - - it('dial on IPv6', (done) => { - if (isCI) { return done() } + it('dial on IPv6', async () => { + if (isCI) { + return + } const ma = multiaddr('/ip6/::/tcp/9066') const listener = tcp.createListener((conn) => { - pull(conn, conn) + pipe(conn, conn) }) - listener.listen(ma, () => { - pull( - pull.values(['hey']), - tcp.dial(ma), - pull.collect((err, values) => { - expect(err).to.not.exist() + await listener.listen(ma) - expect(values).to.be.eql([Buffer.from('hey')]) + const values = await pipe( + ['hey'], + await tcp.dial(ma), + collect + ) + expect(values).to.be.eql([Buffer.from('hey')]) - listener.close(done) - }) - ) - }) + await listener.close() }) - it('dial and destroy on listener', (done) => { - let count = 0 - const closed = () => ++count === 2 ? finish() : null + it('dial and destroy on listener', async () => { + let handled + const handledPromise = new Promise((resolve) => { + handled = resolve + }) - const ma = multiaddr('/ip6/::/tcp/9067') + const ma = multiaddr('/ip6/::/tcp/0') - const listener = tcp.createListener((conn) => { - pull( - pull.empty(), - conn, - pull.onEnd(closed) + const listener = tcp.createListener(async (conn) => { + await pipe( + [], + conn ) + handled() }) - listener.listen(ma, () => { - pull(tcp.dial(ma), pull.onEnd(closed)) - }) + await listener.listen(ma) + const addrs = listener.getAddrs() + await pipe(await tcp.dial(addrs[0])) - function finish () { - listener.close(done) - } + await handledPromise + await listener.close() }) - it('dial and destroy on dialer', (done) => { - if (isCI) { return done() } + it('dial and destroy on dialer', async () => { + if (isCI) { + return + } - let count = 0 - const destroyed = () => ++count === 2 ? finish() : null + let handled + const handledPromise = new Promise((resolve) => { + handled = resolve + }) - const ma = multiaddr('/ip6/::/tcp/9068') + const ma = multiaddr('/ip6/::/tcp/0') - const listener = tcp.createListener((conn) => { - pull(conn, pull.onEnd(destroyed)) + const listener = tcp.createListener(async (conn) => { + // pull(conn, pull.onEnd(destroyed)) + await pipe(conn) + handled() }) - listener.listen(ma, () => { - pull( - pull.empty(), - tcp.dial(ma), - pull.onEnd(destroyed) - ) - }) + await listener.listen(ma) + const addrs = listener.getAddrs() + await pipe(await tcp.dial(addrs[0])) - function finish () { - listener.close(done) - } + await handledPromise + await listener.close() }) - it('dial on IPv4 with IPFS Id', (done) => { + it('dial on IPv4 with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = tcp.dial(ma) + const conn = await tcp.dial(ma) - pull( - pull.values(['hey']), + const res = await pipe( + ['hey'], conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([Buffer.from('hey!')]) - done() - }) + collect ) + expect(res).to.be.eql([Buffer.from('hey!')]) }) })