Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Add Typescript support #145

Merged
merged 8 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = {
build: {
config: {
platform: 'node'
}
}
}
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: npm install
- run: npx aegir lint
- run: npm run lint
- run: npx aegir dep-check -- -i wrtc -i electron-webrtc
- run: npm run build
test-node:
needs: check
runs-on: ${{ matrix.os }}
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ package-lock.json
coverage
.nyc_output
docs

dist
13 changes: 8 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
"main": "src/index.js",
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"release": "aegir release -t node --no-build",
"release-minor": "aegir release -t node --type minor --no-build",
"release-major": "aegir-release -t node --type major --no-build",
"release": "aegir release -t node",
"release-minor": "aegir release -t node --type minor",
"release-major": "aegir-release -t node --type major",
"coverage": "nyc --reporter=text --reporter=lcov npm run test:node"
},
"pre-push": [
Expand All @@ -37,10 +38,12 @@
"engines": {
"node": ">=14.0.0"
},
"types": "dist/src/index.d.ts",
"devDependencies": {
"aegir": "^33.0.0",
"@types/debug": "^4.1.5",
"aegir": "^33.2.0",
"it-pipe": "^1.1.0",
"libp2p-interfaces": "^0.9.0",
"libp2p-interfaces": "^0.11.0",
"sinon": "^10.0.1",
"streaming-iterables": "^5.0.2"
},
Expand Down
29 changes: 19 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const net = require('net')
const mafmt = require('mafmt')
// Missing Type
// @ts-ignore
const withIs = require('class-is')
const errCode = require('err-code')
const log = require('debug')('libp2p:tcp')
Expand All @@ -14,6 +16,9 @@ const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
yusefnapora marked this conversation as resolved.
Show resolved Hide resolved
* @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader
* @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener
* @typedef {import('net').Socket} Socket
*/

class TCP {
Expand All @@ -33,8 +38,8 @@ class TCP {
* @async
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal - Used to abort dial requests
* @returns {Connection} An upgraded Connection
* @param {AbortSignal} [options.signal] - Used to abort dial requests
* @returns {Promise<Connection>} An upgraded Connection
*/
async dial (ma, options) {
options = options || {}
Expand All @@ -50,7 +55,7 @@ class TCP {
* @private
* @param {Multiaddr} ma
* @param {object} options
* @param {AbortSignal} options.signal - Used to abort dial requests
* @param {AbortSignal} [options.signal] - Used to abort dial requests
* @returns {Promise<Socket>} Resolves a TCP Socket
*/
_connect (ma, options = {}) {
Expand All @@ -65,13 +70,13 @@ class TCP {
log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)

const onError = err => {
const onError = /** @param {Error} err */ err => {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
done(err)
}

const onTimeout = () => {
log('connnection timeout %s:%s', cOpts.host, cOpts.port)
log('connection timeout %s:%s', cOpts.host, cOpts.port)
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err)
Expand All @@ -88,7 +93,7 @@ class TCP {
done(new AbortError())
}

const done = err => {
const done = /** @param {Error} [err] */ err => {
rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect)
Expand All @@ -110,17 +115,21 @@ class TCP {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*
* @param {*} [options]
* @param {function(Connection)} handler
* @param {* | function(Connection):void} options
* @param {function(Connection):void} [handler]
* @returns {Listener} A TCP listener
*/
createListener (options, handler) {
let listenerHandler

if (typeof options === 'function') {
handler = options
listenerHandler = options
options = {}
} else {
listenerHandler = handler
}
options = options || {}
return createListener({ handler, upgrader: this._upgrader }, options)
return createListener({ handler: listenerHandler, upgrader: this._upgrader }, options)
}

/**
Expand Down
149 changes: 91 additions & 58 deletions src/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,30 @@
const net = require('net')
const EventEmitter = require('events')
const debug = require('debug')
const log = debug('libp2p:tcp:listener')
log.error = debug('libp2p:tcp:listener:error')

const log = Object.assign(
debug('libp2p:tcp:listener'),
{ error: debug('libp2p:tcp:listener:error') })
const toConnection = require('./socket-to-conn')
const { CODE_P2P } = require('./constants')
const {
getMultiaddrs,
multiaddrToNetConfig
} = require('./utils')

/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('libp2p-interfaces/src/transport/types').Upgrader} Upgrader
* @typedef {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} MultiaddrConnection
* @typedef {import('libp2p-interfaces/src/transport/types').Listener} Listener
* @typedef {import('net').Server & {__connections: MultiaddrConnection[]}} Server
*/

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged.
*
* @private
* @param {import('libp2p-interfaces/src/transport/types').MultiaddrConnection} maConn
* @param {MultiaddrConnection} maConn
yusefnapora marked this conversation as resolved.
Show resolved Hide resolved
*/
async function attemptClose (maConn) {
try {
Expand All @@ -27,13 +36,80 @@ async function attemptClose (maConn) {
}
}

/**
* Create listener
*
* @param {object} context
* @param {function(Connection):void} context.handler
* @param {Upgrader} context.upgrader
* @param {*} options
* @returns {Listener}
*/
module.exports = ({ handler, upgrader }, options) => {
const listener = new EventEmitter()
/** @type {Server} */
// eslint-disable-next-line prefer-const
let server

/** @type {string | null} */
let peerId

/** @type {Multiaddr} */
let listeningAddr

const listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
/** @type {Multiaddr[]} */
let addrs = []
/** @type {import('net').AddressInfo} */
// @ts-ignore
const address = server.address()

if (!address) {
throw new Error('Listener is not ready yet')
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (/** @type {Multiaddr} */ ma) => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}

return new Promise((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (/** @type {any} */ err) => {
if (err) return reject(err)
log('Listening on %s', server.address())
resolve(undefined)
})
})
},
close: async () => {
if (!server.listening) return

return new Promise((resolve, reject) => {
server.__connections.forEach(maConn => attemptClose(maConn))
server.close(err => err ? reject(err) : resolve(undefined))
})
}
})

const server = net.createServer(async socket => {
server = Object.assign(net.createServer(async socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => log('socket error', err))

/** @type {MultiaddrConnection} */
let maConn
let conn
try {
Expand All @@ -42,6 +118,7 @@ module.exports = ({ handler, upgrader }, options) => {
conn = await upgrader.upgradeInbound(maConn)
} catch (err) {
log.error('inbound connection failed', err)
// @ts-ignore
return attemptClose(maConn)
}

Expand All @@ -51,73 +128,29 @@ module.exports = ({ handler, upgrader }, options) => {

if (handler) handler(conn)
listener.emit('connection', conn)
})
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })

server
.on('listening', () => listener.emit('listening'))
.on('error', err => listener.emit('error', err))
.on('close', () => listener.emit('close'))

// Keep track of open connections to destroy in case of timeout
server.__connections = []

listener.close = () => {
if (!server.listening) return

return new Promise((resolve, reject) => {
server.__connections.forEach(maConn => attemptClose(maConn))
server.close(err => err ? reject(err) : resolve())
})
}

let peerId, listeningAddr

listener.listen = ma => {
listeningAddr = ma
peerId = ma.getPeerId()

if (peerId) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}

return new Promise((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, err => {
if (err) return reject(err)
log('Listening on %s', server.address())
resolve()
})
})
}

listener.getAddrs = () => {
let addrs = []
const address = server.address()

if (!address) {
throw new Error('Listener is not ready yet')
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
}

return listener
}

/**
* @param {Server} server
* @param {MultiaddrConnection} maConn
*/
function trackConn (server, maConn) {
server.__connections.push(maConn)

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
}

// @ts-ignore
maConn.conn.once('close', untrackConn)
}
Loading