diff --git a/.gitignore b/.gitignore index f427b41..9faa8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,46 +1,4 @@ -docs -**/node_modules/ -**/*.log -test/repo-tests* -**/bundle.js - -# yarn -yarn.lock - -# Logs -logs -*.log - -coverage - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -build - -# Dependency directory -# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git node_modules - -lib -dist -test/test-data/go-ipfs-repo/LOCK -test/test-data/go-ipfs-repo/LOG -test/test-data/go-ipfs-repo/LOG.old - -# while testing npm5 package-lock.json +coverage +.nyc_output diff --git a/.travis.yml b/.travis.yml index e8d642b..d78ede7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ os: before_script: - if [ "$TRAVIS_OS_NAME" = "linux" ]; then sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6'; fi + script: npx nyc -s npm run test:node -- --bail after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov @@ -27,7 +28,6 @@ jobs: os: linux script: - npx aegir build --bundlesize - - npx aegir commitlint --travis - npx aegir dep-check - npm run lint diff --git a/README.md b/README.md index cff65f9..7c47fa6 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ [![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection) -> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other different transports. +> JavaScript implementation of the TCP module for libp2p. It exposes the [interface-transport](https://github.com/libp2p/interface-connection) for dial/listen. `libp2p-tcp` is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other transports. ## Lead Maintainer @@ -41,37 +41,33 @@ ```js const TCP = require('libp2p-tcp') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') +const addr = multiaddr('/ip4/127.0.0.1/tcp/9090') const tcp = new TCP() const listener = tcp.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - pull( - tcp.dial(mh), - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) -}) +await listener.listen(addr) +console.log('listening') + +const socket = await tcp.dial(addr) +const values = await pipe( + socket, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` Outputs: @@ -88,12 +84,12 @@ Value: hello [![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) -`libp2p-tcp` accepts TCP addresses both IPFS and non IPFS encapsulated addresses, i.e: +`libp2p-tcp` accepts TCP addresses as both IPFS and non IPFS encapsulated addresses, i.e: `/ip4/127.0.0.1/tcp/4001` `/ip4/127.0.0.1/tcp/4001/ipfs/QmHash` -Both for dialing and listening. +(both for dialing and listening) ### Connection diff --git a/package.json b/package.json index ddc480e..13738cb 100644 --- a/package.json +++ b/package.json @@ -11,12 +11,10 @@ "release": "aegir release -t node --no-build", "release-minor": "aegir release -t node --type minor --no-build", "release-major": "aegir-release -t node --type major --no-build", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -38,20 +36,18 @@ "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.6", - "pull-stream": "^3.6.14" + "interface-transport": "^0.6.1", + "sinon": "^7.3.1" }, "dependencies": { + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "debug": "^4.1.1", - "interface-connection": "~0.3.3", + "err-code": "^2.0.0", "ip-address": "^6.1.0", - "lodash.includes": "^4.3.0", - "lodash.isfunction": "^3.0.9", - "mafmt": "^6.0.7", - "multiaddr": "^6.1.0", - "once": "^1.4.0", - "stream-to-pull-stream": "^1.7.3" + "mafmt": "^6.0.9", + "multiaddr": "^7.1.0", + "stream-to-it": "^0.1.1" }, "contributors": [ "Alan Shaw ", diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..b7ab8fe --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// p2p multi-address code +exports.CODE_P2P = 421 +exports.CODE_CIRCUIT = 290 + +// Time to wait for a connection to close gracefully before destroying it manually +exports.CLOSE_TIMEOUT = 2000 diff --git a/src/get-multiaddr.js b/src/get-multiaddr.js deleted file mode 100644 index 4be7243..0000000 --- a/src/get-multiaddr.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict' - -const multiaddr = require('multiaddr') -const Address6 = require('ip-address').Address6 -const debug = require('debug') -const log = debug('libp2p:tcp:get-multiaddr') - -module.exports = (socket) => { - let ma - - try { - if (socket.remoteFamily === 'IPv6') { - const addr = new Address6(socket.remoteAddress) - - if (addr.v4) { - const ip4 = addr.to4().correctForm() - ma = multiaddr('/ip4/' + ip4 + - '/tcp/' + socket.remotePort - ) - } else { - ma = multiaddr('/ip6/' + socket.remoteAddress + - '/tcp/' + socket.remotePort - ) - } - } else { - ma = multiaddr('/ip4/' + socket.remoteAddress + - '/tcp/' + socket.remotePort) - } - } catch (err) { - log(err) - } - return ma -} diff --git a/src/index.js b/src/index.js index b04c223..56eabac 100644 --- a/src/index.js +++ b/src/index.js @@ -1,83 +1,137 @@ 'use strict' const net = require('net') -const toPull = require('stream-to-pull-stream') const mafmt = require('mafmt') const withIs = require('class-is') -const includes = require('lodash.includes') -const isFunction = require('lodash.isfunction') -const Connection = require('interface-connection').Connection -const once = require('once') -const debug = require('debug') -const log = debug('libp2p:tcp:dial') - +const errCode = require('err-code') +const log = require('debug')('libp2p:tcp') +const toConnection = require('./socket-to-conn') const createListener = require('./listener') +const { AbortError } = require('abortable-iterator') +const { CODE_CIRCUIT, CODE_P2P } = require('./constants') +const assert = require('assert') -function noop () {} - +/** + * @class TCP + */ class TCP { - dial (ma, options, callback) { - if (isFunction(options)) { - callback = options - options = {} - } + /** + * @constructor + * @param {object} options + * @param {Upgrader} options.upgrader + */ + constructor ({ upgrader }) { + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') + this._upgrader = upgrader + } - callback = once(callback || noop) + /** + * @async + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Connection} An upgraded Connection + */ + async dial (ma, options) { + options = options || {} + const socket = await this._connect(ma, options) + const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + const conn = await this._upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + return conn + } - const cOpts = ma.toOptions() - log('Connecting to %s %s', cOpts.port, cOpts.host) + /** + * @private + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Promise} Resolves a TCP Socket + */ + _connect (ma, options = {}) { + if (options.signal && options.signal.aborted) { + throw new AbortError() + } - const rawSocket = net.connect(cOpts) + return new Promise((resolve, reject) => { + const start = Date.now() + const cOpts = ma.toOptions() - rawSocket.once('timeout', () => { - log('timeout') - rawSocket.emit('error', new Error('Timeout')) - }) + log('dialing %s:%s', cOpts.host, cOpts.port) + const rawSocket = net.connect(cOpts) - rawSocket.once('error', callback) + const onError = err => { + err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` + done(err) + } - rawSocket.once('connect', () => { - rawSocket.removeListener('error', callback) - callback() - }) + const onTimeout = () => { + log('connnection timeout %s:%s', cOpts.host, cOpts.port) + const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') + // Note: this will result in onError() being called + rawSocket.emit('error', err) + } - const socket = toPull.duplex(rawSocket) + const onConnect = () => { + log('connection opened %s:%s', cOpts.host, cOpts.port) + done() + } - const conn = new Connection(socket) + const onAbort = () => { + log('connection aborted %s:%s', cOpts.host, cOpts.port) + rawSocket.destroy() + done(new AbortError()) + } - conn.getObservedAddrs = (callback) => { - return callback(null, [ma]) - } + const done = err => { + rawSocket.removeListener('error', onError) + rawSocket.removeListener('timeout', onTimeout) + rawSocket.removeListener('connect', onConnect) + options.signal && options.signal.removeEventListener('abort', onAbort) - return conn + if (err) return reject(err) + resolve(rawSocket) + } + + rawSocket.on('error', onError) + rawSocket.on('timeout', onTimeout) + rawSocket.on('connect', onConnect) + options.signal && options.signal.addEventListener('abort', onAbort) + }) } + /** + * Creates a TCP listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + * @param {*} [options] + * @param {function(Connection)} handler + * @returns {Listener} A TCP listener + */ createListener (options, handler) { - if (isFunction(options)) { + if (typeof options === 'function') { handler = options options = {} } - - handler = handler || noop - - return createListener(handler) + options = options || {} + return createListener({ handler, upgrader: this._upgrader }, options) } + /** + * Takes a list of `Multiaddr`s and returns only valid TCP addresses + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} Valid TCP multiaddrs + */ filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] - return multiaddrs.filter((ma) => { - if (includes(ma.protoNames(), 'p2p-circuit')) { + return multiaddrs.filter(ma => { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { return false } - if (includes(ma.protoNames(), 'ipfs')) { - ma = ma.decapsulate('ipfs') - } - - return mafmt.TCP.matches(ma) + return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P)) }) } } diff --git a/src/ip-port-to-multiaddr.js b/src/ip-port-to-multiaddr.js new file mode 100644 index 0000000..6d980c4 --- /dev/null +++ b/src/ip-port-to-multiaddr.js @@ -0,0 +1,30 @@ +'use strict' + +const multiaddr = require('multiaddr') +const { Address4, Address6 } = require('ip-address') + +module.exports = (ip, port) => { + if (typeof ip !== 'string') { + throw new Error('invalid ip') + } + + port = parseInt(port) + + if (isNaN(port)) { + throw new Error('invalid port') + } + + if (new Address4(ip).isValid()) { + return multiaddr(`/ip4/${ip}/tcp/${port}`) + } + + const ip6 = new Address6(ip) + + if (ip6.isValid()) { + return ip6.is4() + ? multiaddr(`/ip4/${ip6.to4().correctForm()}/tcp/${port}`) + : multiaddr(`/ip6/${ip}/tcp/${port}`) + } + + throw new Error('invalid ip') +} diff --git a/src/listener.js b/src/listener.js index 30b3076..ac675f8 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,156 +1,122 @@ 'use strict' const multiaddr = require('multiaddr') -const Connection = require('interface-connection').Connection const os = require('os') -const includes = require('lodash.includes') const net = require('net') -const toPull = require('stream-to-pull-stream') -const EventEmitter = require('events').EventEmitter -const debug = require('debug') -const log = debug('libp2p:tcp:listen') +const EventEmitter = require('events') +const log = require('debug')('libp2p:tcp:listener') +const toConnection = require('./socket-to-conn') +const { CODE_P2P } = require('./constants') +const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } -const getMultiaddr = require('./get-multiaddr') - -const IPFS_CODE = 421 -const CLOSE_TIMEOUT = 2000 - -function noop () {} - -module.exports = (handler) => { +module.exports = ({ handler, upgrader }, options) => { const listener = new EventEmitter() - const server = net.createServer((socket) => { - // Avoid uncaught errors cause by unstable connections - socket.on('error', noop) - - const addr = getMultiaddr(socket) - if (!addr) { - if (socket.remoteAddress === undefined) { - log('connection closed before p2p connection made') - } else { - log('error interpreting incoming p2p connection') - } - return - } - - log('new connection', addr.toString()) + const server = net.createServer(async socket => { + // Avoid uncaught errors caused by unstable connections + socket.on('error', err => log('socket error', err)) - const s = toPull.duplex(socket) + const maConn = toConnection(socket) + log('new inbound connection %s', maConn.remoteAddr) - s.getObservedAddrs = (cb) => { - cb(null, [addr]) - } + const conn = await upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) - trackSocket(server, socket) + trackConn(server, maConn) - const conn = new Connection(s) - handler(conn) + if (handler) handler(conn) listener.emit('connection', conn) }) - server.on('listening', () => listener.emit('listening')) - server.on('error', (err) => listener.emit('error', err)) - server.on('close', () => listener.emit('close')) + server + .on('listening', () => listener.emit('listening')) + .on('error', err => listener.emit('error', err)) + .on('close', () => listener.emit('close')) // Keep track of open connections to destroy in case of timeout - server.__connections = {} + server.__connections = [] - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - callback = callback || noop - options = options || {} - - const timeout = setTimeout(() => { - log('unable to close graciously, destroying conns') - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() - }) - }, options.timeout || CLOSE_TIMEOUT) - - server.close(callback) + listener.close = () => { + if (!server.listening) return - server.once('close', () => { - clearTimeout(timeout) + return new Promise((resolve, reject) => { + server.__connections.forEach(maConn => maConn.close()) + server.close(err => err ? reject(err) : resolve()) }) } - let ipfsId - let listeningAddr + let peerId, listeningAddr - listener.listen = (ma, callback) => { + listener.listen = ma => { listeningAddr = ma - if (includes(ma.protoNames(), 'ipfs')) { - ipfsId = getIpfsId(ma) - listeningAddr = ma.decapsulate('ipfs') + peerId = ma.getPeerId() + + if (peerId) { + listeningAddr = ma.decapsulateCode(CODE_P2P) } - const lOpts = listeningAddr.toOptions() - log('Listening on %s %s', lOpts.port, lOpts.host) - return server.listen(lOpts.port, lOpts.host, callback) + return new Promise((resolve, reject) => { + const { host, port } = listeningAddr.toOptions() + server.listen(port, host, err => { + if (err) return reject(err) + log('Listening on %s %s', port, host) + resolve() + }) + }) } - listener.getAddrs = (callback) => { - const multiaddrs = [] + listener.getAddrs = () => { + let addrs = [] const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } // Because TCP will only return the IPv6 version // we need to capture from the passed multiaddr - if (listeningAddr.toString().indexOf('ip4') !== -1) { - let m = listeningAddr.decapsulate('tcp') - m = m.encapsulate('/tcp/' + address.port) - if (ipfsId) { - m = m.encapsulate('/ipfs/' + ipfsId) - } - - if (m.toString().indexOf('0.0.0.0') !== -1) { - const netInterfaces = os.networkInterfaces() - Object.keys(netInterfaces).forEach((niKey) => { - netInterfaces[niKey].forEach((ni) => { - if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) - } - }) - }) - } else { - multiaddrs.push(m) - } + if (listeningAddr.toString().startsWith('/ip4')) { + addrs = addrs.concat(getMulitaddrs('ip4', address.address, address.port)) + } else if (address.family === 'IPv6') { + addrs = addrs.concat(getMulitaddrs('ip6', address.address, address.port)) } - if (address.family === 'IPv6') { - let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port) - if (ipfsId) { - ma = ma.encapsulate('/ipfs/' + ipfsId) - } - - multiaddrs.push(ma) - } - - callback(null, multiaddrs) + return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma) } return listener } -function getIpfsId (ma) { - return ma.stringTuples().filter((tuple) => { - return tuple[0] === IPFS_CODE - })[0][1] +function getMulitaddrs (proto, ip, port) { + const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`) + return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa) } -function trackSocket (server, socket) { - const key = `${socket.remoteAddress}:${socket.remotePort}` - server.__connections[key] = socket +function isAnyAddr (ip) { + return ['0.0.0.0', '::'].includes(ip) +} - socket.on('close', () => { - delete server.__connections[key] - }) +/** + * @private + * @param {string} family One of ['IPv6', 'IPv4'] + * @returns {string[]} an array of ip address strings + */ +function getNetworkAddrs (family) { + return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => { + netAddrs.forEach(netAddr => { + // Add the ip of each matching network interface + if (netAddr.family === family) addresses.push(netAddr.address) + }) + return addresses + }, []) +} + +function trackConn (server, maConn) { + server.__connections.push(maConn) + + const untrackConn = () => { + server.__connections = server.__connections.filter(c => c !== maConn) + } + + maConn.conn.once('close', untrackConn) } diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js new file mode 100644 index 0000000..83e9a4d --- /dev/null +++ b/src/socket-to-conn.js @@ -0,0 +1,83 @@ +'use strict' + +const abortable = require('abortable-iterator') +const log = require('debug')('libp2p:tcp:socket') +const toIterable = require('stream-to-it') +const toMultiaddr = require('./ip-port-to-multiaddr') +const { CLOSE_TIMEOUT } = require('./constants') + +// Convert a socket into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = (socket, options) => { + options = options || {} + + const { sink, source } = toIterable.duplex(socket) + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await sink((async function * () { + for await (const chunk of source) { + // Convert BufferList to Buffer + yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() + } + })()) + } catch (err) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log(err) + } + } + }, + + source: options.signal ? abortable(source, options.signal) : source, + + conn: socket, + + localAddr: toMultiaddr(socket.localAddress, socket.localPort), + + // If the remote address was passed, use it - it may have the peer ID encapsulated + remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort), + + timeline: { open: Date.now() }, + + close () { + if (socket.destroyed) return + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + + if (socket.destroyed) { + log('%s:%s is already destroyed', host, port) + } else { + socket.destroy() + } + + resolve() + }, CLOSE_TIMEOUT) + + socket.once('close', () => clearTimeout(timeout)) + socket.end(err => { + maConn.timeline.close = Date.now() + if (err) return reject(err) + resolve() + }) + }) + } + } + + return maConn +} diff --git a/test/compliance.spec.js b/test/compliance.spec.js index 1339086..b337d3f 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -1,24 +1,43 @@ /* eslint-env mocha */ 'use strict' +const sinon = require('sinon') const tests = require('interface-transport') const multiaddr = require('multiaddr') +const net = require('net') const TCP = require('../src') describe('interface-transport compliance', () => { tests({ - setup (cb) { - const tcp = new TCP() + setup ({ upgrader }) { + const tcp = new TCP({ upgrader }) const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091'), multiaddr('/ip4/127.0.0.1/tcp/9092'), multiaddr('/ip4/127.0.0.1/tcp/9093'), multiaddr('/dns4/ipfs.io') ] - cb(null, tcp, addrs) - }, - teardown (cb) { - cb() + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (delayMs) { + const netConnect = net.connect + sinon.replace(net, 'connect', (opts) => { + const socket = netConnect(opts) + const socketEmit = socket.emit.bind(socket) + sinon.replace(socket, 'emit', (...args) => { + const time = args[0] === 'connect' ? delayMs : 0 + setTimeout(() => socketEmit(...args), time) + }) + return socket + }) + }, + restore () { + sinon.restore() + } + } + + return { transport: tcp, addrs, connector } } }) }) diff --git a/test/connection-wrap.spec.js b/test/connection-wrap.spec.js deleted file mode 100644 index ff141fb..0000000 --- a/test/connection-wrap.spec.js +++ /dev/null @@ -1,121 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const pull = require('pull-stream') -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const TCP = require('../src') -const multiaddr = require('multiaddr') -const Connection = require('interface-connection').Connection - -describe('Connection Wrap', () => { - let tcp - let listener - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - - beforeEach((done) => { - tcp = new TCP() - listener = tcp.createListener((conn) => { - pull(conn, conn) - }) - listener.on('listening', done) - listener.listen(ma) - }) - - afterEach((done) => { - listener.close(done) - }) - - it('simple wrap', (done) => { - const conn = tcp.dial(ma) - conn.setPeerInfo('peerInfo') - const connWrap = new Connection(conn) - pull( - pull.values(['hey']), - connWrap, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.be.eql([Buffer.from('hey')]) - - connWrap.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('peerInfo') - done() - }) - }) - ) - }) - - it('buffer wrap', (done) => { - const conn = tcp.dial(ma) - const connWrap = new Connection() - pull( - pull.values(['hey']), - connWrap, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.be.eql([Buffer.from('hey')]) - done() - }) - ) - - connWrap.setInnerConn(conn) - }) - - it('overload wrap', (done) => { - const conn = tcp.dial(ma) - const connWrap = new Connection(conn) - connWrap.getPeerInfo = (callback) => { - callback(null, 'none') - } - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - connWrap.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('none') - }) - pull( - pull.values(['hey']), - connWrap, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.be.eql([Buffer.from('hey')]) - done() - }) - ) - }) - - it('dial error', (done) => { - tcp.dial(multiaddr('/ip4/127.0.0.1/tcp/22234'), (err) => { - expect(err).to.exist() - done() - }) - }) - - it('matryoshka wrap', (done) => { - const conn = tcp.dial(ma) - const connWrap1 = new Connection(conn) - const connWrap2 = new Connection(connWrap1) - const connWrap3 = new Connection(connWrap2) - - conn.getPeerInfo = (callback) => { - callback(null, 'inner doll') - } - pull( - pull.values(['hey']), - connWrap3, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.eql([Buffer.from('hey')]) - connWrap3.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('inner doll') - done() - }) - }) - ) - }) -}) diff --git a/test/connection.spec.js b/test/connection.spec.js index 5100cb1..870e0ac 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -1,7 +1,6 @@ /* eslint-env mocha */ 'use strict' -const pull = require('pull-stream') const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect @@ -9,103 +8,49 @@ chai.use(dirtyChai) const TCP = require('../src') const multiaddr = require('multiaddr') -describe('valid Connection', () => { +describe('valid localAddr and remoteAddr', () => { let tcp + const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn + } + beforeEach(() => { - tcp = new TCP() + tcp = new TCP({ upgrader: mockUpgrader }) }) - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - - it('get observed addrs', (done) => { - let dialerObsAddrs - - const listener = tcp.createListener((conn) => { - expect(conn).to.exist() - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - pull(pull.empty(), conn) - }) - }) - - listener.listen(ma, () => { - const conn = tcp.dial(ma) - pull( - conn, - pull.onEnd(endHandler) - ) + const ma = multiaddr('/ip4/127.0.0.1/tcp/0') - function endHandler () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - pull(pull.empty(), conn) - closeAndAssert(listener, addrs) - }) - } + it('should resolve port 0', async () => { + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise(resolve => { handled = resolve }) - function closeAndAssert (listener, addrs) { - listener.close(() => { - expect(addrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(1) - done() - }) - } - }) - }) - - it('get Peer Info', (done) => { - const listener = tcp.createListener((conn) => { - expect(conn).to.exist() - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - expect(peerInfo).to.not.exist() - pull(pull.empty(), conn) - }) - }) + const handler = conn => handled(conn) - listener.listen(ma, () => { - const conn = tcp.dial(ma) + // Create a listener with the handler + const listener = tcp.createListener(handler) - pull(conn, pull.onEnd(endHandler)) + // Listen on the multi-address + await listener.listen(ma) - function endHandler () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - expect(peerInfo).to.not.exist() - - listener.close(done) - }) - } - }) - }) + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) - it('set Peer Info', (done) => { - const listener = tcp.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('batatas') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('batatas') - pull(pull.empty(), conn) - }) - }) + // Dial to that address + const dialerConn = await tcp.dial(localAddrs[0]) - listener.listen(ma, () => { - const conn = tcp.dial(ma) + // Wait for the incoming dial to be handled + const listenerConn = await handlerPromise - pull(conn, pull.onEnd(endHandler)) + // Close the listener + await listener.close() - function endHandler () { - conn.setPeerInfo('arroz') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('arroz') + expect(dialerConn.localAddr.toString()) + .to.equal(listenerConn.remoteAddr.toString()) - listener.close(done) - }) - } - }) + expect(dialerConn.remoteAddr.toString()) + .to.equal(listenerConn.localAddr.toString()) }) }) diff --git a/test/constructor.spec.js b/test/constructor.spec.js deleted file mode 100644 index 6183ad1..0000000 --- a/test/constructor.spec.js +++ /dev/null @@ -1,15 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const TCP = require('../src') - -describe('Constructor', () => { - it('create an instance', () => { - const tcp = new TCP() - expect(tcp).to.exist() - }) -}) diff --git a/test/filter.spec.js b/test/filter.spec.js index b6ce629..9539d1c 100644 --- a/test/filter.spec.js +++ b/test/filter.spec.js @@ -15,7 +15,7 @@ describe('filter addrs', () => { let tcp before(() => { - tcp = new TCP() + tcp = new TCP({ upgrader: {} }) }) it('filter valid addrs for this transport', () => { diff --git a/test/get-multiaddr.spec.js b/test/get-multiaddr.spec.js deleted file mode 100644 index d7199ac..0000000 --- a/test/get-multiaddr.spec.js +++ /dev/null @@ -1,54 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const getMultiaddr = require('../src/get-multiaddr') - -const goodSocket4 = { - remoteAddress: '127.0.0.1', - remotePort: '9090', - remoteFamily: 'IPv4' -} - -const goodSocket6 = { - remoteAddress: '::1', - remotePort: '9090', - remoteFamily: 'IPv6' -} - -const badSocket = {} - -const badSocketData = { - remoteAddress: 'aewmrn4awoew', - remotePort: '234', - remoteFamily: 'Hufflepuff' -} - -describe('getMultiaddr multiaddr creation', () => { - it('creates multiaddr from valid socket data', (done) => { - expect(getMultiaddr(goodSocket4)) - .to.exist() - done() - }) - - it('creates multiaddr from valid IPv6 socket data', (done) => { - expect(getMultiaddr(goodSocket6)) - .to.exist() - done() - }) - - it('returns undefined multiaddr from missing socket data', (done) => { - expect(getMultiaddr(badSocket)) - .to.equal(undefined) - done() - }) - - it('returns undefined multiaddr from unparseable socket data', (done) => { - expect(getMultiaddr(badSocketData)) - .to.equal(undefined) - done() - }) -}) diff --git a/test/ip-port-to-multiaddr.spec.js b/test/ip-port-to-multiaddr.spec.js new file mode 100644 index 0000000..657e569 --- /dev/null +++ b/test/ip-port-to-multiaddr.spec.js @@ -0,0 +1,50 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const toMultiaddr = require('../src/ip-port-to-multiaddr') + +describe('IP and port to Multiaddr', () => { + it('creates multiaddr from valid IPv4 IP and port', () => { + const ip = '127.0.0.1' + const port = '9090' + expect(toMultiaddr(ip, port).toString()).to.equal(`/ip4/${ip}/tcp/${port}`) + }) + + it('creates multiaddr from valid IPv4 IP and numeric port', () => { + const ip = '127.0.0.1' + const port = 9090 + expect(toMultiaddr(ip, port).toString()).to.equal(`/ip4/${ip}/tcp/${port}`) + }) + + it('creates multiaddr from valid IPv4 in IPv6 IP and port', () => { + const ip = '0:0:0:0:0:0:101.45.75.219' + const port = '9090' + expect(toMultiaddr(ip, port).toString()).to.equal(`/ip4/101.45.75.219/tcp/${port}`) + }) + + it('creates multiaddr from valid IPv6 IP and port', () => { + const ip = '::1' + const port = '9090' + expect(toMultiaddr(ip, port).toString()).to.equal(`/ip6/${ip}/tcp/${port}`) + }) + + it('throws for missing IP address', () => { + expect(() => toMultiaddr()).to.throw('invalid ip') + }) + + it('throws for invalid IP address', () => { + const ip = 'aewmrn4awoew' + const port = '234' + expect(() => toMultiaddr(ip, port)).to.throw('invalid ip') + }) + + it('throws for invalid port', () => { + const ip = '127.0.0.1' + const port = 'garbage' + expect(() => toMultiaddr(ip, port)).to.throw('invalid port') + }) +}) diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index 7d8f119..4f90413 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -1,7 +1,6 @@ /* eslint-env mocha */ 'use strict' -const pull = require('pull-stream') const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect @@ -9,122 +8,133 @@ chai.use(dirtyChai) const TCP = require('../src') const net = require('net') const multiaddr = require('multiaddr') +const pipe = require('it-pipe') +const { collect, map } = require('streaming-iterables') const isCI = process.env.CI +describe('construction', () => { + it('requires an upgrader', () => { + expect(() => new TCP()).to.throw() + }) +}) + describe('listen', () => { let tcp beforeEach(() => { - tcp = new TCP() + tcp = new TCP({ + upgrader: { + upgradeOutbound: maConn => maConn, + upgradeInbound: maConn => maConn + } + }) }) - it('close listener with connections, through timeout', (done) => { + it('close listener with connections, through timeout', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => { - pull(conn, conn) + pipe(conn, conn) }) - listener.listen(mh, () => { - const socket1 = net.connect(9090) - const socket2 = net.connect(9090) + await listener.listen(mh) - socket1.write('Some data that is never handled') - socket1.end() - socket1.on('error', () => {}) - socket2.on('error', () => {}) - socket1.on('connect', () => { - listener.close(done) + const socket1 = net.connect(9090) + const socket2 = net.connect(9090) + + socket1.write('Some data that is never handled') + socket1.end() + socket1.on('error', () => {}) + socket2.on('error', () => {}) + + await new Promise((resolve) => { + socket1.on('connect', async () => { + await listener.close({ timeout: 100 }) + resolve() }) }) }) - it('listen on port 0', (done) => { + it('listen on port 0', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('listen on IPv6 addr', (done) => { - if (isCI) { return done() } + it('listen on IPv6 addr', async () => { + if (isCI) { + return + } const mh = multiaddr('/ip6/::/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('listen on any Interface', (done) => { + it('listen on any Interface', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.close(done) - }) + await listener.listen(mh) + await listener.close() }) - it('getAddrs', (done) => { + it('getAddrs', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + + await listener.close() }) - it('getAddrs on port 0 listen', (done) => { + it('getAddrs on port 0 listen', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + + await listener.close() }) - it('getAddrs from listening on 0.0.0.0', (done) => { + it('getAddrs from listening on 0.0.0.0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length > 0).to.equal(true) - expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + + await listener.close() }) - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/0') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length > 0).to.equal(true) - expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length > 0).to.equal(true) + expect(multiaddrs[0].toString().indexOf('0.0.0.0')).to.equal(-1) + + await listener.close() }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs preserves IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) - listener.listen(mh, () => { - listener.getAddrs((err, multiaddrs) => { - expect(err).to.not.exist() - expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) - listener.close(done) - }) - }) + await listener.listen(mh) + + const multiaddrs = listener.getAddrs() + expect(multiaddrs.length).to.equal(1) + expect(multiaddrs[0]).to.deep.equal(mh) + + await listener.close() }) }) @@ -133,127 +143,114 @@ describe('dial', () => { let listener const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - beforeEach((done) => { - tcp = new TCP() + beforeEach(async () => { + tcp = new TCP({ + upgrader: { + upgradeOutbound: maConn => maConn, + upgradeInbound: maConn => maConn + } + }) listener = tcp.createListener((conn) => { - pull( + pipe( conn, - pull.map((x) => Buffer.from(x.toString() + '!')), + map((x) => Buffer.from(x.toString() + '!')), conn ) }) - listener.listen(ma, done) + await listener.listen(ma) }) - afterEach((done) => { - listener.close(done) - }) - - it('dial on IPv4', (done) => { - pull( - pull.values(['hey']), - tcp.dial(ma), - pull.collect((err, values) => { - expect(err).to.not.exist() - expect(values).to.eql([Buffer.from('hey!')]) - done() - }) - ) - }) + afterEach(() => listener.close()) - it('dial to non existent listener', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/8989') - pull( - tcp.dial(ma), - pull.onEnd((err) => { - expect(err).to.exist() - done() - }) + it('dial on IPv4', async () => { + const values = await pipe( + ['hey'], + await tcp.dial(ma), + collect ) + expect(values).to.eql([Buffer.from('hey!')]) }) - it('dial on IPv6', (done) => { - if (isCI) { return done() } + it('dial on IPv6', async () => { + if (isCI) { + return + } const ma = multiaddr('/ip6/::/tcp/9066') const listener = tcp.createListener((conn) => { - pull(conn, conn) + pipe(conn, conn) }) - listener.listen(ma, () => { - pull( - pull.values(['hey']), - tcp.dial(ma), - pull.collect((err, values) => { - expect(err).to.not.exist() + await listener.listen(ma) - expect(values).to.be.eql([Buffer.from('hey')]) + const values = await pipe( + ['hey'], + await tcp.dial(ma), + collect + ) + expect(values).to.be.eql([Buffer.from('hey')]) - listener.close(done) - }) - ) - }) + await listener.close() }) - it('dial and destroy on listener', (done) => { - let count = 0 - const closed = () => ++count === 2 ? finish() : null + it('dial and destroy on listener', async () => { + let handled + const handledPromise = new Promise((resolve) => { + handled = resolve + }) - const ma = multiaddr('/ip6/::/tcp/9067') + const ma = multiaddr('/ip6/::/tcp/0') - const listener = tcp.createListener((conn) => { - pull( - pull.empty(), - conn, - pull.onEnd(closed) + const listener = tcp.createListener(async (conn) => { + await pipe( + [], + conn ) + handled() }) - listener.listen(ma, () => { - pull(tcp.dial(ma), pull.onEnd(closed)) - }) + await listener.listen(ma) + const addrs = listener.getAddrs() + await pipe(await tcp.dial(addrs[0])) - function finish () { - listener.close(done) - } + await handledPromise + await listener.close() }) - it('dial and destroy on dialer', (done) => { - if (isCI) { return done() } + it('dial and destroy on dialer', async () => { + if (isCI) { + return + } - let count = 0 - const destroyed = () => ++count === 2 ? finish() : null + let handled + const handledPromise = new Promise((resolve) => { + handled = resolve + }) - const ma = multiaddr('/ip6/::/tcp/9068') + const ma = multiaddr('/ip6/::/tcp/0') - const listener = tcp.createListener((conn) => { - pull(conn, pull.onEnd(destroyed)) + const listener = tcp.createListener(async (conn) => { + // pull(conn, pull.onEnd(destroyed)) + await pipe(conn) + handled() }) - listener.listen(ma, () => { - pull( - pull.empty(), - tcp.dial(ma), - pull.onEnd(destroyed) - ) - }) + await listener.listen(ma) + const addrs = listener.getAddrs() + await pipe(await tcp.dial(addrs[0])) - function finish () { - listener.close(done) - } + await handledPromise + await listener.close() }) - it('dial on IPv4 with IPFS Id', (done) => { + it('dial on IPv4 with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = tcp.dial(ma) + const conn = await tcp.dial(ma) - pull( - pull.values(['hey']), + const res = await pipe( + ['hey'], conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([Buffer.from('hey!')]) - done() - }) + collect ) + expect(res).to.be.eql([Buffer.from('hey!')]) }) }) diff --git a/test/turbolence.spec.js b/test/turbolence.spec.js deleted file mode 100644 index e54bae0..0000000 --- a/test/turbolence.spec.js +++ /dev/null @@ -1,16 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -// const TCP = require('../src') - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - expect('ok').to.equal('ok') - }) - - it('listener - emits error on the other end is terminated abruptly', (done) => {}) -})