From 5c361befebeb3fffd7959dab980f8b068d653bec Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Mon, 18 Sep 2023 13:29:03 +0200 Subject: [PATCH] Fix the bug of listner intermediary state --- packages/transport-tcp/src/listener.ts | 78 +++++++++++++++++++------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index c7de579471..a92e944d77 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -55,8 +55,21 @@ export interface TCPListenerMetrics { events: CounterGroup } -type Status = { started: false } | { - started: true +enum TCPListenerStatusCode { + /** + * When server object is initialized but we don't know the listening address yet or + * the server object is stopped manually, can be resumed only by calling listen() + **/ + INERT = 'inert', + /* When listener is aware of the address but the server is not started listening */ + INITIALIZED = 'initializing', + LISTENING = 'listening', + /* During the connection limits */ + PAUSED = 'paused', +} + +type Status = { code: TCPListenerStatusCode.INERT } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -66,7 +79,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { started: false } + private status: Status = { code: TCPListenerStatusCode.INERT } private metrics?: TCPListenerMetrics private addr: string @@ -144,6 +157,8 @@ export class TCPListener extends EventEmitter implements Listene this.dispatchEvent(new CustomEvent('error', { detail: err })) }) .on('close', () => { + if(this.status.code === TCPListenerStatusCode.PAUSED) return + this.metrics?.status.update({ [this.addr]: SERVER_STATUS_DOWN }) @@ -152,6 +167,9 @@ export class TCPListener extends EventEmitter implements Listene } private onSocket (socket: net.Socket): void { + if(this.status.code === TCPListenerStatusCode.INERT) { + throw new Error('Server is is not listening yet') + } // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) @@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.started ? this.status.listeningAddr : undefined, + listeningAddr: this.status.code ? this.status.listeningAddr : undefined, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -191,7 +209,7 @@ export class TCPListener extends EventEmitter implements Listene // another process during the time the server if closed. In that case there's not much // we can do. netListen() will be called again every time a connection is dropped, which // acts as an eventual retry mechanism. onListenError allows the consumer act on this. - this.netListen().catch(e => { + this.resume().catch(e => { log.error('error attempting to listen server once connection count under limit', e) this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) }) @@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter implements Listene this.context.closeServerOnMaxConnections != null && this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove ) { - this.netClose() + this.pause(false).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) } this.dispatchEvent(new CustomEvent('connection', { detail: conn })) @@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (!this.status.started) { + if (this.status.code === TCPListenerStatusCode.INERT) { return [] } @@ -264,7 +284,7 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.started) { + if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) { throw Error('server is already listening') } @@ -273,13 +293,13 @@ export class TCPListener extends EventEmitter implements Listene const { backlog } = this.context this.status = { - started: true, + code: TCPListenerStatusCode.INITIALIZED, listeningAddr, peerId, netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) } - await this.netListen() + await this.resume() } async close (): Promise { @@ -287,12 +307,17 @@ export class TCPListener extends EventEmitter implements Listene Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) ) - // netClose already checks if server.listening - this.netClose() + await this.pause(true) } - private async netListen (): Promise { - if (!this.status.started || this.server.listening) { + /** + * Can resume a stopped or start an inert server + */ + private async resume (): Promise { + if ( + !(this.status.code === TCPListenerStatusCode.INITIALIZED || + this.status.code === TCPListenerStatusCode.PAUSED) || + this.server.listening) { return } @@ -303,12 +328,20 @@ export class TCPListener extends EventEmitter implements Listene this.server.once('error', reject) this.server.listen(netConfig, resolve) }) - + this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING } log('Listening on %s', this.server.address()) } - private netClose (): void { - if (!this.status.started || !this.server.listening) { + private async pause (permanent: boolean): Promise { + if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + this.status = { code: TCPListenerStatusCode.INERT } + return + } + + if ( + !(this.status.code === TCPListenerStatusCode.INITIALIZED || + this.status.code === TCPListenerStatusCode.LISTENING) || + !this.server.listening) { return } @@ -326,9 +359,12 @@ export class TCPListener extends EventEmitter implements Listene // Stops the server from accepting new connections and keeps existing connections. // 'close' event is emitted only emitted when all connections are ended. // The optional callback will be called once the 'close' event occurs. - // - // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary - // to pass a callback to close. - this.server.close() + + // We need to set this status before closing server, so other procedures are aware + // during the time the server is closing + this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + await new Promise((resolve, reject) => { + this.server.close( err => { err ? reject(err) : resolve() }) + }) } }