diff --git a/src/index.ts b/src/index.ts index f790055..066ee13 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,11 +3,11 @@ import * as mafmt from '@multiformats/mafmt' import errCode from 'err-code' import { logger } from '@libp2p/logger' import { toMultiaddrConnection } from './socket-to-conn.js' -import { createListener } from './listener.js' +import { TCPListener } from './listener.js' import { multiaddrToNetConfig } from './utils.js' import { AbortError } from '@libp2p/interfaces/errors' import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js' -import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport' +import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport' import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net' import type { Connection } from '@libp2p/interface-connection' @@ -155,8 +155,8 @@ export class TCP implements Transport { * anytime a new incoming Connection has been successfully upgraded via * `upgrader.upgradeInbound`. */ - createListener (options: TCPCreateListenerOptions) { - return createListener({ + createListener (options: TCPCreateListenerOptions): Listener { + return new TCPListener({ ...options, socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, socketCloseTimeout: this.opts.socketCloseTimeout @@ -166,7 +166,7 @@ export class TCP implements Transport { /** * Takes a list of `Multiaddr`s and returns only valid TCP addresses */ - filter (multiaddrs: Multiaddr[]) { + filter (multiaddrs: Multiaddr[]): Multiaddr[] { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter(ma => { diff --git a/src/listener.ts b/src/listener.ts index d850016..0bd2d74 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -8,17 +8,12 @@ import { } from './utils.js' import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection' -import type { Upgrader, Listener } from '@libp2p/interface-transport' -import type { Server } from 'net' +import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport' import type { Multiaddr } from '@multiformats/multiaddr' import type { TCPCreateListenerOptions } from './index.js' const log = logger('libp2p:tcp:listener') -interface ServerWithMultiaddrConnections extends Server { - __connections: MultiaddrConnection[] -} - /** * Attempts to close the given maConn. If a failure occurs, it will be logged */ @@ -37,20 +32,29 @@ interface Context extends TCPCreateListenerOptions { socketCloseTimeout?: number } -/** - * Create listener - */ -export function createListener (context: Context) { - const { - handler, upgrader, socketInactivityTimeout, socketCloseTimeout - } = context +type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null } + +export class TCPListener extends EventEmitter implements Listener { + private readonly server: net.Server + /** Keep track of open connections to destroy in case of timeout */ + private readonly connections = new Set() - context.keepAlive = context.keepAlive ?? true + private status: Status = { started: false } - let peerId: string | null - let listeningAddr: Multiaddr + constructor (private readonly context: Context) { + super() + + context.keepAlive = context.keepAlive ?? true + + this.server = net.createServer(context, this.onSocket.bind(this)) + + this.server + .on('listening', () => this.dispatchEvent(new CustomEvent('listening'))) + .on('error', err => this.dispatchEvent(new CustomEvent('error', { detail: err }))) + .on('close', () => this.dispatchEvent(new CustomEvent('close'))) + } - const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(context, socket => { + private onSocket (socket: net.Socket) { // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) @@ -59,9 +63,9 @@ export function createListener (context: Context) { let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr, - socketInactivityTimeout, - socketCloseTimeout + listeningAddr: this.status.started ? this.status.listeningAddr : undefined, + socketInactivityTimeout: this.context.socketInactivityTimeout, + socketCloseTimeout: this.context.socketCloseTimeout }) } catch (err) { log.error('inbound connection failed', err) @@ -70,16 +74,20 @@ export function createListener (context: Context) { log('new inbound connection %s', maConn.remoteAddr) try { - upgrader.upgradeInbound(maConn) + this.context.upgrader.upgradeInbound(maConn) .then((conn) => { log('inbound connection %s upgraded', maConn.remoteAddr) - trackConn(server, maConn, socket) + this.connections.add(maConn) - if (handler != null) { - handler(conn) + socket.once('close', () => { + this.connections.delete(maConn) + }) + + if (this.context.handler != null) { + this.context.handler(conn) } - listener.dispatchEvent(new CustomEvent('connection', { detail: conn })) + this.dispatchEvent(new CustomEvent('connection', { detail: conn })) }) .catch(async err => { log.error('inbound connection failed', err) @@ -97,85 +105,69 @@ export function createListener (context: Context) { log.error('closing inbound connection failed', err) }) } - }), - // Keep track of open connections to destroy in case of timeout - { __connections: [] }) + } - const listener: Listener = Object.assign(new EventEmitter(), { - getAddrs: () => { - let addrs: Multiaddr[] = [] - const address = server.address() + getAddrs () { + if (!this.status.started) { + return [] + } - if (address == null) { - return [] - } + let addrs: Multiaddr[] = [] + const address = this.server.address() + const { listeningAddr, peerId } = this.status - if (typeof address === 'string') { - addrs = [listeningAddr] - } else { - try { - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (listeningAddr.toString().startsWith('/ip4')) { - addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) - } else if (address.family === 'IPv6') { - addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) - } - } catch (err) { - log.error('could not turn %s:%s into multiaddr', address.address, address.port, err) + if (address == null) { + return [] + } + + if (typeof address === 'string') { + addrs = [listeningAddr] + } else { + try { + // Because TCP will only return the IPv6 version + // we need to capture from the passed multiaddr + if (listeningAddr.toString().startsWith('/ip4')) { + addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) + } else if (address.family === 'IPv6') { + addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) } + } catch (err) { + log.error('could not turn %s:%s into multiaddr', address.address, address.port, err) } + } - return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma) - }, - listen: async (ma: Multiaddr) => { - listeningAddr = ma - peerId = ma.getPeerId() - - if (peerId == null) { - listeningAddr = ma.decapsulateCode(CODE_P2P) - } + return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma) + } - return await new Promise((resolve, reject) => { - const options = multiaddrToNetConfig(listeningAddr) - server.listen(options, (err?: any) => { - if (err != null) { - return reject(err) - } - log('Listening on %s', server.address()) - resolve() - }) - }) - }, - close: async () => { - if (!server.listening) { - return - } + async listen (ma: Multiaddr) { + const peerId = ma.getPeerId() + const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma - await Promise.all( - server.__connections.map(async maConn => await attemptClose(maConn)) - ) + this.status = { started: true, listeningAddr, peerId } - await new Promise((resolve, reject) => { - server.close(err => (err != null) ? reject(err) : resolve()) + return await new Promise((resolve, reject) => { + const options = multiaddrToNetConfig(listeningAddr) + this.server.listen(options, (err?: any) => { + if (err != null) { + return reject(err) + } + log('Listening on %s', this.server.address()) + resolve() }) - } - }) - - server - .on('listening', () => listener.dispatchEvent(new CustomEvent('listening'))) - .on('error', err => listener.dispatchEvent(new CustomEvent('error', { detail: err }))) - .on('close', () => listener.dispatchEvent(new CustomEvent('close'))) + }) + } - return listener -} + async close () { + if (!this.server.listening) { + return + } -function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) { - server.__connections.push(maConn) + await Promise.all( + Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn)) + ) - const untrackConn = () => { - server.__connections = server.__connections.filter(c => c !== maConn) + await new Promise((resolve, reject) => { + this.server.close(err => (err != null) ? reject(err) : resolve()) + }) } - - socket.once('close', untrackConn) }