From b6a2812f366e4bb58a23b53e1d4ca52527c93771 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Thu, 24 Nov 2022 19:17:20 -0300 Subject: [PATCH] Chore: Remove role requirement to use change streams (#27281) Co-authored-by: Aaron Ogle --- .../meteor/server/database/DatabaseWatcher.ts | 107 +++++++++--------- apps/meteor/server/lib/logger/Logger.ts | 4 + apps/meteor/server/lib/logger/getPino.ts | 3 + apps/meteor/server/startup/watchDb.ts | 8 +- ee/apps/stream-hub-service/src/StreamHub.ts | 15 ++- ee/apps/stream-hub-service/src/service.ts | 6 +- 6 files changed, 86 insertions(+), 57 deletions(-) diff --git a/apps/meteor/server/database/DatabaseWatcher.ts b/apps/meteor/server/database/DatabaseWatcher.ts index b49b5e2ab49b9..501b407199509 100644 --- a/apps/meteor/server/database/DatabaseWatcher.ts +++ b/apps/meteor/server/database/DatabaseWatcher.ts @@ -5,6 +5,7 @@ import type { Timestamp, Db, ChangeStreamDeleteDocument, ChangeStreamInsertDocum import { escapeRegExp } from '@rocket.chat/string-helpers'; import { MongoClient } from 'mongodb'; +import type { Logger } from '../lib/logger/Logger'; import { convertChangeStreamPayload } from './convertChangeStreamPayload'; import { convertOplogPayload } from './convertOplogPayload'; import { watchCollections } from './watchCollections'; @@ -34,76 +35,69 @@ export class DatabaseWatcher extends EventEmitter { private metrics?: any; + private logger: Logger; + /** * Last doc timestamp received from a real time event */ private lastDocTS: Date; - constructor({ db, _oplogHandle, metrics }: { db: Db; _oplogHandle?: any; metrics?: any }) { + // eslint-disable-next-line @typescript-eslint/naming-convention + constructor({ db, _oplogHandle, metrics, logger: LoggerClass }: { db: Db; _oplogHandle?: any; metrics?: any; logger: typeof Logger }) { super(); this.db = db; this._oplogHandle = _oplogHandle; this.metrics = metrics; + this.logger = new LoggerClass('DatabaseWatcher'); } async watch(): Promise { if (useMeteorOplog) { + // TODO remove this when updating to Meteor 2.8 + this.logger.warn( + 'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.', + ); this.watchMeteorOplog(); return; } - if (await this.isChangeStreamAvailable()) { - this.watchChangeStream(); - return; - } - - this.watchOplog(); - } - - private async isChangeStreamAvailable(): Promise { if (ignoreChangeStream) { - return false; + await this.watchOplog(); + return; } try { - const { storageEngine } = await this.db.command({ serverStatus: 1 }); - - if (!storageEngine || storageEngine.name !== 'wiredTiger') { - return false; - } - - await this.db.admin().command({ replSetGetStatus: 1 }); - } catch (e) { - if (e instanceof Error && e.message.startsWith('not authorized')) { - console.info( - 'Change Stream is available for your installation, give admin permissions to your database user to use this improved version.', - ); - } - return false; + this.watchChangeStream(); + } catch (err: unknown) { + await this.watchOplog(); } - - return true; } private async watchOplog(): Promise { - console.log('[DatabaseWatcher] Using oplog'); + if (!process.env.MONGO_OPLOG_URL) { + throw Error('No $MONGO_OPLOG_URL provided'); + } const isMasterDoc = await this.db.admin().command({ ismaster: 1 }); if (!isMasterDoc || !isMasterDoc.setName) { - throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + throw Error("$MONGO_URL should be a replica set's URL"); } - if (!process.env.MONGO_OPLOG_URL) { - throw Error('no-mongo-url'); - } const dbName = this.db.databaseName; const client = new MongoClient(process.env.MONGO_OPLOG_URL, { maxPoolSize: 1, }); + + if (client.db().databaseName !== 'local') { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + await client.connect(); + this.logger.startup('Using oplog'); + const db = client.db(); const oplogCollection = db.collection('oplog.rs'); @@ -141,11 +135,12 @@ export class DatabaseWatcher extends EventEmitter { } private watchMeteorOplog(): void { - console.log('[DatabaseWatcher] Using Meteor oplog'); - if (!this._oplogHandle) { throw new Error('no-oplog-handle'); } + + this.logger.startup('Using Meteor oplog'); + watchCollections.forEach((collection) => { this._oplogHandle.onOplogEntry({ collection }, (event: any) => { this.emitDoc(collection, convertOplogPayload(event)); @@ -154,24 +149,34 @@ export class DatabaseWatcher extends EventEmitter { } private watchChangeStream(): void { - console.log('[DatabaseWatcher] Using change streams'); - - const changeStream = this.db.watch< - IRocketChatRecord, - | ChangeStreamInsertDocument - | ChangeStreamUpdateDocument - | ChangeStreamDeleteDocument - >([ - { - $match: { - 'operationType': { $in: ['insert', 'update', 'delete'] }, - 'ns.coll': { $in: watchCollections }, + try { + const changeStream = this.db.watch< + IRocketChatRecord, + | ChangeStreamInsertDocument + | ChangeStreamUpdateDocument + | ChangeStreamDeleteDocument + >([ + { + $match: { + 'operationType': { $in: ['insert', 'update', 'delete'] }, + 'ns.coll': { $in: watchCollections }, + }, }, - }, - ]); - changeStream.on('change', (event) => { - this.emitDoc(event.ns.coll, convertChangeStreamPayload(event)); - }); + ]); + changeStream.on('change', (event) => { + this.emitDoc(event.ns.coll, convertChangeStreamPayload(event)); + }); + + changeStream.on('error', (err) => { + throw err; + }); + + this.logger.startup('Using change streams'); + } catch (err: unknown) { + this.logger.error(err, 'Change stream error'); + + throw err; + } } private emitDoc(collection: string, doc: RealTimeData | void): void { diff --git a/apps/meteor/server/lib/logger/Logger.ts b/apps/meteor/server/lib/logger/Logger.ts index e317157cabb01..1be3aa47ae14e 100644 --- a/apps/meteor/server/lib/logger/Logger.ts +++ b/apps/meteor/server/lib/logger/Logger.ts @@ -118,4 +118,8 @@ export class Logger { subscription(msg: string, ...args: any[]): void { this.logger.subscription(msg, ...args); } + + fatal(err: unknown, ...args: any[]): void { + this.logger.fatal(err, ...args); + } } diff --git a/apps/meteor/server/lib/logger/getPino.ts b/apps/meteor/server/lib/logger/getPino.ts index 30b3219e8182e..94247f6ada8d1 100644 --- a/apps/meteor/server/lib/logger/getPino.ts +++ b/apps/meteor/server/lib/logger/getPino.ts @@ -7,6 +7,9 @@ import './logQueue'; // add support to multiple params on the log commands, i.e.: // logger.info('user', Meteor.user()); // will print: {"level":30,"time":1629814080968,"msg":"user {\"username\": \"foo\"}"} function logMethod(this: Logger, args: unknown[], method: any): void { + if (args.length === 2 && args[0] instanceof Error) { + return method.apply(this, args); + } if (args.length > 1) { args[0] = `${args[0]}${' %j'.repeat(args.length - 1)}`; } diff --git a/apps/meteor/server/startup/watchDb.ts b/apps/meteor/server/startup/watchDb.ts index da3a020400fb4..8579b1f670fab 100644 --- a/apps/meteor/server/startup/watchDb.ts +++ b/apps/meteor/server/startup/watchDb.ts @@ -6,14 +6,18 @@ import { initWatchers } from '../modules/watchers/watchers.module'; import { api } from '../sdk/api'; import { metrics } from '../../app/metrics/server/lib/metrics'; import { SystemLogger } from '../lib/logger/system'; +import { Logger } from '../lib/logger/Logger'; const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); -const watcher = new DatabaseWatcher({ db, _oplogHandle: (mongo as any)._oplogHandle, metrics }); +const watcher = new DatabaseWatcher({ db, _oplogHandle: (mongo as any)._oplogHandle, metrics, logger: Logger }); initWatchers(watcher, api.broadcastLocal.bind(api)); -watcher.watch(); +watcher.watch().catch((err: Error) => { + SystemLogger.fatal(err, 'Fatal error occurred when watching database'); + process.exit(1); +}); setInterval(function _checkDatabaseWatcher() { if (watcher.isLastDocDelayed()) { diff --git a/ee/apps/stream-hub-service/src/StreamHub.ts b/ee/apps/stream-hub-service/src/StreamHub.ts index ce102023f8508..85af31e78f272 100755 --- a/ee/apps/stream-hub-service/src/StreamHub.ts +++ b/ee/apps/stream-hub-service/src/StreamHub.ts @@ -2,17 +2,28 @@ import type { IServiceClass } from '../../../../apps/meteor/server/sdk/types/Ser import { ServiceClass } from '../../../../apps/meteor/server/sdk/types/ServiceClass'; import { initWatchers } from '../../../../apps/meteor/server/modules/watchers/watchers.module'; import type { DatabaseWatcher } from '../../../../apps/meteor/server/database/DatabaseWatcher'; +import type { Logger } from '../../../../apps/meteor/server/lib/logger/Logger'; export class StreamHub extends ServiceClass implements IServiceClass { protected name = 'hub'; - constructor(private watcher: DatabaseWatcher) { + private logger: Logger; + + constructor(private watcher: DatabaseWatcher, loggerClass: typeof Logger) { super(); + + // eslint-disable-next-line new-cap + this.logger = new loggerClass('StreamHub'); } async created(): Promise { initWatchers(this.watcher, this.api.broadcast.bind(this.api)); - this.watcher.watch(); + try { + await this.watcher.watch(); + } catch (err: unknown) { + this.logger.fatal(err, 'Fatal error occurred when watching database'); + process.exit(1); + } } } diff --git a/ee/apps/stream-hub-service/src/service.ts b/ee/apps/stream-hub-service/src/service.ts index b2d864825a693..469b7c0e3e295 100755 --- a/ee/apps/stream-hub-service/src/service.ts +++ b/ee/apps/stream-hub-service/src/service.ts @@ -5,6 +5,7 @@ import { api } from '../../../../apps/meteor/server/sdk/api'; import { broker } from '../../../../apps/meteor/ee/server/startup/broker'; import { Collections, getCollection, getConnection } from '../../../../apps/meteor/ee/server/services/mongo'; import { registerServiceModels } from '../../../../apps/meteor/ee/server/lib/registerServiceModels'; +import { Logger } from '../../../../apps/meteor/server/lib/logger/Logger'; const PORT = process.env.PORT || 3035; @@ -21,9 +22,10 @@ const PORT = process.env.PORT || 3035; const { StreamHub } = await import('./StreamHub'); const { DatabaseWatcher } = await import('../../../../apps/meteor/server/database/DatabaseWatcher'); - const watcher = new DatabaseWatcher({ db }); + // TODO having to import Logger to pass as a param is a temporary solution. logger should come from the service (either from broker or api) + const watcher = new DatabaseWatcher({ db, logger: Logger }); - api.registerService(new StreamHub(watcher)); + api.registerService(new StreamHub(watcher, Logger)); await api.start();