diff --git a/package.json b/package.json index eaf698c..0daec82 100644 --- a/package.json +++ b/package.json @@ -197,6 +197,7 @@ "ws": "^8.4.0" }, "devDependencies": { + "@types/sinon": "^17.0.3", "aegir": "^44.0.1", "delay": "^6.0.0", "it-all": "^3.0.1", @@ -207,6 +208,7 @@ "it-ndjson": "^1.0.0", "it-pipe": "^3.0.1", "p-defer": "^4.0.0", + "sinon": "^18.0.0", "wherearewe": "^2.0.1", "wsurl": "^1.0.0" }, diff --git a/src/index.ts b/src/index.ts index 8878ef8..4525426 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export { default as duplex } from './duplex.js' export { default as source } from './source.js' export { default as sink } from './sink.js' -export { createServer } from './server.js' +export { createServer, type WebSocketServer } from './server.js' export { connect } from './client.js' diff --git a/src/server.ts b/src/server.ts index 359bb39..36e4f02 100644 --- a/src/server.ts +++ b/src/server.ts @@ -6,12 +6,23 @@ import duplex, { type DuplexWebSocket } from './duplex.js' import type WebSocket from './web-socket.js' import type { VerifyClientCallbackSync, VerifyClientCallbackAsync, AddressInfo } from 'ws' +export interface ClientWebSocket extends WebSocket { + alive?: boolean +} + export interface ServerOptions { key?: string cert?: string server?: http.Server | https.Server verifyClient?: VerifyClientCallbackAsync | VerifyClientCallbackSync onConnection?(connection: DuplexWebSocket): void + + /** + * If specified, send a PING to every connected client, if + * they do not respond with a PONG before the next interval, + * terminate the connection + */ + heartbeatMs?: number } export interface WebSocketServer extends EventEmitter { @@ -23,6 +34,8 @@ export interface WebSocketServer extends EventEmitter { class Server extends EventEmitter { private readonly server: http.Server | https.Server private readonly wsServer: WSServer + private readonly heartbeatMs?: number + private heartbeatInterval?: ReturnType constructor (server: http.Server | https.Server, opts?: ServerOptions) { super() @@ -34,9 +47,26 @@ class Server extends EventEmitter { verifyClient: opts.verifyClient }) this.wsServer.on('connection', this.onWsServerConnection.bind(this)) + this.heartbeatMs = opts?.heartbeatMs } async listen (addrInfo: { port: number } | number): Promise { + if (this.heartbeatMs != null) { + this.heartbeatInterval = setInterval(() => { + this.wsServer.clients.forEach((client: ClientWebSocket) => { + // the client did not send a pong since the last heartbeat so + // terminate the connection + if (client.alive === false) { + client.terminate() + return + } + + client.alive = false + client.ping() + }) + }, this.heartbeatMs) + } + return new Promise((resolve, reject) => { this.wsServer.once('error', (e) => { reject(e) }) this.wsServer.once('listening', () => { resolve(this) }) @@ -45,6 +75,10 @@ class Server extends EventEmitter { } async close (): Promise { + if (this.heartbeatInterval != null) { + clearInterval(this.heartbeatInterval) + } + await new Promise((resolve, reject) => { this.server.close((err) => { if (err != null) { @@ -60,7 +94,7 @@ class Server extends EventEmitter { return this.server.address() } - onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void { + onWsServerConnection (socket: ClientWebSocket, req: http.IncomingMessage): void { let addr: string | AddressInfo | null try { @@ -83,6 +117,10 @@ class Server extends EventEmitter { return } + socket.on('pong', () => { + socket.alive = true + }) + const stream: DuplexWebSocket = { ...duplex(socket, { remoteAddress: req.socket.remoteAddress, @@ -100,7 +138,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer { opts = opts ?? {} const server = opts.server ?? (opts.key != null && opts.cert != null ? https.createServer(opts) : http.createServer()) - const wss = new Server(server) + const wss = new Server(server, opts) if (opts.onConnection != null) { wss.on('connection', opts.onConnection) diff --git a/test/server-ping.spec.ts b/test/server-ping.spec.ts new file mode 100644 index 0000000..78c43ad --- /dev/null +++ b/test/server-ping.spec.ts @@ -0,0 +1,56 @@ +import { expect } from 'aegir/chai' +import delay from 'delay' +import Sinon from 'sinon' +import { isNode, isElectronMain } from 'wherearewe' +import * as WS from '../src/index.js' +import WebSocket from '../src/web-socket.js' + +describe('ping', () => { + if (!(isNode || isElectronMain)) { + return + } + + let server: WS.WebSocketServer + let client: WebSocket + + afterEach(async () => { + if (client != null) { + client.close() + } + + if (server != null) { + await server.close() + } + }) + + it('server should ping connected clients', async () => { + server = WS.createServer({ + heartbeatMs: 10 + }) + await server.listen(55214) + + client = new WebSocket('http://127.0.0.1:55214') + const pongSpy = Sinon.spy(client, 'pong') + + await delay(200) + + expect(client).to.have.property('readyState', WebSocket.OPEN) + expect(pongSpy).to.have.property('called', true) + }) + + it('server should disconnected unresponsive clients', async () => { + server = WS.createServer({ + heartbeatMs: 10 + }) + await server.listen(55214) + + client = new WebSocket('http://127.0.0.1:55214') + + // make sure the client will not respond to a pong + client.pong = () => {} + + await delay(200) + + expect(client).to.have.property('readyState', WebSocket.CLOSED) + }) +})