diff --git a/.travis.yml b/.travis.yml index 5102ee5..99fe0ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,32 +1,33 @@ -# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -sudo: false language: node_js +cache: npm -matrix: - include: - - node_js: 6 - env: CXX=g++-4.8 - - node_js: 8 - env: CXX=g++-4.8 - # - node_js: stable - # env: CXX=g++-4.8 +stages: + - check + - test + - cov + +node_js: + - '10' -script: - - npm run lint - - npm run test - - npm run coverage +os: + - linux + - osx -before_script: - - export DISPLAY=:99.0 - - sh -e /etc/init.d/xvfb start +script: npx nyc -s npm run test:node -- --bail +after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov + +jobs: + include: + - os: windows + filter_secrets: false + cache: false -after_success: - - npm run coverage-publish + - stage: check + script: + - npx aegir build --bundlesize + - npx aegir commitlint --travis + - npx aegir dep-check + - npm run lint -addons: - firefox: 'latest' - apt: - sources: - - ubuntu-toolchain-r-test - packages: - - g++-4.8 +notifications: + email: false diff --git a/package.json b/package.json index a07eaf3..146bea1 100644 --- a/package.json +++ b/package.json @@ -6,12 +6,15 @@ "main": "src/index.js", "scripts": { "lint": "aegir lint", - "test": "aegir test -t node -f test/**/*.js", - "release": "aegir release -t node --no-build", - "release-minor": "aegir release -t node --type minor --no-build", - "release-major": "aegir-release -t node --type major --no-build", + "test": "aegir test -t node", + "test:node": "aegir test -t node", + "build": "aegir build", + "docs": "aegir docs", + "release": "aegir release --docs -t node", + "release-minor": "aegir release --type minor --docs -t node", + "release-major": "aegir release --type major --docs -t node", "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage-publish": "aegir-coverage publish" }, "pre-push": [ "lint", @@ -34,7 +37,7 @@ "npm": ">=3.0.0" }, "devDependencies": { - "aegir": "^15.1.0", + "aegir": "^18.1.1", "chai": "^4.1.2", "dirty-chai": "^2.0.1", "interface-transport": "~0.3.6", @@ -42,16 +45,16 @@ "pull-stream": "^3.6.9" }, "dependencies": { + "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^3.1.0", + "err-code": "^1.1.2", "interface-connection": "~0.3.2", "ip-address": "^5.8.9", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.9", "mafmt": "^6.0.2", - "multiaddr": "^5.0.0", - "once": "^1.4.0", - "stream-to-pull-stream": "^1.7.2" + "multiaddr": "^5.0.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..f1cfa6f --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,80 @@ +'use strict' + +const { Connection } = require('interface-connection') +const withIs = require('class-is') +const toPull = require('async-iterator-to-pull-stream') +const error = require('pull-stream/sources/error') +const drain = require('pull-stream/sinks/drain') +const TCP = require('./') +const noop = () => {} + +function callbackify (fn) { + return async function (...args) { + let cb = args.pop() + if (typeof cb !== 'function') { + args.push(cb) + cb = noop + } + let res + try { + res = await fn(...args) + } catch (err) { + return cb(err) + } + cb(null, res) + } +} + +// Legacy adapter to old transport & connection interface +class TcpAdapter extends TCP { + dial (ma, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + callback = callback || noop + + const conn = new Connection() + + super.dial(ma, options) + .then(socket => { + conn.setInnerConn(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + callback(null, conn) + }) + .catch(err => { + conn.setInnerConn({ sink: drain(), source: error(err) }) + callback(err) + }) + + return conn + } + + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + const server = super.createListener(options, socket => { + const conn = new Connection(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + handler(conn) + }) + + const proxy = { + listen: callbackify(server.listen.bind(server)), + close: callbackify(server.close.bind(server)), + getAddrs: callbackify(server.getAddrs.bind(server)), + getObservedAddrs: callbackify(() => server.getObservedAddrs()) + } + + return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) + } +} +module.exports = withIs(TcpAdapter, { + className: 'TCP', + symbolName: '@libp2p/js-libp2p-tcp/tcp' +}) diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..3e5cb6e --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// IPFS multi-address code +module.exports.IPFS_MA_CODE = 421 + +// Time to wait for a connection to close gracefully before destroying it +// manually +module.exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index b04c223..51a8706 100644 --- a/src/index.js +++ b/src/index.js @@ -1,55 +1,61 @@ 'use strict' const net = require('net') -const toPull = require('stream-to-pull-stream') const mafmt = require('mafmt') const withIs = require('class-is') const includes = require('lodash.includes') const isFunction = require('lodash.isfunction') -const Connection = require('interface-connection').Connection -const once = require('once') +const errcode = require('err-code') const debug = require('debug') const log = debug('libp2p:tcp:dial') +const Libp2pSocket = require('./socket') const createListener = require('./listener') function noop () {} class TCP { - dial (ma, options, callback) { - if (isFunction(options)) { - callback = options - options = {} - } - - callback = once(callback || noop) - + async dial (ma, options) { const cOpts = ma.toOptions() - log('Connecting to %s %s', cOpts.port, cOpts.host) + log('Connecting to %s:%s', cOpts.host, cOpts.port) - const rawSocket = net.connect(cOpts) + const rawSocket = await this._connect(cOpts) + return new Libp2pSocket(rawSocket, ma, options) + } - rawSocket.once('timeout', () => { - log('timeout') - rawSocket.emit('error', new Error('Timeout')) - }) + _connect (cOpts) { + return new Promise((resolve, reject) => { + const start = Date.now() + const rawSocket = net.connect(cOpts) - rawSocket.once('error', callback) + const onError = (err) => { + const msg = `Error connecting to ${cOpts.host}:${cOpts.port}: ${err.message}` + done(errcode(msg, err.code)) + } - rawSocket.once('connect', () => { - rawSocket.removeListener('error', callback) - callback() - }) + const onTimeout = () => { + log('Timeout connecting to %s:%s', cOpts.host, cOpts.port) + const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT') + rawSocket.emit('error', err) + } - const socket = toPull.duplex(rawSocket) + const onConnect = () => { + log('Connected to %s:%s', cOpts.host, cOpts.port) + done(null, rawSocket) + } - const conn = new Connection(socket) + const done = (err, res) => { + rawSocket.removeListener('error', onError) + rawSocket.removeListener('timeout', onTimeout) + rawSocket.removeListener('connect', onConnect) - conn.getObservedAddrs = (callback) => { - return callback(null, [ma]) - } + err ? reject(err) : resolve(res) + } - return conn + rawSocket.once('error', onError) + rawSocket.once('timeout', onTimeout) + rawSocket.once('connect', onConnect) + }) } createListener (options, handler) { @@ -59,7 +65,6 @@ class TCP { } handler = handler || noop - return createListener(handler) } diff --git a/src/listener.js b/src/listener.js index 30b3076..5c2c144 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,19 +1,16 @@ 'use strict' const multiaddr = require('multiaddr') -const Connection = require('interface-connection').Connection const os = require('os') const includes = require('lodash.includes') const net = require('net') -const toPull = require('stream-to-pull-stream') const EventEmitter = require('events').EventEmitter const debug = require('debug') const log = debug('libp2p:tcp:listen') +const Libp2pSocket = require('./socket') const getMultiaddr = require('./get-multiaddr') - -const IPFS_CODE = 421 -const CLOSE_TIMEOUT = 2000 +const c = require('./constants') function noop () {} @@ -21,7 +18,7 @@ module.exports = (handler) => { const listener = new EventEmitter() const server = net.createServer((socket) => { - // Avoid uncaught errors cause by unstable connections + // Avoid uncaught errors caused by unstable connections socket.on('error', noop) const addr = getMultiaddr(socket) @@ -36,17 +33,11 @@ module.exports = (handler) => { log('new connection', addr.toString()) - const s = toPull.duplex(socket) - - s.getObservedAddrs = (cb) => { - cb(null, [addr]) - } - + const s = new Libp2pSocket(socket) trackSocket(server, socket) - const conn = new Connection(s) - handler(conn) - listener.emit('connection', conn) + handler && handler(s) + listener.emit('connection', s) }) server.on('listening', () => listener.emit('listening')) @@ -56,33 +47,31 @@ module.exports = (handler) => { // Keep track of open connections to destroy in case of timeout server.__connections = {} - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - callback = callback || noop - options = options || {} - - const timeout = setTimeout(() => { - log('unable to close graciously, destroying conns') - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() - }) - }, options.timeout || CLOSE_TIMEOUT) + listener.close = (options = {}) => { + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to stop the server. If it takes longer than the timeout, + // destroy all the underlying sockets manually. + const timeout = setTimeout(() => { + log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) + Object.keys(server.__connections).forEach((key) => { + log('destroying %s', key) + server.__connections[key].destroy() + }) + resolve() + }, options.timeout || c.CLOSE_TIMEOUT) - server.close(callback) + server.once('close', () => clearTimeout(timeout)) - server.once('close', () => { - clearTimeout(timeout) + server.close((err) => err ? reject(err) : resolve()) }) } let ipfsId let listeningAddr - listener.listen = (ma, callback) => { + listener.listen = (ma) => { listeningAddr = ma if (includes(ma.protoNames(), 'ipfs')) { ipfsId = getIpfsId(ma) @@ -90,16 +79,24 @@ module.exports = (handler) => { } const lOpts = listeningAddr.toOptions() - log('Listening on %s %s', lOpts.port, lOpts.host) - return server.listen(lOpts.port, lOpts.host, callback) + return new Promise((resolve, reject) => { + server.listen(lOpts.port, lOpts.host, (err) => { + if (err) { + return reject(err) + } + + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) + }) } - listener.getAddrs = (callback) => { + listener.getAddrs = () => { const multiaddrs = [] const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } // Because TCP will only return the IPv6 version @@ -134,7 +131,7 @@ module.exports = (handler) => { multiaddrs.push(ma) } - callback(null, multiaddrs) + return multiaddrs } return listener @@ -142,7 +139,7 @@ module.exports = (handler) => { function getIpfsId (ma) { return ma.stringTuples().filter((tuple) => { - return tuple[0] === IPFS_CODE + return tuple[0] === c.IPFS_MA_CODE })[0][1] } @@ -150,7 +147,7 @@ function trackSocket (server, socket) { const key = `${socket.remoteAddress}:${socket.remotePort}` server.__connections[key] = socket - socket.on('close', () => { + socket.once('close', () => { delete server.__connections[key] }) } diff --git a/src/socket.js b/src/socket.js new file mode 100644 index 0000000..345484e --- /dev/null +++ b/src/socket.js @@ -0,0 +1,65 @@ +'use strict' + +const debug = require('debug') +const log = debug('libp2p:tcp:socket') + +const c = require('./constants') + +class Libp2pSocket { + constructor (rawSocket, ma, opts) { + this._rawSocket = rawSocket + this._ma = ma + + this.sink = this._sink(opts) + this.source = rawSocket + } + + _sink (opts = {}) { + // By default, close when the source is exhausted + const closeOnEnd = opts.closeOnEnd !== false + return (source) => this._write(source, closeOnEnd) + } + + async _write (source, closeOnEnd) { + for await (const data of source) { + const flushed = this._rawSocket.write(data) + if (!flushed) { + await new Promise((resolve) => this._rawSocket.once('drain', resolve)) + } + } + + if (closeOnEnd) { + await this.close() + } + } + + close (opts = {}) { + if (this._rawSocket.pending || this._rawSocket.destroyed) { + return + } + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const cOpts = this._ma.toOptions() + log('Timeout closing socket to %s:%s after %dms, destroying it manually', + cOpts.host, cOpts.port, Date.now() - start) + this._rawSocket.destroy() + resolve() + }, opts.timeout || c.CLOSE_TIMEOUT) + + this._rawSocket.once('close', () => clearTimeout(timeout)) + + this._rawSocket.end((err) => err ? reject(err) : resolve()) + }) + } + + getObservedAddrs () { + return [this._ma] + } +} + +module.exports = Libp2pSocket diff --git a/test/compliance.spec.js b/test/compliance.spec.js index 1339086..ba6a3d5 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -3,7 +3,7 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') -const TCP = require('../src') +const TCP = require('../src/adapter') describe('interface-transport compliance', () => { tests({ diff --git a/test/connection-wrap.spec.js b/test/connection-wrap.spec.js index ff141fb..5403bae 100644 --- a/test/connection-wrap.spec.js +++ b/test/connection-wrap.spec.js @@ -6,7 +6,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const TCP = require('../src') +const TCP = require('../src/adapter') const multiaddr = require('multiaddr') const Connection = require('interface-connection').Connection diff --git a/test/connection.spec.js b/test/connection.spec.js index 5100cb1..bef1964 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -6,7 +6,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const TCP = require('../src') +const TCP = require('../src/adapter') const multiaddr = require('multiaddr') describe('valid Connection', () => { diff --git a/test/listen-dial.js b/test/listen-dial.js index 7d8f119..7b7bf4c 100644 --- a/test/listen-dial.js +++ b/test/listen-dial.js @@ -6,7 +6,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) -const TCP = require('../src') +const TCP = require('../src/adapter') const net = require('net') const multiaddr = require('multiaddr') const isCI = process.env.CI