diff --git a/src/index.ts b/src/index.ts index ccaf2b1..50aed20 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ import net from 'net' import * as mafmt from '@multiformats/mafmt' import { logger } from '@libp2p/logger' import { toMultiaddrConnection } from './socket-to-conn.js' -import { TCPListener } from './listener.js' +import { CloseServerOnMaxConnectionsOpts, TCPListener } from './listener.js' import { multiaddrToNetConfig } from './utils.js' import { AbortError, CodeError } from '@libp2p/interfaces/errors' import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js' @@ -35,6 +35,12 @@ export interface TCPOptions { * https://nodejs.org/api/net.html#servermaxconnections */ maxConnections?: number + + /** + * Close server (stop listening for new connections) if connections exceed a limit. + * Open server (start listening for new connections) if connections fall below a limit. + */ + closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts } /** @@ -209,6 +215,7 @@ class TCP implements Transport { return new TCPListener({ ...options, maxConnections: this.opts.maxConnections, + closeServerOnMaxConnections: this.opts.closeServerOnMaxConnections, socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, socketCloseTimeout: this.opts.socketCloseTimeout, metrics: this.components.metrics diff --git a/src/listener.ts b/src/listener.ts index 1b13385..7ca3ff3 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js' import { CODE_P2P } from './constants.js' import { getMultiaddrs, - multiaddrToNetConfig + multiaddrToNetConfig, + NetConfig } from './utils.js' import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection' @@ -26,6 +27,14 @@ async function attemptClose (maConn: MultiaddrConnection) { } } +export interface CloseServerOnMaxConnectionsOpts { + /** Server listens once connection count is less than `listenBelow` */ + listenBelow: number + /** Close server once connection count is greater than or equal to `closeAbove` */ + closeAbove: number + onListenError?: (err: Error) => void +} + interface Context extends TCPCreateListenerOptions { handler?: (conn: Connection) => void upgrader: Upgrader @@ -33,6 +42,7 @@ interface Context extends TCPCreateListenerOptions { socketCloseTimeout?: number maxConnections?: number metrics?: Metrics + closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts } const SERVER_STATUS_UP = 1 @@ -44,7 +54,12 @@ export interface TCPListenerMetrics { events: CounterGroup } -type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null } +type Status = {started: false} | { + started: true + listeningAddr: Multiaddr + peerId: string | null + netConfig: NetConfig +} export class TCPListener extends EventEmitter implements Listener { private readonly server: net.Server @@ -69,6 +84,13 @@ export class TCPListener extends EventEmitter implements Listene this.server.maxConnections = context.maxConnections } + if (context.closeServerOnMaxConnections != null) { + // Sanity check options + if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) { + throw Error('closeAbove must be >= listenBelow') + } + } + this.server .on('listening', () => { if (context.metrics != null) { @@ -159,12 +181,33 @@ export class TCPListener extends EventEmitter implements Listene socket.once('close', () => { this.connections.delete(maConn) + + if ( + this.context.closeServerOnMaxConnections != null && + this.connections.size < this.context.closeServerOnMaxConnections.listenBelow + ) { + // The most likely case of error is if the port taken by this application is binded by + // another process during the time the server if closed. In that case there's not much + // we can do. netListen() will be called again every time a connection is dropped, which + // acts as an eventual retry mechanism. onListenError allows the consumer act on this. + this.netListen().catch(e => { + log.error('error attempting to listen server once connection count under limit', e) + this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) + }) + } }) if (this.context.handler != null) { this.context.handler(conn) } + if ( + this.context.closeServerOnMaxConnections != null && + this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove + ) { + this.netClose() + } + this.dispatchEvent(new CustomEvent('connection', { detail: conn })) }) .catch(async err => { @@ -220,34 +263,70 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr) { + if (this.status.started) { + throw Error('server is already listening') + } + const peerId = ma.getPeerId() const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma - this.status = { started: true, listeningAddr, peerId } + this.status = { + started: true, + listeningAddr, + peerId, + netConfig: multiaddrToNetConfig(listeningAddr) + } - return await new Promise((resolve, reject) => { - const options = multiaddrToNetConfig(listeningAddr) - this.server.on('error', (err) => { - reject(err) - }) - this.server.listen(options, () => { - log('Listening on %s', this.server.address()) - resolve() - }) - }) + await this.netListen() } async close () { - if (!this.server.listening) { - return - } - await Promise.all( Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn)) ) + // netClose already checks if server.listening + this.netClose() + } + + private async netListen (): Promise { + if (!this.status.started || this.server.listening) { + return + } + + const netConfig = this.status.netConfig + await new Promise((resolve, reject) => { - this.server.close(err => (err != null) ? reject(err) : resolve()) + // NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error' + this.server.once('error', reject) + this.server.listen(netConfig, resolve) }) + + log('Listening on %s', this.server.address()) + } + + private netClose (): void { + if (!this.status.started || !this.server.listening) { + return + } + + log('Closing server on %s', this.server.address()) + + // NodeJS implementation tracks listening status with `this._handle` property. + // - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown + // - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675 + // - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN + // + // NOTE: Both listen and close are technically not async actions, so it's not necessary to track + // states 'pending-close' or 'pending-listen' + + // From docs https://nodejs.org/api/net.html#serverclosecallback + // Stops the server from accepting new connections and keeps existing connections. + // 'close' event is emitted only emitted when all connections are ended. + // The optional callback will be called once the 'close' event occurs. + // + // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary + // to pass a callback to close. + this.server.close() } } diff --git a/src/utils.ts b/src/utils.ts index dcdad4a..502c9b1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -6,7 +6,9 @@ import path from 'path' const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } -export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) { +export type NetConfig = ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) + +export function multiaddrToNetConfig (addr: Multiaddr): NetConfig { const listenPath = addr.getPath() // unix socket listening diff --git a/test/max-connections-close.spec.ts b/test/max-connections-close.spec.ts new file mode 100644 index 0000000..5ad0c26 --- /dev/null +++ b/test/max-connections-close.spec.ts @@ -0,0 +1,118 @@ +import net from 'node:net' +import { promisify } from 'util' +import { expect } from 'aegir/chai' +import { mockUpgrader } from '@libp2p/interface-mocks' +import { multiaddr } from '@multiformats/multiaddr' +import { tcp } from '../src/index.js' +import type { TCPListener } from '../src/listener.js' + +describe('close server on maxConnections', () => { + const afterEachCallbacks: Array<() => Promise | any> = [] + afterEach(async () => { + await Promise.all(afterEachCallbacks.map(fn => fn())) + afterEachCallbacks.length = 0 + }) + + it('reject dial of connection above closeAbove', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const seenRemoteConnections = new Set() + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader() + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + + listener.addEventListener('connection', (conn) => { + seenRemoteConnections.add(conn.detail.remoteAddr.toString()) + }) + + function createSocket (): net.Socket { + const socket = net.connect({ port }) + + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.unshift(async () => { + if (!socket.destroyed) { + socket.destroy() + await new Promise((resolve) => socket.on('close', resolve)) + } + }) + + return socket + } + + async function assertConnectedSocket (i: number) { + const socket = createSocket() + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + resolve() + }) + socket.once('error', (err) => { + err.message = `Socket[${i}] ${err.message}` + reject(err) + }) + }) + + return socket + } + + async function assertRefusedSocket (i: number) { + const socket = createSocket() + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + reject(Error(`Socket[${i}] connected but was expected to reject`)) + }) + socket.once('error', (err) => { + if (err.message.includes('ECONNREFUSED')) { + resolve() + } else { + err.message = `Socket[${i}] unexpected error ${err.message}` + reject(err) + } + }) + }) + } + + async function assertServerConnections (connections: number) { + // Expect server connections but allow time for sockets to connect or disconnect + for (let i = 0; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/dot-notation + if (listener['connections'].size === connections) { + return + } else { + await promisify(setTimeout)(10) + } + } + // eslint-disable-next-line @typescript-eslint/dot-notation + expect(listener['connections'].size).equals(connections, 'Wrong server connections') + } + + const socket1 = await assertConnectedSocket(1) + const socket2 = await assertConnectedSocket(2) + const socket3 = await assertConnectedSocket(3) + await assertServerConnections(3) + // Limit reached, server should be closed here + await assertRefusedSocket(4) + await assertRefusedSocket(5) + // Destroy sockets to be have connections < listenBelow + socket1.destroy() + socket2.destroy() + await assertServerConnections(1) + // Attempt to connect more sockets + const socket6 = await assertConnectedSocket(6) + const socket7 = await assertConnectedSocket(7) + await assertServerConnections(3) + // Limit reached, server should be closed here + await assertRefusedSocket(8) + + expect(socket3.destroyed).equals(false, 'socket3 must not destroyed') + expect(socket6.destroyed).equals(false, 'socket6 must not destroyed') + expect(socket7.destroyed).equals(false, 'socket7 must not destroyed') + }) +})