Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX][ENTERPRISE] DDP streamer sending data to destroyed streams #27929

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ee/apps/ddp-streamer/src/Server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';

import type WebSocket from 'ws';
import WebSocket from 'ws';
import ejson from 'ejson';
import { v1 as uuidv1 } from 'uuid';
import { MeteorService, isMeteorError, MeteorError } from '@rocket.chat/core-services';
Expand Down Expand Up @@ -53,6 +53,10 @@ export class Server extends EventEmitter {
};

async call(client: Client, packet: IPacket): Promise<void> {
// if client is not connected we don't need to do anything
if (client.ws.readyState !== WebSocket.OPEN) {
return;
}
try {
// if method was not defined on DDP Streamer we fall back to Meteor
if (!this._methods.has(packet.method)) {
Expand Down Expand Up @@ -86,6 +90,10 @@ export class Server extends EventEmitter {
}

async subscribe(client: Client, packet: IPacket): Promise<void> {
// if client is not connected we don't need to do anything
if (client.ws.readyState !== WebSocket.OPEN) {
return;
}
try {
if (!this._subscriptions.has(packet.name)) {
throw new MeteorError(404, `Subscription '${packet.name}' not found`);
Expand Down
23 changes: 22 additions & 1 deletion ee/apps/ddp-streamer/src/Streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,22 @@ export class Stream extends Streamer {
};

for await (const { subscription } of subscriptions) {
// if the connection state is not open anymore, it somehow got to a weird state,
// we'll emit close so it can clean up the weird state, and so we stop emitting to it
if (subscription.client.ws.readyState !== WebSocket.OPEN) {
subscription.client.ws.emit('close');
continue;
}

if (this.retransmitToSelf === false && origin && origin === subscription.connection) {
continue;
}

if (await this.isEmitAllowed(subscription, eventName, ...args)) {
if (!(await this.isEmitAllowed(subscription, eventName, ...args))) {
continue;
}

try {
await new Promise<void>((resolve, reject) => {
const frame = data[subscription.client.meteorClient ? 'meteor' : 'normal'];

Expand All @@ -72,6 +83,16 @@ export class Stream extends Streamer {
resolve();
});
});
} catch (error: any) {
if (error.code === 'ERR_STREAM_DESTROYED') {
console.warn('Trying to send data to destroyed stream, closing connection.');

// if we still tried to send data to a destroyed stream, we'll try again to close the connection
if (subscription.client.ws.readyState !== WebSocket.OPEN) {
subscription.client.ws.emit('close');
}
}
console.error('Error trying to send data to stream.', error);
}
}
}
Expand Down