diff --git a/ee/apps/ddp-streamer/src/Server.ts b/ee/apps/ddp-streamer/src/Server.ts index c4c6a53c8e16..e3fdc3add509 100644 --- a/ee/apps/ddp-streamer/src/Server.ts +++ b/ee/apps/ddp-streamer/src/Server.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; -import type WebSocket from 'ws'; +import WebSocket from 'ws'; import ejson from 'ejson'; import { v1 as uuidv1 } from 'uuid'; import { MeteorService, isMeteorError, MeteorError } from '@rocket.chat/core-services'; @@ -53,6 +53,10 @@ export class Server extends EventEmitter { }; async call(client: Client, packet: IPacket): Promise { + // if client is not connected we don't need to do anything + if (client.ws.readyState !== WebSocket.OPEN) { + return; + } try { // if method was not defined on DDP Streamer we fall back to Meteor if (!this._methods.has(packet.method)) { @@ -86,6 +90,10 @@ export class Server extends EventEmitter { } async subscribe(client: Client, packet: IPacket): Promise { + // if client is not connected we don't need to do anything + if (client.ws.readyState !== WebSocket.OPEN) { + return; + } try { if (!this._subscriptions.has(packet.name)) { throw new MeteorError(404, `Subscription '${packet.name}' not found`); diff --git a/ee/apps/ddp-streamer/src/Streamer.ts b/ee/apps/ddp-streamer/src/Streamer.ts index fdde06e7a5d5..5a0ec6173a6f 100644 --- a/ee/apps/ddp-streamer/src/Streamer.ts +++ b/ee/apps/ddp-streamer/src/Streamer.ts @@ -57,11 +57,22 @@ export class Stream extends Streamer { }; for await (const { subscription } of subscriptions) { + // if the connection state is not open anymore, it somehow got to a weird state, + // we'll emit close so it can clean up the weird state, and so we stop emitting to it + if (subscription.client.ws.readyState !== WebSocket.OPEN) { + subscription.client.ws.emit('close'); + continue; + } + if (this.retransmitToSelf === false && origin && origin === subscription.connection) { continue; } - if (await this.isEmitAllowed(subscription, eventName, ...args)) { + if (!(await this.isEmitAllowed(subscription, eventName, ...args))) { + continue; + } + + try { await new Promise((resolve, reject) => { const frame = data[subscription.client.meteorClient ? 'meteor' : 'normal']; @@ -72,6 +83,16 @@ export class Stream extends Streamer { resolve(); }); }); + } catch (error: any) { + if (error.code === 'ERR_STREAM_DESTROYED') { + console.warn('Trying to send data to destroyed stream, closing connection.'); + + // if we still tried to send data to a destroyed stream, we'll try again to close the connection + if (subscription.client.ws.readyState !== WebSocket.OPEN) { + subscription.client.ws.emit('close'); + } + } + console.error('Error trying to send data to stream.', error); } } }