Skip to content

Commit

Permalink
Chore: Remove role requirement to use change streams (#27281)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Ogle <geekgonecrazy@users.noreply.github.com>
  • Loading branch information
2 people authored and MartinSchoeler committed Nov 28, 2022
1 parent cb16247 commit b6a2812
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 57 deletions.
107 changes: 56 additions & 51 deletions apps/meteor/server/database/DatabaseWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Timestamp, Db, ChangeStreamDeleteDocument, ChangeStreamInsertDocum
import { escapeRegExp } from '@rocket.chat/string-helpers';
import { MongoClient } from 'mongodb';

import type { Logger } from '../lib/logger/Logger';
import { convertChangeStreamPayload } from './convertChangeStreamPayload';
import { convertOplogPayload } from './convertOplogPayload';
import { watchCollections } from './watchCollections';
Expand Down Expand Up @@ -34,76 +35,69 @@ export class DatabaseWatcher extends EventEmitter {

private metrics?: any;

private logger: Logger;

/**
* Last doc timestamp received from a real time event
*/
private lastDocTS: Date;

constructor({ db, _oplogHandle, metrics }: { db: Db; _oplogHandle?: any; metrics?: any }) {
// eslint-disable-next-line @typescript-eslint/naming-convention
constructor({ db, _oplogHandle, metrics, logger: LoggerClass }: { db: Db; _oplogHandle?: any; metrics?: any; logger: typeof Logger }) {
super();

this.db = db;
this._oplogHandle = _oplogHandle;
this.metrics = metrics;
this.logger = new LoggerClass('DatabaseWatcher');
}

async watch(): Promise<void> {
if (useMeteorOplog) {
// TODO remove this when updating to Meteor 2.8
this.logger.warn(
'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.',
);
this.watchMeteorOplog();
return;
}

if (await this.isChangeStreamAvailable()) {
this.watchChangeStream();
return;
}

this.watchOplog();
}

private async isChangeStreamAvailable(): Promise<boolean> {
if (ignoreChangeStream) {
return false;
await this.watchOplog();
return;
}

try {
const { storageEngine } = await this.db.command({ serverStatus: 1 });

if (!storageEngine || storageEngine.name !== 'wiredTiger') {
return false;
}

await this.db.admin().command({ replSetGetStatus: 1 });
} catch (e) {
if (e instanceof Error && e.message.startsWith('not authorized')) {
console.info(
'Change Stream is available for your installation, give admin permissions to your database user to use this improved version.',
);
}
return false;
this.watchChangeStream();
} catch (err: unknown) {
await this.watchOplog();
}

return true;
}

private async watchOplog(): Promise<void> {
console.log('[DatabaseWatcher] Using oplog');
if (!process.env.MONGO_OPLOG_URL) {
throw Error('No $MONGO_OPLOG_URL provided');
}

const isMasterDoc = await this.db.admin().command({ ismaster: 1 });
if (!isMasterDoc || !isMasterDoc.setName) {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
throw Error("$MONGO_URL should be a replica set's URL");
}

if (!process.env.MONGO_OPLOG_URL) {
throw Error('no-mongo-url');
}
const dbName = this.db.databaseName;

const client = new MongoClient(process.env.MONGO_OPLOG_URL, {
maxPoolSize: 1,
});

if (client.db().databaseName !== 'local') {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
}

await client.connect();

this.logger.startup('Using oplog');

const db = client.db();

const oplogCollection = db.collection('oplog.rs');
Expand Down Expand Up @@ -141,11 +135,12 @@ export class DatabaseWatcher extends EventEmitter {
}

private watchMeteorOplog(): void {
console.log('[DatabaseWatcher] Using Meteor oplog');

if (!this._oplogHandle) {
throw new Error('no-oplog-handle');
}

this.logger.startup('Using Meteor oplog');

watchCollections.forEach((collection) => {
this._oplogHandle.onOplogEntry({ collection }, (event: any) => {
this.emitDoc(collection, convertOplogPayload(event));
Expand All @@ -154,24 +149,34 @@ export class DatabaseWatcher extends EventEmitter {
}

private watchChangeStream(): void {
console.log('[DatabaseWatcher] Using change streams');

const changeStream = this.db.watch<
IRocketChatRecord,
| ChangeStreamInsertDocument<IRocketChatRecord>
| ChangeStreamUpdateDocument<IRocketChatRecord>
| ChangeStreamDeleteDocument<IRocketChatRecord>
>([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.coll': { $in: watchCollections },
try {
const changeStream = this.db.watch<
IRocketChatRecord,
| ChangeStreamInsertDocument<IRocketChatRecord>
| ChangeStreamUpdateDocument<IRocketChatRecord>
| ChangeStreamDeleteDocument<IRocketChatRecord>
>([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.coll': { $in: watchCollections },
},
},
},
]);
changeStream.on('change', (event) => {
this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
});
]);
changeStream.on('change', (event) => {
this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
});

changeStream.on('error', (err) => {
throw err;
});

this.logger.startup('Using change streams');
} catch (err: unknown) {
this.logger.error(err, 'Change stream error');

throw err;
}
}

private emitDoc(collection: string, doc: RealTimeData<IRocketChatRecord> | void): void {
Expand Down
4 changes: 4 additions & 0 deletions apps/meteor/server/lib/logger/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,8 @@ export class Logger {
subscription(msg: string, ...args: any[]): void {
this.logger.subscription(msg, ...args);
}

fatal(err: unknown, ...args: any[]): void {
this.logger.fatal(err, ...args);
}
}
3 changes: 3 additions & 0 deletions apps/meteor/server/lib/logger/getPino.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import './logQueue';
// add support to multiple params on the log commands, i.e.:
// logger.info('user', Meteor.user()); // will print: {"level":30,"time":1629814080968,"msg":"user {\"username\": \"foo\"}"}
function logMethod(this: Logger, args: unknown[], method: any): void {
if (args.length === 2 && args[0] instanceof Error) {
return method.apply(this, args);
}
if (args.length > 1) {
args[0] = `${args[0]}${' %j'.repeat(args.length - 1)}`;
}
Expand Down
8 changes: 6 additions & 2 deletions apps/meteor/server/startup/watchDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import { initWatchers } from '../modules/watchers/watchers.module';
import { api } from '../sdk/api';
import { metrics } from '../../app/metrics/server/lib/metrics';
import { SystemLogger } from '../lib/logger/system';
import { Logger } from '../lib/logger/Logger';

const { mongo } = MongoInternals.defaultRemoteCollectionDriver();

const watcher = new DatabaseWatcher({ db, _oplogHandle: (mongo as any)._oplogHandle, metrics });
const watcher = new DatabaseWatcher({ db, _oplogHandle: (mongo as any)._oplogHandle, metrics, logger: Logger });

initWatchers(watcher, api.broadcastLocal.bind(api));

watcher.watch();
watcher.watch().catch((err: Error) => {
SystemLogger.fatal(err, 'Fatal error occurred when watching database');
process.exit(1);
});

setInterval(function _checkDatabaseWatcher() {
if (watcher.isLastDocDelayed()) {
Expand Down
15 changes: 13 additions & 2 deletions ee/apps/stream-hub-service/src/StreamHub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@ import type { IServiceClass } from '../../../../apps/meteor/server/sdk/types/Ser
import { ServiceClass } from '../../../../apps/meteor/server/sdk/types/ServiceClass';
import { initWatchers } from '../../../../apps/meteor/server/modules/watchers/watchers.module';
import type { DatabaseWatcher } from '../../../../apps/meteor/server/database/DatabaseWatcher';
import type { Logger } from '../../../../apps/meteor/server/lib/logger/Logger';

export class StreamHub extends ServiceClass implements IServiceClass {
protected name = 'hub';

constructor(private watcher: DatabaseWatcher) {
private logger: Logger;

constructor(private watcher: DatabaseWatcher, loggerClass: typeof Logger) {
super();

// eslint-disable-next-line new-cap
this.logger = new loggerClass('StreamHub');
}

async created(): Promise<void> {
initWatchers(this.watcher, this.api.broadcast.bind(this.api));

this.watcher.watch();
try {
await this.watcher.watch();
} catch (err: unknown) {
this.logger.fatal(err, 'Fatal error occurred when watching database');
process.exit(1);
}
}
}
6 changes: 4 additions & 2 deletions ee/apps/stream-hub-service/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { api } from '../../../../apps/meteor/server/sdk/api';
import { broker } from '../../../../apps/meteor/ee/server/startup/broker';
import { Collections, getCollection, getConnection } from '../../../../apps/meteor/ee/server/services/mongo';
import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/registerServiceModels';
import { Logger } from '../../../../apps/meteor/server/lib/logger/Logger';

const PORT = process.env.PORT || 3035;

Expand All @@ -21,9 +22,10 @@ const PORT = process.env.PORT || 3035;
const { StreamHub } = await import('./StreamHub');
const { DatabaseWatcher } = await import('../../../../apps/meteor/server/database/DatabaseWatcher');

const watcher = new DatabaseWatcher({ db });
// TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api)
const watcher = new DatabaseWatcher({ db, logger: Logger });

api.registerService(new StreamHub(watcher));
api.registerService(new StreamHub(watcher, Logger));

await api.start();

Expand Down

0 comments on commit b6a2812

Please sign in to comment.