From 373d0d748c386c2f003a85026b802b90c41573f8 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 12:43:01 -0300 Subject: [PATCH 01/20] Remove unused file roomFiles.js --- server/lib/roomFiles.js | 35 ----------------------------------- server/main.js | 1 - 2 files changed, 36 deletions(-) delete mode 100644 server/lib/roomFiles.js diff --git a/server/lib/roomFiles.js b/server/lib/roomFiles.js deleted file mode 100644 index b1ca1aa8808b..000000000000 --- a/server/lib/roomFiles.js +++ /dev/null @@ -1,35 +0,0 @@ -import { Meteor } from 'meteor/meteor'; - -import { Users, Uploads } from '../../app/models'; - -export const roomFiles = (pub, { rid, searchText, fileType, limit = 50 }) => { - if (!pub.userId) { - return pub.ready(); - } - - if (!Meteor.call('canAccessRoom', rid, pub.userId)) { - return this.ready(); - } - - const cursorFileListHandle = Uploads.findNotHiddenFilesOfRoom(rid, searchText, fileType, limit).observeChanges({ - added(_id, record) { - const { username, name } = record.userId ? Users.findOneById(record.userId) : {}; - return pub.added('room_files', _id, { ...record, user: { username, name } }); - }, - changed(_id, recordChanges) { - if (!recordChanges.hasOwnProperty('user') && recordChanges.userId) { - recordChanges.user = Users.findOneById(recordChanges.userId); - } - return pub.changed('room_files', _id, recordChanges); - }, - removed(_id) { - return pub.removed('room_files', _id); - }, - }); - - pub.ready(); - - return pub.onStop(function() { - return cursorFileListHandle.stop(); - }); -}; diff --git a/server/main.js b/server/main.js index d24acf473161..8b62fe259bdb 100644 --- a/server/main.js +++ b/server/main.js @@ -5,7 +5,6 @@ import '../lib/RegExp'; import '../ee/server'; import './lib/pushConfig'; -import './lib/roomFiles'; import './startup/migrations'; import './startup/appcache'; import './startup/cron'; From 30084e2b7218c566db3c069240f924ec26fc4006 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 12:44:37 -0300 Subject: [PATCH 02/20] Prevent uncessary listeners to the oplog changes --- app/metrics/server/index.js | 1 + app/metrics/server/lib/collectMetrics.js | 177 ++++++++++++++++++++++ app/metrics/server/lib/metrics.js | 185 ----------------------- app/models/server/models/_Base.js | 11 -- app/models/server/models/_BaseDb.js | 57 ++++--- app/models/server/oplogEvents.js | 3 - 6 files changed, 213 insertions(+), 221 deletions(-) create mode 100644 app/metrics/server/lib/collectMetrics.js delete mode 100644 app/models/server/oplogEvents.js diff --git a/app/metrics/server/index.js b/app/metrics/server/index.js index 3f176d629163..be589206b65b 100644 --- a/app/metrics/server/index.js +++ b/app/metrics/server/index.js @@ -1,6 +1,7 @@ import { metrics } from './lib/metrics'; import StatsTracker from './lib/statsTracker'; +import './lib/collectMetrics'; import './callbacksMetrics'; export { diff --git a/app/metrics/server/lib/collectMetrics.js b/app/metrics/server/lib/collectMetrics.js new file mode 100644 index 000000000000..31625134bed7 --- /dev/null +++ b/app/metrics/server/lib/collectMetrics.js @@ -0,0 +1,177 @@ +import http from 'http'; + +import client from 'prom-client'; +import connect from 'connect'; +import _ from 'underscore'; +import gcStats from 'prometheus-gc-stats'; +import { Meteor } from 'meteor/meteor'; +import { Facts } from 'meteor/facts-base'; + +import { Info, getOplogInfo } from '../../../utils/server'; +import { Migrations } from '../../../migrations'; +import { settings } from '../../../settings'; +import { Statistics } from '../../../models'; +import { metrics } from './metrics'; + +Facts.incrementServerFact = function(pkg, fact, increment) { + metrics.meteorFacts.inc({ pkg, fact }, increment); +}; + +const setPrometheusData = async () => { + metrics.info.set({ + version: Info.version, + unique_id: settings.get('uniqueID'), + site_url: settings.get('Site_Url'), + }, 1); + + const sessions = Array.from(Meteor.server.sessions.values()); + const authenticatedSessions = sessions.filter((s) => s.userId); + metrics.ddpSessions.set(Meteor.server.sessions.size); + metrics.ddpAuthenticatedSessions.set(authenticatedSessions.length); + metrics.ddpConnectedUsers.set(_.unique(authenticatedSessions.map((s) => s.userId)).length); + + const statistics = Statistics.findLast(); + if (!statistics) { + return; + } + + metrics.version.set({ version: statistics.version }, 1); + metrics.migration.set(Migrations._getControl().version); + metrics.instanceCount.set(statistics.instanceCount); + metrics.oplogEnabled.set({ enabled: statistics.oplogEnabled }, 1); + + // User statistics + metrics.totalUsers.set(statistics.totalUsers); + metrics.activeUsers.set(statistics.activeUsers); + metrics.nonActiveUsers.set(statistics.nonActiveUsers); + metrics.onlineUsers.set(statistics.onlineUsers); + metrics.awayUsers.set(statistics.awayUsers); + metrics.offlineUsers.set(statistics.offlineUsers); + + // Room statistics + metrics.totalRooms.set(statistics.totalRooms); + metrics.totalChannels.set(statistics.totalChannels); + metrics.totalPrivateGroups.set(statistics.totalPrivateGroups); + metrics.totalDirect.set(statistics.totalDirect); + metrics.totalLivechat.set(statistics.totalLivechat); + + // Message statistics + metrics.totalMessages.set(statistics.totalMessages); + metrics.totalChannelMessages.set(statistics.totalChannelMessages); + metrics.totalPrivateGroupMessages.set(statistics.totalPrivateGroupMessages); + metrics.totalDirectMessages.set(statistics.totalDirectMessages); + metrics.totalLivechatMessages.set(statistics.totalLivechatMessages); + + const oplogQueue = getOplogInfo().mongo._oplogHandle?._entryQueue?.length || 0; + metrics.oplogQueue.set(oplogQueue); + + metrics.pushQueue.set(statistics.pushQueue || 0); +}; + +const app = connect(); + +// const compression = require('compression'); +// app.use(compression()); + +app.use('/metrics', (req, res) => { + res.setHeader('Content-Type', 'text/plain'); + const data = client.register.metrics(); + + metrics.metricsRequests.inc(); + metrics.metricsSize.set(data.length); + + res.end(data); +}); + +app.use('/', (req, res) => { + const html = ` + + Rocket.Chat Prometheus Exporter + + +

Rocket.Chat Prometheus Exporter

+

Metrics

+ + `; + + res.write(html); + res.end(); +}); + +const server = http.createServer(app); + +let timer; +let resetTimer; +let defaultMetricsInitiated = false; +let gcStatsInitiated = false; +const was = { + enabled: false, + port: 9458, + resetInterval: 0, + collectGC: false, +}; +const updatePrometheusConfig = async () => { + const is = { + port: process.env.PROMETHEUS_PORT || settings.get('Prometheus_Port'), + enabled: settings.get('Prometheus_Enabled'), + resetInterval: settings.get('Prometheus_Reset_Interval'), + collectGC: settings.get('Prometheus_Garbage_Collector'), + }; + + if (Object.values(is).some((s) => s == null)) { + return; + } + + if (Object.entries(is).every(([k, v]) => v === was[k])) { + return; + } + + if (!is.enabled) { + if (was.enabled) { + console.log('Disabling Prometheus'); + server.close(); + Meteor.clearInterval(timer); + } + Object.assign(was, is); + return; + } + + console.log('Configuring Prometheus', is); + + if (!was.enabled) { + server.listen({ + port: is.port, + host: process.env.BIND_IP || '0.0.0.0', + }); + + timer = Meteor.setInterval(setPrometheusData, 5000); + } + + Meteor.clearInterval(resetTimer); + if (is.resetInterval) { + resetTimer = Meteor.setInterval(() => { + client.register.getMetricsAsArray().forEach((metric) => { metric.hashMap = {}; }); + }, is.resetInterval); + } + + // Prevent exceptions on calling those methods twice since + // it's not possible to stop them to be able to restart + try { + if (defaultMetricsInitiated === false) { + defaultMetricsInitiated = true; + client.collectDefaultMetrics(); + } + if (is.collectGC && gcStatsInitiated === false) { + gcStatsInitiated = true; + gcStats()(); + } + } catch (error) { + console.error(error); + } + + Object.assign(was, is); +}; + +Meteor.startup(async () => { + settings.get(/^Prometheus_.+/, updatePrometheusConfig); +}); diff --git a/app/metrics/server/lib/metrics.js b/app/metrics/server/lib/metrics.js index a3d5de8c8dbe..288daf0d90f2 100644 --- a/app/metrics/server/lib/metrics.js +++ b/app/metrics/server/lib/metrics.js @@ -1,17 +1,4 @@ -import http from 'http'; - import client from 'prom-client'; -import connect from 'connect'; -import _ from 'underscore'; -import gcStats from 'prometheus-gc-stats'; -import { Meteor } from 'meteor/meteor'; -import { Facts } from 'meteor/facts-base'; - -import { Info, getOplogInfo } from '../../../utils/server'; -import { Migrations } from '../../../migrations'; -import { settings } from '../../../settings'; -import { Statistics } from '../../../models'; -import { oplogEvents } from '../../../models/server/oplogEvents'; export const metrics = {}; const percentiles = [0.01, 0.1, 0.9, 0.99]; @@ -102,175 +89,3 @@ metrics.totalLivechatMessages = new client.Gauge({ name: 'rocketchat_livechat_me // Meteor Facts metrics.meteorFacts = new client.Gauge({ name: 'rocketchat_meteor_facts', labelNames: ['pkg', 'fact'], help: 'internal meteor facts' }); - -Facts.incrementServerFact = function(pkg, fact, increment) { - metrics.meteorFacts.inc({ pkg, fact }, increment); -}; - -const setPrometheusData = async () => { - metrics.info.set({ - version: Info.version, - unique_id: settings.get('uniqueID'), - site_url: settings.get('Site_Url'), - }, 1); - - const sessions = Array.from(Meteor.server.sessions.values()); - const authenticatedSessions = sessions.filter((s) => s.userId); - metrics.ddpSessions.set(Meteor.server.sessions.size); - metrics.ddpAuthenticatedSessions.set(authenticatedSessions.length); - metrics.ddpConnectedUsers.set(_.unique(authenticatedSessions.map((s) => s.userId)).length); - - const statistics = Statistics.findLast(); - if (!statistics) { - return; - } - - metrics.version.set({ version: statistics.version }, 1); - metrics.migration.set(Migrations._getControl().version); - metrics.instanceCount.set(statistics.instanceCount); - metrics.oplogEnabled.set({ enabled: statistics.oplogEnabled }, 1); - - // User statistics - metrics.totalUsers.set(statistics.totalUsers); - metrics.activeUsers.set(statistics.activeUsers); - metrics.nonActiveUsers.set(statistics.nonActiveUsers); - metrics.onlineUsers.set(statistics.onlineUsers); - metrics.awayUsers.set(statistics.awayUsers); - metrics.offlineUsers.set(statistics.offlineUsers); - - // Room statistics - metrics.totalRooms.set(statistics.totalRooms); - metrics.totalChannels.set(statistics.totalChannels); - metrics.totalPrivateGroups.set(statistics.totalPrivateGroups); - metrics.totalDirect.set(statistics.totalDirect); - metrics.totalLivechat.set(statistics.totalLivechat); - - // Message statistics - metrics.totalMessages.set(statistics.totalMessages); - metrics.totalChannelMessages.set(statistics.totalChannelMessages); - metrics.totalPrivateGroupMessages.set(statistics.totalPrivateGroupMessages); - metrics.totalDirectMessages.set(statistics.totalDirectMessages); - metrics.totalLivechatMessages.set(statistics.totalLivechatMessages); - - const oplogQueue = getOplogInfo().mongo._oplogHandle?._entryQueue?.length || 0; - metrics.oplogQueue.set(oplogQueue); - - metrics.pushQueue.set(statistics.pushQueue || 0); -}; - -const app = connect(); - -// const compression = require('compression'); -// app.use(compression()); - -app.use('/metrics', (req, res) => { - res.setHeader('Content-Type', 'text/plain'); - const data = client.register.metrics(); - - metrics.metricsRequests.inc(); - metrics.metricsSize.set(data.length); - - res.end(data); -}); - -app.use('/', (req, res) => { - const html = ` - - Rocket.Chat Prometheus Exporter - - -

Rocket.Chat Prometheus Exporter

-

Metrics

- - `; - - res.write(html); - res.end(); -}); - -const server = http.createServer(app); - -const oplogMetric = ({ collection, op }) => { - metrics.oplog.inc({ - collection, - op, - }); -}; - -let timer; -let resetTimer; -let defaultMetricsInitiated = false; -let gcStatsInitiated = false; -const was = { - enabled: false, - port: 9458, - resetInterval: 0, - collectGC: false, -}; -const updatePrometheusConfig = async () => { - const is = { - port: process.env.PROMETHEUS_PORT || settings.get('Prometheus_Port'), - enabled: settings.get('Prometheus_Enabled'), - resetInterval: settings.get('Prometheus_Reset_Interval'), - collectGC: settings.get('Prometheus_Garbage_Collector'), - }; - - if (Object.values(is).some((s) => s == null)) { - return; - } - - if (Object.entries(is).every(([k, v]) => v === was[k])) { - return; - } - - if (!is.enabled) { - if (was.enabled) { - console.log('Disabling Prometheus'); - server.close(); - Meteor.clearInterval(timer); - oplogEvents.removeListener('record', oplogMetric); - } - Object.assign(was, is); - return; - } - - console.log('Configuring Prometheus', is); - - if (!was.enabled) { - server.listen({ - port: is.port, - host: process.env.BIND_IP || '0.0.0.0', - }); - - timer = Meteor.setInterval(setPrometheusData, 5000); - oplogEvents.on('record', oplogMetric); - } - - Meteor.clearInterval(resetTimer); - if (is.resetInterval) { - resetTimer = Meteor.setInterval(() => { - client.register.getMetricsAsArray().forEach((metric) => { metric.hashMap = {}; }); - }, is.resetInterval); - } - - // Prevent exceptions on calling those methods twice since - // it's not possible to stop them to be able to restart - try { - if (defaultMetricsInitiated === false) { - defaultMetricsInitiated = true; - client.collectDefaultMetrics(); - } - if (is.collectGC && gcStatsInitiated === false) { - gcStatsInitiated = true; - gcStats()(); - } - } catch (error) { - console.error(error); - } - - Object.assign(was, is); -}; - -Meteor.startup(async () => { - settings.get(/^Prometheus_.+/, updatePrometheusConfig); -}); diff --git a/app/models/server/models/_Base.js b/app/models/server/models/_Base.js index d5ba676bd7e2..86ca31fa2ad4 100644 --- a/app/models/server/models/_Base.js +++ b/app/models/server/models/_Base.js @@ -4,7 +4,6 @@ import objectPath from 'object-path'; import _ from 'underscore'; import { BaseDb } from './_BaseDb'; -import { oplogEvents } from '../oplogEvents'; export class Base { constructor(nameOrModel, options) { @@ -17,16 +16,6 @@ export class Base { this.emit = this._db.emit.bind(this._db); this.db = this; - - this._db.on('change', ({ action, oplog }) => { - if (!oplog) { - return; - } - oplogEvents.emit('record', { - collection: this.collectionName, - op: action, - }); - }); } get origin() { diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index 87785f6a5ec2..fbd3ced3bb73 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -5,6 +5,7 @@ import { Mongo } from 'meteor/mongo'; import _ from 'underscore'; import { getMongoInfo } from '../../../utils/server/functions/getMongoInfo'; +import { metrics } from '../../../metrics/server/lib/metrics'; const baseName = 'rocketchat_'; @@ -21,6 +22,12 @@ try { console.log(e); } +const actions = { + i: 'insert', + u: 'update', + d: 'delete', +}; + export class BaseDb extends EventEmitter { constructor(model, baseModel, options = {}) { super(); @@ -189,62 +196,68 @@ export class BaseDb extends EventEmitter { ); } - processOplogRecord(action) { - if (action.op.op === 'i') { + processOplogRecord({ id, op }) { + const action = actions[op.op]; + metrics.oplog.inc({ + collection: this.collectionName, + op: action, + }); + + if (action === 'insert') { this.emit('change', { - action: 'insert', + action, clientAction: 'inserted', - id: action.op.o._id, - data: action.op.o, + id: op.o._id, + data: op.o, oplog: true, }); return; } - if (action.op.op === 'u') { - if (!action.op.o.$set && !action.op.o.$unset) { + if (action === 'update') { + if (!op.o.$set && !op.o.$unset) { this.emit('change', { - action: 'update', + action, clientAction: 'updated', - id: action.id, - data: action.op.o, + id, + data: op.o, oplog: true, }); return; } const diff = {}; - if (action.op.o.$set) { - for (const key in action.op.o.$set) { - if (action.op.o.$set.hasOwnProperty(key)) { - diff[key] = action.op.o.$set[key]; + if (op.o.$set) { + for (const key in op.o.$set) { + if (op.o.$set.hasOwnProperty(key)) { + diff[key] = op.o.$set[key]; } } } - if (action.op.o.$unset) { - for (const key in action.op.o.$unset) { - if (action.op.o.$unset.hasOwnProperty(key)) { + if (op.o.$unset) { + for (const key in op.o.$unset) { + if (op.o.$unset.hasOwnProperty(key)) { diff[key] = undefined; } } } this.emit('change', { - action: 'update', + action, clientAction: 'updated', - id: action.id, + id, diff, oplog: true, }); return; } - if (action.op.op === 'd') { + if (action === 'remove') { this.emit('change', { - action: 'remove', + action, clientAction: 'removed', - id: action.id, + id, oplog: true, }); } diff --git a/app/models/server/oplogEvents.js b/app/models/server/oplogEvents.js deleted file mode 100644 index 1508641e9d9a..000000000000 --- a/app/models/server/oplogEvents.js +++ /dev/null @@ -1,3 +0,0 @@ -import { EventEmitter } from 'events'; - -export const oplogEvents = new EventEmitter(); From bb5ed9122abffce40dbba627eb834b0b4aea1f5f Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 12:55:53 -0300 Subject: [PATCH 03/20] Remove missing observers on settings --- app/assets/server/assets.js | 17 ++------------ app/dolphin/lib/common.js | 13 +++-------- .../server/cronPruneMessages.js | 23 +++---------------- app/ui-master/server/inject.js | 6 ++--- 4 files changed, 10 insertions(+), 49 deletions(-) diff --git a/app/assets/server/assets.js b/app/assets/server/assets.js index 3df4a62c2e7c..8864c8e372fa 100644 --- a/app/assets/server/assets.js +++ b/app/assets/server/assets.js @@ -7,8 +7,7 @@ import _ from 'underscore'; import sizeOf from 'image-size'; import sharp from 'sharp'; -import { settings } from '../../settings'; -import { Settings } from '../../models'; +import { settings } from '../../settings/server'; import { getURL } from '../../utils/lib/getURL'; import { mime } from '../../utils/lib/mimeTypes'; import { hasPermission } from '../../authorization'; @@ -354,19 +353,7 @@ for (const key of Object.keys(assets)) { addAssetToSetting(key, value); } -Settings.find().observe({ - added(record) { - return RocketChatAssets.processAsset(record._id, record.value); - }, - - changed(record) { - return RocketChatAssets.processAsset(record._id, record.value); - }, - - removed(record) { - return RocketChatAssets.processAsset(record._id, undefined); - }, -}); +settings.get(/^Assets_/, (key, value) => RocketChatAssets.processAsset(key, value)); Meteor.startup(function() { return Meteor.setTimeout(function() { diff --git a/app/dolphin/lib/common.js b/app/dolphin/lib/common.js index 5366b5fecec0..0e74e6d1fca2 100644 --- a/app/dolphin/lib/common.js +++ b/app/dolphin/lib/common.js @@ -5,7 +5,6 @@ import { ServiceConfiguration } from 'meteor/service-configuration'; import { settings } from '../../settings'; import { CustomOAuth } from '../../custom-oauth'; import { callbacks } from '../../callbacks'; -import { Settings } from '../../models'; const config = { serverURL: '', @@ -31,15 +30,9 @@ function DolphinOnCreateUser(options, user) { if (Meteor.isServer) { Meteor.startup(() => - Settings.find({ _id: 'Accounts_OAuth_Dolphin_URL' }).observe({ - added() { - config.serverURL = settings.get('Accounts_OAuth_Dolphin_URL'); - return Dolphin.configure(config); - }, - changed() { - config.serverURL = settings.get('Accounts_OAuth_Dolphin_URL'); - return Dolphin.configure(config); - }, + settings.get('Accounts_OAuth_Dolphin_URL', (key, value) => { + config.serverURL = value; + return Dolphin.configure(config); }), ); diff --git a/app/retention-policy/server/cronPruneMessages.js b/app/retention-policy/server/cronPruneMessages.js index cb5b0fab1865..00da944ffaf8 100644 --- a/app/retention-policy/server/cronPruneMessages.js +++ b/app/retention-policy/server/cronPruneMessages.js @@ -1,8 +1,8 @@ import { Meteor } from 'meteor/meteor'; import { SyncedCron } from 'meteor/littledata:synced-cron'; -import { settings } from '../../settings'; -import { Rooms, Settings } from '../../models'; +import { settings } from '../../settings/server'; +import { Rooms } from '../../models'; import { cleanRoomHistory } from '../../lib'; let types = []; @@ -106,24 +106,7 @@ function reloadPolicy() { Meteor.startup(function() { Meteor.defer(function() { - Settings.find({ - _id: { - $in: [ - 'RetentionPolicy_Enabled', - 'RetentionPolicy_Precision', - 'RetentionPolicy_AppliesToChannels', - 'RetentionPolicy_AppliesToGroups', - 'RetentionPolicy_AppliesToDMs', - 'RetentionPolicy_MaxAge_Channels', - 'RetentionPolicy_MaxAge_Groups', - 'RetentionPolicy_MaxAge_DMs', - ], - }, - }).observe({ - changed() { - reloadPolicy(); - }, - }); + settings.get(/^RetentionPolicy_/, () => reloadPolicy()); reloadPolicy(); }); diff --git a/app/ui-master/server/inject.js b/app/ui-master/server/inject.js index 5f0ac8fcaed3..2cf6094baa08 100644 --- a/app/ui-master/server/inject.js +++ b/app/ui-master/server/inject.js @@ -6,7 +6,7 @@ import _ from 'underscore'; import s from 'underscore.string'; import { Settings } from '../../models'; -import { settings } from '../../settings'; +import { settings } from '../../settings/server'; const headInjections = new ReactiveDict(); @@ -157,9 +157,7 @@ renderDynamicCssList(); // changed: renderDynamicCssList // }); -Settings.find({ _id: /theme-color-rc/i }, { fields: { value: 1 } }).observe({ - changed: renderDynamicCssList, -}); +settings.get(/theme-color-rc/i, () => renderDynamicCssList()); injectIntoBody('icons', Assets.getText('public/icons.svg')); From 5949a8b4593fb1fd10cb66ef1c2aa323b794f990 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 13:04:11 -0300 Subject: [PATCH 04/20] Remove observer from integrations --- app/integrations/server/lib/triggerHandler.js | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/app/integrations/server/lib/triggerHandler.js b/app/integrations/server/lib/triggerHandler.js index 7ce09dc03f88..3d1344272f84 100644 --- a/app/integrations/server/lib/triggerHandler.js +++ b/app/integrations/server/lib/triggerHandler.js @@ -22,19 +22,26 @@ integrations.triggerHandler = new class RocketChatIntegrationHandler { this.compiledScripts = {}; this.triggers = {}; - Models.Integrations.find({ type: 'webhook-outgoing' }).observe({ - added: (record) => { - this.addIntegration(record); - }, - - changed: (record) => { - this.removeIntegration(record); - this.addIntegration(record); - }, + Models.Integrations.find({ type: 'webhook-outgoing' }).fetch().forEach((data) => this.addIntegration(data)); - removed: (record) => { - this.removeIntegration(record); - }, + Models.Integrations.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + if (data.type === 'webhook-outgoing') { + this.addIntegration(data); + } + break; + case 'updated': + data = Models.Integrations.findOneById(id); + if (data.type === 'webhook-outgoing') { + this.removeIntegration(data); + this.addIntegration(data); + } + break; + case 'removed': + this.removeIntegration({ _id: id }); + break; + } }); } From 207dbd3dc4f718e8675e009e8922e92d617037b1 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 13:19:02 -0300 Subject: [PATCH 05/20] Remove settings observers for cache --- app/settings/server/functions/settings.ts | 22 +++++++++++++++++---- app/settings/server/observer.js | 24 ++++++++++++++++------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/app/settings/server/functions/settings.ts b/app/settings/server/functions/settings.ts index 480c2f936832..7cccf4fdfc3c 100644 --- a/app/settings/server/functions/settings.ts +++ b/app/settings/server/functions/settings.ts @@ -385,13 +385,27 @@ class Settings extends SettingsBase { */ init(): void { this.initialLoad = true; - SettingsModel.find().observe({ - added: (record: ISettingRecord) => this.storeSettingValue(record, this.initialLoad), - changed: (record: ISettingRecord) => this.storeSettingValue(record, this.initialLoad), - removed: (record: ISettingRecord) => this.removeSettingValue(record, this.initialLoad), + SettingsModel.find().forEach((record: ISettingRecord) => { + this.storeSettingValue(record, this.initialLoad); }); this.initialLoad = false; this.afterInitialLoad.forEach((fn) => fn(Meteor.settings)); + + SettingsModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + this.storeSettingValue(data, this.initialLoad); + break; + case 'updated': + data = SettingsModel.findOneById(id); + this.storeSettingValue(data, this.initialLoad); + break; + case 'removed': + data = SettingsModel.trashFindOneById(id); + this.removeSettingValue(data, this.initialLoad); + break; + } + }); } onAfterInitialLoad(fn: (settings: Meteor.Settings) => void): void { diff --git a/app/settings/server/observer.js b/app/settings/server/observer.js index 7c376078aaa4..53bf2664ec5e 100644 --- a/app/settings/server/observer.js +++ b/app/settings/server/observer.js @@ -10,10 +10,20 @@ const updateValue = (id, fields) => { setValue(id, fields.value); }; -Meteor.startup(() => Settings.find({}, { fields: { value: 1 } }).observeChanges({ - added: updateValue, - changed: updateValue, - removed(id) { - setValue(id, undefined); - }, -})); +Meteor.startup(() => { + Settings.find({}, { fields: { value: 1 } }).fetch().forEach((record) => updateValue(record._id, { value: record.value })); + + Settings.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + case 'updated': + if (data.value) { + updateValue(id, { value: data.value }); + } + break; + case 'removed': + setValue(id, undefined); + break; + } + }); +}); From 522c2e771df2c824687f10b0aaeb7df8533a9ede Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 13:37:09 -0300 Subject: [PATCH 06/20] Remove observer from instance status --- app/models/server/models/InstanceStatus.js | 7 +++ server/stream/streamBroadcast.js | 52 ++++++++++++++++------ 2 files changed, 45 insertions(+), 14 deletions(-) create mode 100644 app/models/server/models/InstanceStatus.js diff --git a/app/models/server/models/InstanceStatus.js b/app/models/server/models/InstanceStatus.js new file mode 100644 index 000000000000..22bb58647404 --- /dev/null +++ b/app/models/server/models/InstanceStatus.js @@ -0,0 +1,7 @@ +import { InstanceStatus } from 'meteor/konecty:multiple-instances-status'; + +import { Base } from './_Base'; + +export class InstanceStatusModel extends Base {} + +export default new InstanceStatusModel(InstanceStatus.getCollection()); diff --git a/server/stream/streamBroadcast.js b/server/stream/streamBroadcast.js index 0b982b6e99b5..b615e1b5d685 100644 --- a/server/stream/streamBroadcast.js +++ b/server/stream/streamBroadcast.js @@ -11,6 +11,7 @@ import { hasPermission } from '../../app/authorization'; import { settings } from '../../app/settings'; import { isDocker, getURL } from '../../app/utils'; import { Users } from '../../app/models/server'; +import InstanceStatusModel from '../../app/models/server/models/InstanceStatus'; process.env.PORT = String(process.env.PORT).trim(); process.env.INSTANCE_IP = String(process.env.INSTANCE_IP).trim(); @@ -56,26 +57,17 @@ function authorizeConnection(instance) { return _authorizeConnection(instance); } +const cache = new Map(); const originalSetDefaultStatus = UserPresence.setDefaultStatus; function startMatrixBroadcast() { if (!startMonitor) { UserPresence.setDefaultStatus = originalSetDefaultStatus; } - const query = { - 'extraInformation.port': { - $exists: true, - }, - }; - - const options = { - sort: { - _createdAt: -1, - }, - }; - - return InstanceStatus.getCollection().find(query, options).observe({ + const actions = { added(record) { + cache.set(record._id, record); + const subPath = getURL('', { cdn: false, full: false }); let instance = `${ record.extraInformation.host }:${ record.extraInformation.port }${ subPath }`; @@ -111,7 +103,13 @@ function startMatrixBroadcast() { }; }, - removed(record) { + removed(id) { + const record = cache.get(id); + if (!record) { + return; + } + cache.delete(id); + const subPath = getURL('', { cdn: false, full: false }); let instance = `${ record.extraInformation.host }:${ record.extraInformation.port }${ subPath }`; @@ -130,6 +128,32 @@ function startMatrixBroadcast() { return delete connections[instance]; } }, + }; + + const query = { + 'extraInformation.port': { + $exists: true, + }, + }; + + const options = { + sort: { + _createdAt: -1, + }, + }; + + InstanceStatusModel.find(query, options).fetch().forEach(actions.added); + return InstanceStatusModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + if (data.extraInformation?.port) { + actions.added(data); + } + break; + case 'removed': + actions.removed(id); + break; + } }); } From 9788908a163ca162c9ce115a182cd5367d6d3e84 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 15:12:02 -0300 Subject: [PATCH 07/20] Remove observer from presence monitoring --- app/models/server/models/InstanceStatus.js | 2 +- app/models/server/models/UsersSessions.js | 7 ++++ app/models/server/models/_BaseDb.js | 7 +++- server/startup/presence.js | 48 +++++++++++++++++++++- 4 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 app/models/server/models/UsersSessions.js diff --git a/app/models/server/models/InstanceStatus.js b/app/models/server/models/InstanceStatus.js index 22bb58647404..344381e44266 100644 --- a/app/models/server/models/InstanceStatus.js +++ b/app/models/server/models/InstanceStatus.js @@ -4,4 +4,4 @@ import { Base } from './_Base'; export class InstanceStatusModel extends Base {} -export default new InstanceStatusModel(InstanceStatus.getCollection()); +export default new InstanceStatusModel(InstanceStatus.getCollection(), { preventSetUpdatedAt: true }); diff --git a/app/models/server/models/UsersSessions.js b/app/models/server/models/UsersSessions.js new file mode 100644 index 000000000000..43aec902d343 --- /dev/null +++ b/app/models/server/models/UsersSessions.js @@ -0,0 +1,7 @@ +import { UsersSessions } from 'meteor/konecty:user-presence'; + +import { Base } from './_Base'; + +export class UsersSessionsModel extends Base {} + +export default new UsersSessionsModel(UsersSessions, { preventSetUpdatedAt: true }); diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index fbd3ced3bb73..cb20910d20d3 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -25,7 +25,7 @@ try { const actions = { i: 'insert', u: 'update', - d: 'delete', + d: 'remove', }; export class BaseDb extends EventEmitter { @@ -44,6 +44,8 @@ export class BaseDb extends EventEmitter { this.baseModel = baseModel; + this.preventSetUpdatedAt = !!options.preventSetUpdatedAt; + this.wrapModel(); const { oplogEnabled, mongo } = getMongoInfo(); @@ -92,6 +94,9 @@ export class BaseDb extends EventEmitter { } setUpdatedAt(record = {}) { + if (this.preventSetUpdatedAt) { + return record; + } // TODO: Check if this can be deleted, Rodrigo does not rememebr WHY he added it. So he removed it to fix issue #5541 // setUpdatedAt(record = {}, checkQuery = false, query) { // if (checkQuery === true) { diff --git a/server/startup/presence.js b/server/startup/presence.js index 10abc5b7a783..5295e1463076 100644 --- a/server/startup/presence.js +++ b/server/startup/presence.js @@ -2,6 +2,8 @@ import { Meteor } from 'meteor/meteor'; import { InstanceStatus } from 'meteor/konecty:multiple-instances-status'; import { UserPresence, UserPresenceMonitor } from 'meteor/konecty:user-presence'; +import InstanceStatusModel from '../../app/models/server/models/InstanceStatus'; +import UsersSessionsModel from '../../app/models/server/models/UsersSessions'; Meteor.startup(function() { const instance = { @@ -20,6 +22,50 @@ Meteor.startup(function() { const startMonitor = typeof process.env.DISABLE_PRESENCE_MONITOR === 'undefined' || !['true', 'yes'].includes(String(process.env.DISABLE_PRESENCE_MONITOR).toLowerCase()); if (startMonitor) { - UserPresenceMonitor.start(); + // UserPresenceMonitor.start(); + + // Remove lost connections + const ids = InstanceStatusModel.find({}, { fields: { _id: 1 } }).fetch().map((id) => id._id); + + const update = { + $pull: { + connections: { + instanceId: { + $nin: ids, + }, + }, + }, + }; + UsersSessionsModel.update({}, update, { multi: true }); + + InstanceStatusModel.on('change', ({ clientAction, id }) => { + switch (clientAction) { + case 'removed': + UserPresence.removeConnectionsByInstanceId(id); + break; + } + }); + + UsersSessionsModel.on('change', ({ clientAction, id, data }) => { + switch (clientAction) { + case 'inserted': + UserPresenceMonitor.processUserSession(data, 'added'); + break; + case 'updated': + const record = UsersSessionsModel.findOneById(id); + if (record) { + UserPresenceMonitor.processUserSession(record, 'changed'); + } + break; + case 'removed': + UserPresenceMonitor.processUserSession({ + _id: id, + connections: [{ + fake: true, + }], + }, 'removed'); + break; + } + }); } }); From 46706ae23d146f68d46eeaa92402e98a8e1b845e Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 15:49:36 -0300 Subject: [PATCH 08/20] Very first concept of changestream in place of oplog --- .meteor/packages | 1 + .meteor/versions | 1 + .../server/streamer/permissions/emitter.js | 2 +- app/integrations/server/lib/triggerHandler.js | 2 +- app/models/server/models/_BaseDb.js | 15 ++- app/models/server/models/_oplogHandle.ts | 118 ++++++++++++++++++ app/search/server/events/events.js | 4 +- app/settings/server/functions/settings.ts | 6 +- app/settings/server/observer.js | 3 +- app/utils/server/functions/getMongoInfo.js | 2 +- server/publications/settings/emitter.js | 2 +- server/publications/settings/index.js | 2 +- server/startup/presence.js | 6 +- server/stream/messages/emitter.js | 2 +- 14 files changed, 145 insertions(+), 21 deletions(-) create mode 100644 app/models/server/models/_oplogHandle.ts diff --git a/.meteor/packages b/.meteor/packages index 2a2c903bbd21..9004ec0e7296 100644 --- a/.meteor/packages +++ b/.meteor/packages @@ -3,6 +3,7 @@ # 'meteor add' and 'meteor remove' will edit this file for you, # but you can also edit it by hand. +disable-oplog rocketchat:mongo-config accounts-facebook@1.3.2 diff --git a/.meteor/versions b/.meteor/versions index f6ad8cbe3539..02e008d74942 100644 --- a/.meteor/versions +++ b/.meteor/versions @@ -32,6 +32,7 @@ ddp-server@2.3.2 deepwell:bootstrap-datepicker2@1.3.0 deps@1.0.12 diff-sequence@1.1.1 +disable-oplog@1.0.7 dispatch:run-as-user@1.1.1 dynamic-import@0.5.2 ecmascript@0.14.3 diff --git a/app/authorization/server/streamer/permissions/emitter.js b/app/authorization/server/streamer/permissions/emitter.js index a7062439eb46..a258bdc607ec 100644 --- a/app/authorization/server/streamer/permissions/emitter.js +++ b/app/authorization/server/streamer/permissions/emitter.js @@ -12,7 +12,7 @@ Permissions.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': case 'inserted': - data = data || Permissions.findOneById(id); + data = data ?? Permissions.findOneById(id); break; case 'removed': diff --git a/app/integrations/server/lib/triggerHandler.js b/app/integrations/server/lib/triggerHandler.js index 3d1344272f84..0541855cbf64 100644 --- a/app/integrations/server/lib/triggerHandler.js +++ b/app/integrations/server/lib/triggerHandler.js @@ -32,7 +32,7 @@ integrations.triggerHandler = new class RocketChatIntegrationHandler { } break; case 'updated': - data = Models.Integrations.findOneById(id); + data = data ?? Models.Integrations.findOneById(id); if (data.type === 'webhook-outgoing') { this.removeIntegration(data); this.addIntegration(data); diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index cb20910d20d3..3e5035508758 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -6,6 +6,7 @@ import _ from 'underscore'; import { getMongoInfo } from '../../../utils/server/functions/getMongoInfo'; import { metrics } from '../../../metrics/server/lib/metrics'; +import { oplog } from './_oplogHandle'; const baseName = 'rocketchat_'; @@ -48,10 +49,10 @@ export class BaseDb extends EventEmitter { this.wrapModel(); - const { oplogEnabled, mongo } = getMongoInfo(); + const { oplogEnabled/* , mongo */ } = getMongoInfo(); // When someone start listening for changes we start oplog if available - const handleListener = (event /* , listener*/) => { + const handleListener = async (event /* , listener*/) => { if (event !== 'change') { return; } @@ -61,8 +62,12 @@ export class BaseDb extends EventEmitter { const query = { collection: this.collectionName, }; + console.log(this.collectionName); - if (!mongo._oplogHandle) { + const _oplogHandle = await oplog; + // const { _oplogHandle } = mongo; + + if (!_oplogHandle) { throw new Error(`Error: Unable to find Mongodb Oplog. You must run the server with oplog enabled. Try the following:\n 1. Start your mongodb in a replicaset mode: mongod --smallfiles --oplogSize 128 --replSet rs0\n 2. Start the replicaset via mongodb shell: mongo mongo/meteor --eval "rs.initiate({ _id: ''rs0'', members: [ { _id: 0, host: ''localhost:27017'' } ]})"\n @@ -70,13 +75,13 @@ export class BaseDb extends EventEmitter { `); } - mongo._oplogHandle.onOplogEntry( + _oplogHandle.onOplogEntry( query, this.processOplogRecord.bind(this), ); // Meteor will handle if we have a value https://github.com/meteor/meteor/blob/5dcd0b2eb9c8bf881ffbee98bc4cb7631772c4da/packages/mongo/oplog_tailing.js#L5 if (process.env.METEOR_OPLOG_TOO_FAR_BEHIND == null) { - mongo._oplogHandle._defineTooFarBehind( + _oplogHandle._defineTooFarBehind( Number.MAX_SAFE_INTEGER, ); } diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts new file mode 100644 index 000000000000..c9cb32b07067 --- /dev/null +++ b/app/models/server/models/_oplogHandle.ts @@ -0,0 +1,118 @@ +import { Meteor } from 'meteor/meteor'; +// import s from 'underscore.string'; +import { MongoClient, Cursor, /* , Timestamp */ + Db } from 'mongodb'; +// import urlParser from 'mongodb/lib/url_parser'; + +class OplogHandle { + client: MongoClient; + + // stream: Cursor; + db: Db; + + constructor(oplogUrl: string, private dbName: string) { + this.client = new MongoClient(oplogUrl, { + useUnifiedTopology: true, + useNewUrlParser: true, + // poolSize: 1, + }); + } + + async start(): Promise { + await this.client.connect(); + this.db = this.client.db(); + + // const isMasterDoc = await db.admin().command({ ismaster: 1 }); + // if (!isMasterDoc || !isMasterDoc.setName) { + // throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + // } + + // const oplogCollection = db.collection('oplog.rs'); + + // const lastOplogEntry = await oplogCollection.findOne<{ts: Timestamp}>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }); + + // const oplogSelector = { + // ns: new RegExp(`^(?:${ [ + // s.escapeRegExp(`${ this.dbName }.`), + // s.escapeRegExp('admin.$cmd'), + // ].join('|') })`), + + // op: { $in: ['i', 'u', 'd'] }, + // ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, + // }; + + // console.log(oplogSelector); + // this.stream = oplogCollection.find(oplogSelector, { + // tailable: true, + // // awaitData: true, + // }).stream(); + + return this; + } + + onOplogEntry(query: {collection: string}, callback: Function): void { + // this.stream.on('data', Meteor.bindEnvironment((buffer) => { + // const doc = buffer as any; + // if (doc.ns === `${ this.dbName }.${ query.collection }`) { + // // console.log('doc', doc); + // callback({ + // id: doc.op === 'u' ? doc.o2._id : doc.o._id, + // op: doc, + // }); + // } + // })); + this.db.collection(query.collection).watch([], { /* fullDocument: 'updateLookup' */ }).on('change', Meteor.bindEnvironment((event) => { + console.log(event); + switch (event.operationType) { + case 'insert': + callback({ + id: event.documentKey._id, + op: { + op: 'i', + o: event.fullDocument, + }, + }); + break; + case 'update': + callback({ + id: event.documentKey._id, + op: { + op: 'u', + // o: event.fullDocument, + o: { + $set: event.updateDescription.updatedFields, + $unset: event.updateDescription.removedFields, + }, + }, + }); + break; + case 'delete': + callback({ + id: event.documentKey._id, + op: { + op: 'd', + }, + }); + break; + } + })); + } + + _defineTooFarBehind(): void { + // + } +} + +// TODO: Extract from connection string; +const _dbName = 'rocketchat'; +// if (!process.env.MONGO_OPLOG_URL) { +// throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); +// } + +// TODO: +// if (urlParser(process.env.MONGO_OPLOG_URL).database !== 'local') { +// throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); +// } + + +export const oplog = new OplogHandle(process.env.MONGO_URL, _dbName).start(); diff --git a/app/search/server/events/events.js b/app/search/server/events/events.js index 7f0f5032a1d7..65ce63523f9a 100644 --- a/app/search/server/events/events.js +++ b/app/search/server/events/events.js @@ -41,7 +41,7 @@ Users.on('change', ({ clientAction, id, data }) => { switch (clientAction) { case 'updated': case 'inserted': - const user = data || Users.findOneById(id); + const user = data ?? Users.findOneById(id); eventService.promoteEvent('user.save', id, user); break; @@ -55,7 +55,7 @@ Rooms.on('change', ({ clientAction, id, data }) => { switch (clientAction) { case 'updated': case 'inserted': - const room = data || Rooms.findOneById(id); + const room = data ?? Rooms.findOneById(id); eventService.promoteEvent('room.save', id, room); break; diff --git a/app/settings/server/functions/settings.ts b/app/settings/server/functions/settings.ts index 7cccf4fdfc3c..b22355468647 100644 --- a/app/settings/server/functions/settings.ts +++ b/app/settings/server/functions/settings.ts @@ -385,7 +385,7 @@ class Settings extends SettingsBase { */ init(): void { this.initialLoad = true; - SettingsModel.find().forEach((record: ISettingRecord) => { + SettingsModel.find().fetch().forEach((record: ISettingRecord) => { this.storeSettingValue(record, this.initialLoad); }); this.initialLoad = false; @@ -394,10 +394,8 @@ class Settings extends SettingsBase { SettingsModel.on('change', ({ clientAction, id, data }) => { switch (clientAction) { case 'inserted': - this.storeSettingValue(data, this.initialLoad); - break; case 'updated': - data = SettingsModel.findOneById(id); + data = data ?? SettingsModel.findOneById(id); this.storeSettingValue(data, this.initialLoad); break; case 'removed': diff --git a/app/settings/server/observer.js b/app/settings/server/observer.js index 53bf2664ec5e..61e4d415d8b7 100644 --- a/app/settings/server/observer.js +++ b/app/settings/server/observer.js @@ -13,10 +13,11 @@ const updateValue = (id, fields) => { Meteor.startup(() => { Settings.find({}, { fields: { value: 1 } }).fetch().forEach((record) => updateValue(record._id, { value: record.value })); - Settings.on('change', ({ clientAction, id, data }) => { + Settings.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'inserted': case 'updated': + data = data ?? diff; if (data.value) { updateValue(id, { value: data.value }); } diff --git a/app/utils/server/functions/getMongoInfo.js b/app/utils/server/functions/getMongoInfo.js index c0cbd9a17a62..fc19530dd120 100644 --- a/app/utils/server/functions/getMongoInfo.js +++ b/app/utils/server/functions/getMongoInfo.js @@ -3,7 +3,7 @@ import { MongoInternals } from 'meteor/mongo'; export function getOplogInfo() { const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); - const oplogEnabled = Boolean(mongo._oplogHandle && mongo._oplogHandle.onOplogEntry); + const oplogEnabled = true; // Boolean(mongo._oplogHandle && mongo._oplogHandle.onOplogEntry); return { oplogEnabled, mongo }; } diff --git a/server/publications/settings/emitter.js b/server/publications/settings/emitter.js index 2a9312b22548..334a4f733848 100644 --- a/server/publications/settings/emitter.js +++ b/server/publications/settings/emitter.js @@ -9,7 +9,7 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': case 'inserted': { - const setting = data || Settings.findOneById(id); + const setting = data ?? Settings.findOneById(id); const value = { _id: setting._id, value: setting.value, diff --git a/server/publications/settings/index.js b/server/publications/settings/index.js index 7b379059b50b..ff60c06a106c 100644 --- a/server/publications/settings/index.js +++ b/server/publications/settings/index.js @@ -85,7 +85,7 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': case 'inserted': { - const setting = data || Settings.findOneById(id); + const setting = data ?? Settings.findOneById(id); const value = { _id: setting._id, value: setting.value, diff --git a/server/startup/presence.js b/server/startup/presence.js index 5295e1463076..d28c69906aec 100644 --- a/server/startup/presence.js +++ b/server/startup/presence.js @@ -52,9 +52,9 @@ Meteor.startup(function() { UserPresenceMonitor.processUserSession(data, 'added'); break; case 'updated': - const record = UsersSessionsModel.findOneById(id); - if (record) { - UserPresenceMonitor.processUserSession(record, 'changed'); + data = data ?? UsersSessionsModel.findOneById(id); + if (data) { + UserPresenceMonitor.processUserSession(data, 'changed'); } break; case 'removed': diff --git a/server/stream/messages/emitter.js b/server/stream/messages/emitter.js index aa1f7260d917..d86dea2639f5 100644 --- a/server/stream/messages/emitter.js +++ b/server/stream/messages/emitter.js @@ -31,7 +31,7 @@ Meteor.startup(function() { switch (clientAction) { case 'inserted': case 'updated': - const message = data || Messages.findOne({ _id: id }); + const message = data ?? Messages.findOne({ _id: id }); publishMessage(clientAction, message); break; } From a269a23743a4f53e8c70d3e291876351246e9fd7 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 19:17:32 -0300 Subject: [PATCH 09/20] Reduce observers to prevent unecessary finds --- app/models/server/models/_Base.js | 1 + app/search/server/events/events.js | 40 +++++++++++++------ app/settings/server/functions/settings.ts | 5 +++ app/settings/server/observer.js | 24 +----------- server/publications/settings/emitter.js | 13 ++++--- server/publications/settings/index.js | 47 +---------------------- 6 files changed, 44 insertions(+), 86 deletions(-) diff --git a/app/models/server/models/_Base.js b/app/models/server/models/_Base.js index 86ca31fa2ad4..c668d3761ecf 100644 --- a/app/models/server/models/_Base.js +++ b/app/models/server/models/_Base.js @@ -12,6 +12,7 @@ export class Base { this.collectionName = this._db.collectionName; this.name = this._db.name; + this.removeListener = this._db.removeListener.bind(this._db); this.on = this._db.on.bind(this._db); this.emit = this._db.emit.bind(this._db); diff --git a/app/search/server/events/events.js b/app/search/server/events/events.js index 65ce63523f9a..ffb1701fab2b 100644 --- a/app/search/server/events/events.js +++ b/app/search/server/events/events.js @@ -1,5 +1,8 @@ -import { callbacks } from '../../../callbacks'; -import { Users, Rooms } from '../../../models'; +import _ from 'underscore'; + +import { settings } from '../../../settings/server'; +import { callbacks } from '../../../callbacks/server'; +import { Users, Rooms } from '../../../models/server'; import { searchProviderService } from '../service/providerService'; import SearchLogger from '../logger/logger'; @@ -22,22 +25,20 @@ const eventService = new EventService(); /** * Listen to message changes via Hooks */ -callbacks.add('afterSaveMessage', function(m) { +function afterSaveMessage(m) { eventService.promoteEvent('message.save', m._id, m); return m; -}, callbacks.priority.MEDIUM, 'search-events'); +} -callbacks.add('afterDeleteMessage', function(m) { +function afterDeleteMessage(m) { eventService.promoteEvent('message.delete', m._id); return m; -}, callbacks.priority.MEDIUM, 'search-events-delete'); +} /** * Listen to user and room changes via cursor */ - - -Users.on('change', ({ clientAction, id, data }) => { +function onUsersChange({ clientAction, id, data }) { switch (clientAction) { case 'updated': case 'inserted': @@ -49,9 +50,9 @@ Users.on('change', ({ clientAction, id, data }) => { eventService.promoteEvent('user.delete', id); break; } -}); +} -Rooms.on('change', ({ clientAction, id, data }) => { +function onRoomsChange({ clientAction, id, data }) { switch (clientAction) { case 'updated': case 'inserted': @@ -63,4 +64,19 @@ Rooms.on('change', ({ clientAction, id, data }) => { eventService.promoteEvent('room.delete', id); break; } -}); +} + +settings.get('Search.Provider', _.debounce((key, value) => { + console.log('searchProviderService.activeProvider?.on', searchProviderService.activeProvider?.on); + if (searchProviderService.activeProvider?.on) { + Users.on('change', onUsersChange); + Rooms.on('change', onRoomsChange); + callbacks.add('afterSaveMessage', afterSaveMessage, callbacks.priority.MEDIUM, 'search-events'); + callbacks.add('afterDeleteMessage', afterDeleteMessage, callbacks.priority.MEDIUM, 'search-events-delete'); + } else { + Users.removeListener('change', onUsersChange); + Rooms.removeListener('change', onRoomsChange); + callbacks.remove('afterSaveMessage', 'search-events'); + callbacks.remove('afterDeleteMessage', 'search-events-delete'); + } +}, 1000)); diff --git a/app/settings/server/functions/settings.ts b/app/settings/server/functions/settings.ts index b22355468647..74816a72699f 100644 --- a/app/settings/server/functions/settings.ts +++ b/app/settings/server/functions/settings.ts @@ -5,6 +5,8 @@ import _ from 'underscore'; import { SettingsBase, SettingValue } from '../../lib/settings'; import SettingsModel from '../../../models/server/models/Settings'; +import { updateValue } from '../observer'; +import { setValue } from '../raw'; const blockedSettings = new Set(); const hiddenSettings = new Set(); @@ -387,6 +389,7 @@ class Settings extends SettingsBase { this.initialLoad = true; SettingsModel.find().fetch().forEach((record: ISettingRecord) => { this.storeSettingValue(record, this.initialLoad); + updateValue(record._id, { value: record.value }); }); this.initialLoad = false; this.afterInitialLoad.forEach((fn) => fn(Meteor.settings)); @@ -397,10 +400,12 @@ class Settings extends SettingsBase { case 'updated': data = data ?? SettingsModel.findOneById(id); this.storeSettingValue(data, this.initialLoad); + updateValue(id, { value: data.value }); break; case 'removed': data = SettingsModel.trashFindOneById(id); this.removeSettingValue(data, this.initialLoad); + setValue(id, undefined); break; } }); diff --git a/app/settings/server/observer.js b/app/settings/server/observer.js index 61e4d415d8b7..7e941aea0c95 100644 --- a/app/settings/server/observer.js +++ b/app/settings/server/observer.js @@ -1,30 +1,8 @@ -import { Meteor } from 'meteor/meteor'; - -import { Settings } from '../../models/server'; import { setValue } from './raw'; -const updateValue = (id, fields) => { +export const updateValue = (id, fields) => { if (typeof fields.value === 'undefined') { return; } setValue(id, fields.value); }; - -Meteor.startup(() => { - Settings.find({}, { fields: { value: 1 } }).fetch().forEach((record) => updateValue(record._id, { value: record.value })); - - Settings.on('change', ({ clientAction, id, data, diff }) => { - switch (clientAction) { - case 'inserted': - case 'updated': - data = data ?? diff; - if (data.value) { - updateValue(id, { value: data.value }); - } - break; - case 'removed': - setValue(id, undefined); - break; - } - }); -}); diff --git a/server/publications/settings/emitter.js b/server/publications/settings/emitter.js index 334a4f733848..4bd3a4c627bf 100644 --- a/server/publications/settings/emitter.js +++ b/server/publications/settings/emitter.js @@ -1,6 +1,7 @@ -import { Settings } from '../../../app/models'; -import { Notifications } from '../../../app/notifications'; -import { hasPermission } from '../../../app/authorization'; +import { Settings } from '../../../app/models/server'; +import { Notifications } from '../../../app/notifications/server'; +import { hasAtLeastOnePermission } from '../../../app/authorization/server'; +import { SettingsEvents } from '../../../app/settings/server/functions/settings'; Settings.on('change', ({ clientAction, id, data, diff }) => { if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes @@ -15,8 +16,11 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { value: setting.value, editor: setting.editor, properties: setting.properties, + enterprise: setting.enterprise, }; + SettingsEvents.emit('change-setting', setting, value); + if (setting.public === true) { Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value); } @@ -36,10 +40,9 @@ Settings.on('change', ({ clientAction, id, data, diff }) => { } }); - Notifications.streamAll.allowRead('private-settings-changed', function() { if (this.userId == null) { return false; } - return hasPermission(this.userId, 'view-privileged-setting'); + return hasAtLeastOnePermission(this.userId, ['view-privileged-setting', 'edit-privileged-setting', 'manage-selected-settings']); }); diff --git a/server/publications/settings/index.js b/server/publications/settings/index.js index ff60c06a106c..e14f2079ca30 100644 --- a/server/publications/settings/index.js +++ b/server/publications/settings/index.js @@ -1,7 +1,6 @@ import { Meteor } from 'meteor/meteor'; import { Settings } from '../../../app/models/server'; -import { Notifications } from '../../../app/notifications/server'; import { hasPermission, hasAtLeastOnePermission } from '../../../app/authorization/server'; import { getSettingPermissionId } from '../../../app/authorization/lib'; import { SettingsEvents } from '../../../app/settings/server/functions/settings'; @@ -57,7 +56,7 @@ Meteor.methods({ if (!(updatedAfter instanceof Date)) { // this does not only imply an unfiltered setting range, it also identifies the caller's context: - // If called *with* filter (see below), the user wants a colllection as a result. + // If called *with* filter (see below), the user wants a collection as a result. // in this case, it shall only be a plain array return getAuthorizedSettings(updatedAfter, privilegedSetting); } @@ -77,47 +76,3 @@ Meteor.methods({ }; }, }); - -Settings.on('change', ({ clientAction, id, data, diff }) => { - if (diff && Object.keys(diff).length === 1 && diff._updatedAt) { // avoid useless changes - return; - } - switch (clientAction) { - case 'updated': - case 'inserted': { - const setting = data ?? Settings.findOneById(id); - const value = { - _id: setting._id, - value: setting.value, - editor: setting.editor, - properties: setting.properties, - enterprise: setting.enterprise, - }; - - SettingsEvents.emit('change-setting', setting, value); - - if (setting.public === true) { - Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, value); - } - Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, setting); - break; - } - - case 'removed': { - const setting = data || Settings.findOneById(id, { fields: { public: 1 } }); - - if (setting && setting.public === true) { - Notifications.notifyAllInThisInstance('public-settings-changed', clientAction, { _id: id }); - } - Notifications.notifyLoggedInThisInstance('private-settings-changed', clientAction, { _id: id }); - break; - } - } -}); - -Notifications.streamAll.allowRead('private-settings-changed', function() { - if (this.userId == null) { - return false; - } - return hasAtLeastOnePermission(this.userId, ['view-privileged-setting', 'edit-privileged-setting', 'manage-selected-settings']); -}); From cdcdcc63edae2b2c597bef7455d8afa24fe8eb3f Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 20:14:42 -0300 Subject: [PATCH 10/20] Handle oplog options --- app/models/server/models/_BaseDb.js | 10 +- app/models/server/models/_oplogHandle.ts | 147 ++++++++++++++------- app/utils/server/functions/getMongoInfo.js | 4 +- 3 files changed, 103 insertions(+), 58 deletions(-) diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index 3e5035508758..1cf75165cf2d 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -4,9 +4,8 @@ import { Match } from 'meteor/check'; import { Mongo } from 'meteor/mongo'; import _ from 'underscore'; -import { getMongoInfo } from '../../../utils/server/functions/getMongoInfo'; import { metrics } from '../../../metrics/server/lib/metrics'; -import { oplog } from './_oplogHandle'; +import { getOplogHandle } from './_oplogHandle'; const baseName = 'rocketchat_'; @@ -49,7 +48,7 @@ export class BaseDb extends EventEmitter { this.wrapModel(); - const { oplogEnabled/* , mongo */ } = getMongoInfo(); + const _oplogHandle = Promise.await(getOplogHandle()); // When someone start listening for changes we start oplog if available const handleListener = async (event /* , listener*/) => { @@ -64,9 +63,6 @@ export class BaseDb extends EventEmitter { }; console.log(this.collectionName); - const _oplogHandle = await oplog; - // const { _oplogHandle } = mongo; - if (!_oplogHandle) { throw new Error(`Error: Unable to find Mongodb Oplog. You must run the server with oplog enabled. Try the following:\n 1. Start your mongodb in a replicaset mode: mongod --smallfiles --oplogSize 128 --replSet rs0\n @@ -87,7 +83,7 @@ export class BaseDb extends EventEmitter { } }; - if (oplogEnabled) { + if (_oplogHandle) { this.on('newListener', handleListener); } diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index c9cb32b07067..25a34ed76396 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,68 +1,114 @@ import { Meteor } from 'meteor/meteor'; -// import s from 'underscore.string'; -import { MongoClient, Cursor, /* , Timestamp */ - Db } from 'mongodb'; -// import urlParser from 'mongodb/lib/url_parser'; +import { MongoInternals } from 'meteor/mongo'; +import s from 'underscore.string'; +import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; +import urlParser from 'mongodb/lib/url_parser'; class OplogHandle { + dbName: string; + client: MongoClient; - // stream: Cursor; + stream: Cursor; + db: Db; - constructor(oplogUrl: string, private dbName: string) { + usingChangeStream: boolean; + + async isChangeStreamAvailable(): Promise { + if (process.env.IGNORE_CHANGE_STREAM) { + return false; + } + + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + const { storageEngine } = await mongo.db.command({ serverStatus: 1 }); + return storageEngine?.name === 'wiredTiger'; + } + + async start(): Promise { + this.usingChangeStream = await this.isChangeStreamAvailable(); + console.log({ usingChangeStream: this.usingChangeStream }); + const oplogUrl = this.usingChangeStream ? process.env.MONGO_URL : process.env.MONGO_OPLOG_URL; + + if (!this.usingChangeStream && (!oplogUrl || urlParser(oplogUrl).database !== 'local')) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + + if (!oplogUrl) { + throw Error('$MONGO_URL must be set'); + } + + if (process.env.MONGO_OPLOG_URL) { + this.dbName = urlParser(process.env.MONGO_URL).database; + } + this.client = new MongoClient(oplogUrl, { useUnifiedTopology: true, useNewUrlParser: true, - // poolSize: 1, + ...!this.usingChangeStream && { poolSize: 1 }, }); - } - async start(): Promise { await this.client.connect(); this.db = this.client.db(); - // const isMasterDoc = await db.admin().command({ ismaster: 1 }); - // if (!isMasterDoc || !isMasterDoc.setName) { - // throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); - // } + if (!this.usingChangeStream) { + await this.startOplog(); + } - // const oplogCollection = db.collection('oplog.rs'); + return this; + } - // const lastOplogEntry = await oplogCollection.findOne<{ts: Timestamp}>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }); + async startOplog(): Promise { + const isMasterDoc = await this.db.admin().command({ ismaster: 1 }); + if (!isMasterDoc || !isMasterDoc.setName) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } - // const oplogSelector = { - // ns: new RegExp(`^(?:${ [ - // s.escapeRegExp(`${ this.dbName }.`), - // s.escapeRegExp('admin.$cmd'), - // ].join('|') })`), + const oplogCollection = this.db.collection('oplog.rs'); - // op: { $in: ['i', 'u', 'd'] }, - // ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, - // }; + const lastOplogEntry = await oplogCollection.findOne<{ts: Timestamp}>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }); - // console.log(oplogSelector); - // this.stream = oplogCollection.find(oplogSelector, { - // tailable: true, - // // awaitData: true, - // }).stream(); + const oplogSelector = { + ns: new RegExp(`^(?:${ [ + s.escapeRegExp(`${ this.dbName }.`), + s.escapeRegExp('admin.$cmd'), + ].join('|') })`), - return this; + op: { $in: ['i', 'u', 'd'] }, + ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, + }; + + console.log(oplogSelector); + this.stream = oplogCollection.find(oplogSelector, { + tailable: true, + // awaitData: true, + }).stream(); } onOplogEntry(query: {collection: string}, callback: Function): void { - // this.stream.on('data', Meteor.bindEnvironment((buffer) => { - // const doc = buffer as any; - // if (doc.ns === `${ this.dbName }.${ query.collection }`) { - // // console.log('doc', doc); - // callback({ - // id: doc.op === 'u' ? doc.o2._id : doc.o._id, - // op: doc, - // }); - // } - // })); + if (this.usingChangeStream) { + return this._onOplogEntryChangeStream(query, callback); + } + + return this._onOplogEntryOplog(query, callback); + } + + _onOplogEntryOplog(query: {collection: string}, callback: Function): void { + this.stream.on('data', Meteor.bindEnvironment((buffer) => { + const doc = buffer as any; + if (doc.ns === `${ this.dbName }.${ query.collection }`) { + // console.log('doc', doc); + callback({ + id: doc.op === 'u' ? doc.o2._id : doc.o._id, + op: doc, + }); + } + })); + } + + _onOplogEntryChangeStream(query: {collection: string}, callback: Function): void { this.db.collection(query.collection).watch([], { /* fullDocument: 'updateLookup' */ }).on('change', Meteor.bindEnvironment((event) => { - console.log(event); + // console.log(event); switch (event.operationType) { case 'insert': callback({ @@ -103,16 +149,17 @@ class OplogHandle { } } -// TODO: Extract from connection string; -const _dbName = 'rocketchat'; -// if (!process.env.MONGO_OPLOG_URL) { -// throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); -// } +// process.env.USE_NEW_OPLOG = 'true'; +const oplogHandle = process.env.USE_NEW_OPLOG ? new OplogHandle().start() : undefined; -// TODO: -// if (urlParser(process.env.MONGO_OPLOG_URL).database !== 'local') { -// throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); -// } +export const getOplogHandle = async (): Promise => { + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + if (process.env.USE_NEW_OPLOG) { + return oplogHandle; + } -export const oplog = new OplogHandle(process.env.MONGO_URL, _dbName).start(); + if (mongo._oplogHandle?.onOplogEntry) { + return mongo._oplogHandle; + } +}; diff --git a/app/utils/server/functions/getMongoInfo.js b/app/utils/server/functions/getMongoInfo.js index fc19530dd120..7b34b5a3054b 100644 --- a/app/utils/server/functions/getMongoInfo.js +++ b/app/utils/server/functions/getMongoInfo.js @@ -1,9 +1,11 @@ import { MongoInternals } from 'meteor/mongo'; +import { getOplogHandle } from '../../../models/server/models/_oplogHandle'; + export function getOplogInfo() { const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); - const oplogEnabled = true; // Boolean(mongo._oplogHandle && mongo._oplogHandle.onOplogEntry); + const oplogEnabled = !!Promise.await(getOplogHandle()); return { oplogEnabled, mongo }; } From 5bb310a85981fa7532bc4dbf3fbba431d0709e73 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 20:50:02 -0300 Subject: [PATCH 11/20] Fix url parse --- app/models/server/models/_oplogHandle.ts | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index 25a34ed76396..421f1028d555 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,8 +1,12 @@ +import { promisify } from 'util'; + import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; import s from 'underscore.string'; import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; -import urlParser from 'mongodb/lib/url_parser'; +import _urlParser from 'mongodb/lib/url_parser'; + +const urlParser = promisify(_urlParser); class OplogHandle { dbName: string; @@ -27,10 +31,10 @@ class OplogHandle { async start(): Promise { this.usingChangeStream = await this.isChangeStreamAvailable(); - console.log({ usingChangeStream: this.usingChangeStream }); const oplogUrl = this.usingChangeStream ? process.env.MONGO_URL : process.env.MONGO_OPLOG_URL; - if (!this.usingChangeStream && (!oplogUrl || urlParser(oplogUrl).database !== 'local')) { + const urlParsed = await urlParser(oplogUrl); + if (!this.usingChangeStream && (!oplogUrl || urlParsed.dbName !== 'local')) { throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); } @@ -39,7 +43,8 @@ class OplogHandle { } if (process.env.MONGO_OPLOG_URL) { - this.dbName = urlParser(process.env.MONGO_URL).database; + const urlParsed = await urlParser(process.env.MONGO_URL); + this.dbName = urlParsed.dbName; } this.client = new MongoClient(oplogUrl, { @@ -149,7 +154,9 @@ class OplogHandle { } } -// process.env.USE_NEW_OPLOG = 'true'; +process.env.USE_NEW_OPLOG = 'true'; +process.env.IGNORE_CHANGE_STREAM = 'true'; + const oplogHandle = process.env.USE_NEW_OPLOG ? new OplogHandle().start() : undefined; export const getOplogHandle = async (): Promise => { From 58d9caf367282d9c7a5eb4faa6ed26fd044685d7 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 13 Sep 2020 22:01:46 -0300 Subject: [PATCH 12/20] Fix importer issue --- app/models/server/models/_oplogHandle.ts | 16 ++++++---------- app/models/server/models/_oplogUrlParser.js | 5 +++++ app/settings/server/raw.js | 2 +- server/main.d.ts | 6 ++++++ 4 files changed, 18 insertions(+), 11 deletions(-) create mode 100644 app/models/server/models/_oplogUrlParser.js diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index 421f1028d555..aeb26c08f15e 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,12 +1,9 @@ -import { promisify } from 'util'; - import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; import s from 'underscore.string'; import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; -import _urlParser from 'mongodb/lib/url_parser'; -const urlParser = promisify(_urlParser); +import { urlParser } from './_oplogUrlParser'; class OplogHandle { dbName: string; @@ -154,18 +151,17 @@ class OplogHandle { } } -process.env.USE_NEW_OPLOG = 'true'; -process.env.IGNORE_CHANGE_STREAM = 'true'; +// process.env.USE_OLD_OPLOG = 'true'; +// process.env.IGNORE_CHANGE_STREAM = 'true'; -const oplogHandle = process.env.USE_NEW_OPLOG ? new OplogHandle().start() : undefined; +const oplogHandle = !process.env.USE_OLD_OPLOG ? new OplogHandle().start() : undefined; export const getOplogHandle = async (): Promise => { - const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); - - if (process.env.USE_NEW_OPLOG) { + if (oplogHandle) { return oplogHandle; } + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); if (mongo._oplogHandle?.onOplogEntry) { return mongo._oplogHandle; } diff --git a/app/models/server/models/_oplogUrlParser.js b/app/models/server/models/_oplogUrlParser.js new file mode 100644 index 000000000000..8bf3fdd2e836 --- /dev/null +++ b/app/models/server/models/_oplogUrlParser.js @@ -0,0 +1,5 @@ +import { promisify } from 'util'; + +import _urlParser from 'mongodb/lib/url_parser'; + +export const urlParser = promisify(_urlParser); diff --git a/app/settings/server/raw.js b/app/settings/server/raw.js index 9bfd51fbc029..357c62674a99 100644 --- a/app/settings/server/raw.js +++ b/app/settings/server/raw.js @@ -1,4 +1,4 @@ -import { Settings } from '../../models/server/raw'; +import { Settings } from '../../models/server/models/Settings'; const cache = new Map(); diff --git a/server/main.d.ts b/server/main.d.ts index d1d7791954db..e8f1f8cfee8c 100644 --- a/server/main.d.ts +++ b/server/main.d.ts @@ -7,6 +7,12 @@ declare module 'meteor/random' { } } +declare module 'meteor/mongo' { + namespace MongoInternals { + function defaultRemoteCollectionDriver(): any; + } +} + declare module 'meteor/accounts-base' { namespace Accounts { function _bcryptRounds(): number; From 553a9d7405425a5a8d055c49d0cd85f9b2bcd3d0 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Mon, 14 Sep 2020 08:07:18 -0300 Subject: [PATCH 13/20] Verify if mongodb is gte 3.6 to allow usage of changestreams --- app/models/server/models/_oplogHandle.ts | 6 ++++-- package-lock.json | 12 +++++++++--- package.json | 1 + 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index aeb26c08f15e..3d55ba497f60 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,5 +1,7 @@ + import { Meteor } from 'meteor/meteor'; import { MongoInternals } from 'meteor/mongo'; +import semver from 'semver'; import s from 'underscore.string'; import { MongoClient, Cursor, Timestamp, Db } from 'mongodb'; @@ -22,8 +24,8 @@ class OplogHandle { } const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); - const { storageEngine } = await mongo.db.command({ serverStatus: 1 }); - return storageEngine?.name === 'wiredTiger'; + const { version, storageEngine } = await mongo.db.command({ serverStatus: 1 }); + return storageEngine?.name === 'wiredTiger' && semver.satisfies(semver.coerce(version) || '', '>=3.6.0'); } async start(): Promise { diff --git a/package-lock.json b/package-lock.json index 3b2455825f34..7a2766502e29 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8583,6 +8583,12 @@ "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.0.tgz", "integrity": "sha512-wWKOClTTiizcZhXnPY4wikVAwmdYHp8q6DmC+EJUzAMsycb7HB32Kh9RN4+0gExjmPmZSAQjgURXIGATPegAvA==" }, + "@types/semver": { + "version": "7.3.3", + "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.3.3.tgz", + "integrity": "sha512-jQxClWFzv9IXdLdhSaTf16XI3NYe6zrEbckSpb5xhKfPbWgIyAY0AFyWWWfaiDcBuj3UHmMkCIwSRqpKMTZL2Q==", + "dev": true + }, "@types/serve-static": { "version": "1.13.5", "resolved": "https://registry.npmjs.org/@types/serve-static/-/serve-static-1.13.5.tgz", @@ -18103,7 +18109,7 @@ }, "minimist": { "version": "0.0.8", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz", + "resolved": false, "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", "dev": true, "optional": true @@ -18131,7 +18137,7 @@ }, "mkdirp": { "version": "0.5.1", - "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", + "resolved": false, "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", "dev": true, "optional": true, @@ -18304,7 +18310,7 @@ "dependencies": { "minimist": { "version": "1.2.0", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", + "resolved": false, "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=", "dev": true, "optional": true diff --git a/package.json b/package.json index 0966101fb2fb..aae20588b945 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "@types/moment-timezone": "^0.5.30", "@types/mongodb": "^3.5.26", "@types/react-dom": "^16.9.8", + "@types/semver": "^7.3.3", "@types/toastr": "^2.1.38", "@typescript-eslint/eslint-plugin": "^2.34.0", "@typescript-eslint/parser": "^2.34.0", From 3238a94a895a7c4b6e0541a02f85d031c1caca07 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Mon, 14 Sep 2020 12:41:43 -0300 Subject: [PATCH 14/20] Update settings cache after change via API --- app/api/server/v1/settings.js | 6 +++++- app/search/server/events/events.js | 1 - server/publications/subscription/emitter.js | 7 +++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/app/api/server/v1/settings.js b/app/api/server/v1/settings.js index e323a635ddf2..cf926f44d478 100644 --- a/app/api/server/v1/settings.js +++ b/app/api/server/v1/settings.js @@ -6,7 +6,7 @@ import _ from 'underscore'; import { Settings } from '../../../models/server'; import { hasPermission } from '../../../authorization'; import { API } from '../api'; -import { SettingsEvents } from '../../../settings/server'; +import { SettingsEvents, settings } from '../../../settings/server'; const fetchSettings = (query, sort, offset, count, fields) => { const settings = Settings.find(query, { @@ -146,6 +146,10 @@ API.v1.addRoute('settings/:_id', { authRequired: true }, { value: Match.Any, }); if (Settings.updateValueNotHiddenById(this.urlParams._id, this.bodyParams.value)) { + settings.storeSettingValue({ + _id: this.urlParams._id, + value: this.bodyParams.value, + }); return API.v1.success(); } diff --git a/app/search/server/events/events.js b/app/search/server/events/events.js index ffb1701fab2b..141d534bc0f9 100644 --- a/app/search/server/events/events.js +++ b/app/search/server/events/events.js @@ -67,7 +67,6 @@ function onRoomsChange({ clientAction, id, data }) { } settings.get('Search.Provider', _.debounce((key, value) => { - console.log('searchProviderService.activeProvider?.on', searchProviderService.activeProvider?.on); if (searchProviderService.activeProvider?.on) { Users.on('change', onUsersChange); Rooms.on('change', onRoomsChange); diff --git a/server/publications/subscription/emitter.js b/server/publications/subscription/emitter.js index dfcd50f88a3d..ba93668f488b 100644 --- a/server/publications/subscription/emitter.js +++ b/server/publications/subscription/emitter.js @@ -14,11 +14,18 @@ Subscriptions.on('change', ({ clientAction, id, data }) => { case 'removed': data = Subscriptions.trashFindOneById(id, { fields: { u: 1, rid: 1 } }); + if (!data) { + return; + } // emit a removed event on msg stream to remove the user's stream-room-messages subscription when the user is removed from room msgStream.__emit(data.u._id, clientAction, data); break; } + if (!data) { + return; + } + Notifications.streamUser.__emit(data.u._id, clientAction, data); Notifications.notifyUserInThisInstance( From 73ace9f0d442be2d3bbc82421dbcc4a3b34b4ef6 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Mon, 14 Sep 2020 20:08:37 -0300 Subject: [PATCH 15/20] Add settings emitter --- server/publications/settings/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/server/publications/settings/index.js b/server/publications/settings/index.js index e14f2079ca30..d0afdc93a98b 100644 --- a/server/publications/settings/index.js +++ b/server/publications/settings/index.js @@ -4,6 +4,7 @@ import { Settings } from '../../../app/models/server'; import { hasPermission, hasAtLeastOnePermission } from '../../../app/authorization/server'; import { getSettingPermissionId } from '../../../app/authorization/lib'; import { SettingsEvents } from '../../../app/settings/server/functions/settings'; +import './emitter'; Meteor.methods({ 'public-settings/get'(updatedAt) { From f3fc83bc31dbbfffd00fc8056b9b58d9671aa7cc Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Tue, 15 Sep 2020 20:16:45 -0300 Subject: [PATCH 16/20] =?UTF-8?q?Prevent=20polling=20of=20users=E2=80=99?= =?UTF-8?q?=20tokens=20due=20to=20logout=20reactivity?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/lib/server/startup/userDataStream.js | 51 ++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/app/lib/server/startup/userDataStream.js b/app/lib/server/startup/userDataStream.js index 8cdc95e4b66b..35b3540ba15b 100644 --- a/app/lib/server/startup/userDataStream.js +++ b/app/lib/server/startup/userDataStream.js @@ -1,10 +1,61 @@ +import { MongoInternals } from 'meteor/mongo'; + import { Users } from '../../../models/server'; import { Notifications } from '../../../notifications/server'; +// Stores the callbacks for the disconnection reactivity bellow +const userCallbacks = new Map(); + +// Overrides the native observe changes to prevent database polling and stores the callbacks +// for the users' tokens to re-implement the reactivity based on our database listeners +const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); +MongoInternals.Connection.prototype._observeChanges = function({ collectionName, selector, options = {} }, _ordered, callbacks) { + // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); + let cbs; + if (callbacks?.added) { + const records = Promise.await(mongo.rawCollection(collectionName).find(selector, { projection: options.fields }).toArray()); + for (const { _id, ...fields } of records) { + callbacks.added(_id, fields); + } + + if (collectionName === 'users' && selector['services.resume.loginTokens.hashedToken']) { + cbs = userCallbacks.get(selector._id) || new Set(); + cbs.add({ + hashedToken: selector['services.resume.loginTokens.hashedToken'], + callbacks, + }); + userCallbacks.set(selector._id, cbs); + } + } + return { + stop() { + if (cbs) { + cbs.delete(callbacks); + } + }, + }; +}; + Users.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': Notifications.notifyUserInThisInstance(id, 'userData', { diff, type: clientAction }); + + // Re-implement meteor's reactivity that uses observe to disconnect sessions when the token + // associated was removed + const loginTokens = diff['services.resume.loginTokens']; + if (loginTokens) { + const tokens = loginTokens.map(({ hashedToken }) => hashedToken); + + const cbs = userCallbacks.get(id); + if (cbs) { + [...cbs].filter(({ hashedToken }) => !tokens.includes(hashedToken)).forEach((item) => { + item.callbacks.removed(id); + cbs.delete(item); + }); + } + } + break; case 'inserted': Notifications.notifyUserInThisInstance(id, 'userData', { data, type: clientAction }); From e17189ba1ec5cae3e117560ee6cd2cbaa50ff1db Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Thu, 17 Sep 2020 21:01:04 -0300 Subject: [PATCH 17/20] Remove package to disable oplog and use env USE_NATIVE_OPLOG to control it --- .meteor/packages | 1 - .meteor/versions | 1 - app/lib/server/startup/userDataStream.js | 90 +++++++++++-------- app/models/server/models/_BaseDb.js | 1 - app/models/server/models/_oplogHandle.ts | 10 ++- .../rocketchat-mongo-config/server/index.js | 4 + 6 files changed, 62 insertions(+), 45 deletions(-) diff --git a/.meteor/packages b/.meteor/packages index 9004ec0e7296..2a2c903bbd21 100644 --- a/.meteor/packages +++ b/.meteor/packages @@ -3,7 +3,6 @@ # 'meteor add' and 'meteor remove' will edit this file for you, # but you can also edit it by hand. -disable-oplog rocketchat:mongo-config accounts-facebook@1.3.2 diff --git a/.meteor/versions b/.meteor/versions index 02e008d74942..f6ad8cbe3539 100644 --- a/.meteor/versions +++ b/.meteor/versions @@ -32,7 +32,6 @@ ddp-server@2.3.2 deepwell:bootstrap-datepicker2@1.3.0 deps@1.0.12 diff-sequence@1.1.1 -disable-oplog@1.0.7 dispatch:run-as-user@1.1.1 dynamic-import@0.5.2 ecmascript@0.14.3 diff --git a/app/lib/server/startup/userDataStream.js b/app/lib/server/startup/userDataStream.js index 35b3540ba15b..f30f745de500 100644 --- a/app/lib/server/startup/userDataStream.js +++ b/app/lib/server/startup/userDataStream.js @@ -3,57 +3,69 @@ import { MongoInternals } from 'meteor/mongo'; import { Users } from '../../../models/server'; import { Notifications } from '../../../notifications/server'; -// Stores the callbacks for the disconnection reactivity bellow -const userCallbacks = new Map(); +let processOnChange; +// eslint-disable-next-line no-undef +const disableOplog = Package['disable-oplog']; -// Overrides the native observe changes to prevent database polling and stores the callbacks -// for the users' tokens to re-implement the reactivity based on our database listeners -const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); -MongoInternals.Connection.prototype._observeChanges = function({ collectionName, selector, options = {} }, _ordered, callbacks) { - // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); - let cbs; - if (callbacks?.added) { - const records = Promise.await(mongo.rawCollection(collectionName).find(selector, { projection: options.fields }).toArray()); - for (const { _id, ...fields } of records) { - callbacks.added(_id, fields); - } +if (disableOplog) { + // Stores the callbacks for the disconnection reactivity bellow + const userCallbacks = new Map(); + + // Overrides the native observe changes to prevent database polling and stores the callbacks + // for the users' tokens to re-implement the reactivity based on our database listeners + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + MongoInternals.Connection.prototype._observeChanges = function({ collectionName, selector, options = {} }, _ordered, callbacks) { + // console.error('Connection.Collection.prototype._observeChanges', collectionName, selector, options); + let cbs; + if (callbacks?.added) { + const records = Promise.await(mongo.rawCollection(collectionName).find(selector, { projection: options.fields }).toArray()); + for (const { _id, ...fields } of records) { + callbacks.added(_id, fields); + } - if (collectionName === 'users' && selector['services.resume.loginTokens.hashedToken']) { - cbs = userCallbacks.get(selector._id) || new Set(); - cbs.add({ - hashedToken: selector['services.resume.loginTokens.hashedToken'], - callbacks, - }); - userCallbacks.set(selector._id, cbs); + if (collectionName === 'users' && selector['services.resume.loginTokens.hashedToken']) { + cbs = userCallbacks.get(selector._id) || new Set(); + cbs.add({ + hashedToken: selector['services.resume.loginTokens.hashedToken'], + callbacks, + }); + userCallbacks.set(selector._id, cbs); + } } - } - return { - stop() { + return { + stop() { + if (cbs) { + cbs.delete(callbacks); + } + }, + }; + }; + + // Re-implement meteor's reactivity that uses observe to disconnect sessions when the token + // associated was removed + processOnChange = (diff, id) => { + const loginTokens = diff['services.resume.loginTokens']; + if (loginTokens) { + const tokens = loginTokens.map(({ hashedToken }) => hashedToken); + + const cbs = userCallbacks.get(id); if (cbs) { - cbs.delete(callbacks); + [...cbs].filter(({ hashedToken }) => !tokens.includes(hashedToken)).forEach((item) => { + item.callbacks.removed(id); + cbs.delete(item); + }); } - }, + } }; -}; +} Users.on('change', ({ clientAction, id, data, diff }) => { switch (clientAction) { case 'updated': Notifications.notifyUserInThisInstance(id, 'userData', { diff, type: clientAction }); - // Re-implement meteor's reactivity that uses observe to disconnect sessions when the token - // associated was removed - const loginTokens = diff['services.resume.loginTokens']; - if (loginTokens) { - const tokens = loginTokens.map(({ hashedToken }) => hashedToken); - - const cbs = userCallbacks.get(id); - if (cbs) { - [...cbs].filter(({ hashedToken }) => !tokens.includes(hashedToken)).forEach((item) => { - item.callbacks.removed(id); - cbs.delete(item); - }); - } + if (disableOplog) { + processOnChange(diff, id); } break; diff --git a/app/models/server/models/_BaseDb.js b/app/models/server/models/_BaseDb.js index 1cf75165cf2d..7201335c92e1 100644 --- a/app/models/server/models/_BaseDb.js +++ b/app/models/server/models/_BaseDb.js @@ -61,7 +61,6 @@ export class BaseDb extends EventEmitter { const query = { collection: this.collectionName, }; - console.log(this.collectionName); if (!_oplogHandle) { throw new Error(`Error: Unable to find Mongodb Oplog. You must run the server with oplog enabled. Try the following:\n diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index 3d55ba497f60..4a31a4485faa 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -82,11 +82,14 @@ class OplogHandle { ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, }; - console.log(oplogSelector); + // console.log(oplogSelector); this.stream = oplogCollection.find(oplogSelector, { tailable: true, // awaitData: true, }).stream(); + + // Prevent warning about many listeners, we add 11 + this.stream.setMaxListeners(20); } onOplogEntry(query: {collection: string}, callback: Function): void { @@ -153,10 +156,11 @@ class OplogHandle { } } -// process.env.USE_OLD_OPLOG = 'true'; // process.env.IGNORE_CHANGE_STREAM = 'true'; -const oplogHandle = !process.env.USE_OLD_OPLOG ? new OplogHandle().start() : undefined; +// @ts-ignore +// eslint-disable-next-line no-undef +const oplogHandle = Package['disable-oplog'] ? new OplogHandle().start() : undefined; export const getOplogHandle = async (): Promise => { if (oplogHandle) { diff --git a/packages/rocketchat-mongo-config/server/index.js b/packages/rocketchat-mongo-config/server/index.js index 5c39b353a5b7..2db736697b63 100644 --- a/packages/rocketchat-mongo-config/server/index.js +++ b/packages/rocketchat-mongo-config/server/index.js @@ -5,6 +5,10 @@ import { EmailTest } from 'meteor/email'; import { Mongo } from 'meteor/mongo'; import { HTTP } from 'meteor/http'; +if (!process.env.USE_NATIVE_OPLOG) { + Package['disable-oplog'] = {}; +} + // Set default HTTP call timeout to 20s const envTimeout = parseInt(process.env.HTTP_DEFAULT_TIMEOUT, 10); const timeout = !isNaN(envTimeout) ? envTimeout : 20000; From 345e54db35ddae6232cc06deb7555886faab916d Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Mon, 21 Sep 2020 19:05:07 -0300 Subject: [PATCH 18/20] Fix review --- app/search/server/events/events.js | 5 ++--- app/settings/server/functions/settings.ts | 3 +-- app/settings/server/index.ts | 1 - app/settings/server/observer.js | 8 -------- app/settings/server/raw.js | 16 +++++++++++++--- 5 files changed, 16 insertions(+), 17 deletions(-) delete mode 100644 app/settings/server/observer.js diff --git a/app/search/server/events/events.js b/app/search/server/events/events.js index 141d534bc0f9..41a1e217b45c 100644 --- a/app/search/server/events/events.js +++ b/app/search/server/events/events.js @@ -7,8 +7,7 @@ import { searchProviderService } from '../service/providerService'; import SearchLogger from '../logger/logger'; class EventService { - /* eslint no-unused-vars: [2, { "args": "none" }]*/ - _pushError(name, value, payload) { + _pushError(name, value/* , payload */) { // TODO implement a (performant) cache SearchLogger.debug(`Error on event '${ name }' with id '${ value }'`); } @@ -66,7 +65,7 @@ function onRoomsChange({ clientAction, id, data }) { } } -settings.get('Search.Provider', _.debounce((key, value) => { +settings.get('Search.Provider', _.debounce(() => { if (searchProviderService.activeProvider?.on) { Users.on('change', onUsersChange); Rooms.on('change', onRoomsChange); diff --git a/app/settings/server/functions/settings.ts b/app/settings/server/functions/settings.ts index 9a8f5965989d..3bff85f1f1b8 100644 --- a/app/settings/server/functions/settings.ts +++ b/app/settings/server/functions/settings.ts @@ -5,8 +5,7 @@ import _ from 'underscore'; import { SettingsBase, SettingValue } from '../../lib/settings'; import SettingsModel from '../../../models/server/models/Settings'; -import { updateValue } from '../observer'; -import { setValue } from '../raw'; +import { setValue, updateValue } from '../raw'; const blockedSettings = new Set(); const hiddenSettings = new Set(); diff --git a/app/settings/server/index.ts b/app/settings/server/index.ts index 7a4f6ebf00b4..3adfad5409b3 100644 --- a/app/settings/server/index.ts +++ b/app/settings/server/index.ts @@ -1,5 +1,4 @@ import { settings, SettingsEvents } from './functions/settings'; -import './observer'; export { settings, diff --git a/app/settings/server/observer.js b/app/settings/server/observer.js deleted file mode 100644 index 7e941aea0c95..000000000000 --- a/app/settings/server/observer.js +++ /dev/null @@ -1,8 +0,0 @@ -import { setValue } from './raw'; - -export const updateValue = (id, fields) => { - if (typeof fields.value === 'undefined') { - return; - } - setValue(id, fields.value); -}; diff --git a/app/settings/server/raw.js b/app/settings/server/raw.js index 357c62674a99..436643cbae44 100644 --- a/app/settings/server/raw.js +++ b/app/settings/server/raw.js @@ -5,11 +5,14 @@ const cache = new Map(); export const setValue = (_id, value) => cache.set(_id, value); const setFromDB = async (_id) => { - const value = await Settings.getValueById(_id); + const setting = Settings.findOneById(_id, { fields: { value: 1 } }); + if (!setting) { + return; + } - setValue(_id, value); + setValue(_id, setting.value); - return value; + return setting.value; }; export const getValue = async (_id) => { @@ -19,3 +22,10 @@ export const getValue = async (_id) => { return cache.get(_id); }; + +export const updateValue = (id, fields) => { + if (typeof fields.value === 'undefined') { + return; + } + setValue(id, fields.value); +}; From 0f91ff41b29811eb457a725e32671762019e48a3 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Mon, 21 Sep 2020 23:04:21 -0300 Subject: [PATCH 19/20] Improve error handling --- app/models/server/models/_oplogHandle.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index 4a31a4485faa..745d612d9e19 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,5 +1,6 @@ import { Meteor } from 'meteor/meteor'; +import { Promise } from 'meteor/promise'; import { MongoInternals } from 'meteor/mongo'; import semver from 'semver'; import s from 'underscore.string'; @@ -32,7 +33,13 @@ class OplogHandle { this.usingChangeStream = await this.isChangeStreamAvailable(); const oplogUrl = this.usingChangeStream ? process.env.MONGO_URL : process.env.MONGO_OPLOG_URL; - const urlParsed = await urlParser(oplogUrl); + let urlParsed; + try { + urlParsed = await urlParser(oplogUrl); + } catch (e) { + throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); + } + if (!this.usingChangeStream && (!oplogUrl || urlParsed.dbName !== 'local')) { throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set"); } @@ -158,9 +165,19 @@ class OplogHandle { // process.env.IGNORE_CHANGE_STREAM = 'true'; +let oplogHandle: Promise; + // @ts-ignore // eslint-disable-next-line no-undef -const oplogHandle = Package['disable-oplog'] ? new OplogHandle().start() : undefined; +if (Package['disable-oplog']) { + const { mongo } = MongoInternals.defaultRemoteCollectionDriver(); + try { + Promise.await(mongo.db.admin().command({ replSetGetStatus: 1 })); + oplogHandle = Promise.await(new OplogHandle().start()); + } catch (e) { + console.error(e.message); + } +} export const getOplogHandle = async (): Promise => { if (oplogHandle) { From 24a392a0859dde04bdc8a51241e9da138692b6e4 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Mon, 21 Sep 2020 23:33:12 -0300 Subject: [PATCH 20/20] Code cleanup --- app/models/server/models/_oplogHandle.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/app/models/server/models/_oplogHandle.ts b/app/models/server/models/_oplogHandle.ts index 745d612d9e19..74d50bf6c234 100644 --- a/app/models/server/models/_oplogHandle.ts +++ b/app/models/server/models/_oplogHandle.ts @@ -1,4 +1,3 @@ - import { Meteor } from 'meteor/meteor'; import { Promise } from 'meteor/promise'; import { MongoInternals } from 'meteor/mongo'; @@ -89,7 +88,6 @@ class OplogHandle { ...lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }, }; - // console.log(oplogSelector); this.stream = oplogCollection.find(oplogSelector, { tailable: true, // awaitData: true, @@ -111,7 +109,6 @@ class OplogHandle { this.stream.on('data', Meteor.bindEnvironment((buffer) => { const doc = buffer as any; if (doc.ns === `${ this.dbName }.${ query.collection }`) { - // console.log('doc', doc); callback({ id: doc.op === 'u' ? doc.o2._id : doc.o._id, op: doc, @@ -122,7 +119,6 @@ class OplogHandle { _onOplogEntryChangeStream(query: {collection: string}, callback: Function): void { this.db.collection(query.collection).watch([], { /* fullDocument: 'updateLookup' */ }).on('change', Meteor.bindEnvironment((event) => { - // console.log(event); switch (event.operationType) { case 'insert': callback({ @@ -163,8 +159,6 @@ class OplogHandle { } } -// process.env.IGNORE_CHANGE_STREAM = 'true'; - let oplogHandle: Promise; // @ts-ignore