Skip to content

Commit

Permalink
chore: makes instance-status works along with Moleculer lifecyle (#34134
Browse files Browse the repository at this point in the history
)

Co-authored-by: Diego Sampaio <8591547+sampaiodiego@users.noreply.github.com>
  • Loading branch information
ricardogarim and sampaiodiego authored Dec 17, 2024
1 parent 1a11303 commit 7edc3db
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 167 deletions.
70 changes: 16 additions & 54 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os from 'os';

import { License, ServiceClassInternal } from '@rocket.chat/core-services';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { InstanceStatus, defaultPingInterval, indexExpire } from '@rocket.chat/instance-status';
import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models';
import EJSON from 'ejson';
import type { BrokerNode } from 'moleculer';
Expand Down Expand Up @@ -33,37 +33,13 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe

private transporter: Transporters.TCP | Transporters.NATS;

private isTransporterTCP = true;

private broker: ServiceBroker;

private troubleshootDisableInstanceBroadcast = false;

constructor() {
super();

const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA });
if (typeof tx === 'string') {
this.transporter = new Transporters.NATS({ url: tx });
this.isTransporterTCP = false;
} else {
this.transporter = new Transporters.TCP(tx);
}

if (this.isTransporterTCP) {
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
(this.broker.transit?.tx as any).nodes.disconnected(data?._id, false);
(this.broker.transit?.tx as any).nodes.nodes.delete(data?._id);
return;
}

if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) {
this.connectNode(data);
}
});
}

this.onEvent('license.module', async ({ module, valid }) => {
if (module === 'scalability' && valid) {
await this.startBroadcast();
Expand Down Expand Up @@ -93,17 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
}

async created() {
const transporter = getTransporter({
transporter: process.env.TRANSPORTER,
port: process.env.TCP_PORT,
extra: process.env.TRANSPORTER_EXTRA,
});

const activeInstances = InstanceStatusRaw.getActiveInstancesAddress();

this.transporter =
typeof transporter !== 'string'
? new Transporters.TCP({ ...transporter, urls: activeInstances })
: new Transporters.NATS({ url: transporter });

this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: this.transporter,
serializer: new EJSONSerializer(),
heartbeatInterval: defaultPingInterval,
heartbeatTimeout: indexExpire,
...getLogger(process.env),
});

if ((this.broker.transit?.tx as any)?.nodes?.localNode) {
(this.broker.transit?.tx as any).nodes.localNode.ipList = [hostIP];
}

this.broker.createService({
name: 'matrix',
events: {
Expand Down Expand Up @@ -176,31 +163,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
this.broadcastStarted = true;

StreamerCentral.on('broadcast', this.sendBroadcast.bind(this));

if (this.isTransporterTCP) {
await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
},
},
{
sort: {
_createdAt: -1,
},
},
).forEach(this.connectNode.bind(this));
}
}

private connectNode(record: any) {
if (record._id === InstanceStatus.id()) {
return;
}

const { host, tcpPort } = record.extraInformation;

(this.broker?.transit?.tx as any).addOfflineNode(record._id, host, tcpPort);
}

private sendBroadcast(streamName: string, eventName: string, args: unknown[]) {
Expand Down
3 changes: 2 additions & 1 deletion apps/meteor/server/database/watchCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ const onlyCollections = DBWATCHER_ONLY_COLLECTIONS.split(',')
.filter(Boolean);

export function getWatchCollections(): string[] {
const collections = [InstanceStatus.getCollectionName()];
const collections = [];

// add back to the list of collections in case db watchers are enabled
if (!dbWatchersDisabled) {
collections.push(InstanceStatus.getCollectionName());
collections.push(Users.getCollectionName());
collections.push(Messages.getCollectionName());
collections.push(LivechatInquiry.getCollectionName());
Expand Down
47 changes: 46 additions & 1 deletion apps/meteor/server/models/raw/InstanceStatus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IInstanceStatus } from '@rocket.chat/core-typings';
import type { IInstanceStatusModel } from '@rocket.chat/model-typings';
import type { Db } from 'mongodb';
import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand All @@ -17,4 +17,49 @@ export class InstanceStatusRaw extends BaseRaw<IInstanceStatus> implements IInst
async getActiveInstanceCount(): Promise<number> {
return this.col.countDocuments({ _updatedAt: { $gt: new Date(Date.now() - process.uptime() * 1000 - 2000) } });
}

async getActiveInstancesAddress(): Promise<string[]> {
const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, tcpPort: 1 } } }).toArray();
return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`);
}

async removeInstanceById(_id: IInstanceStatus['_id']): Promise<DeleteResult> {
return this.deleteOne({ _id });
}

async setDocumentHeartbeat(documentId: string): Promise<UpdateResult> {
return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } });
}

async upsertInstance(instance: Partial<IInstanceStatus>): Promise<ModifyResult<IInstanceStatus>> {
return this.findOneAndUpdate(
{
_id: instance._id,
},
{
$set: instance,
$currentDate: {
_createdAt: true,
_updatedAt: true,
},
},
{
upsert: true,
returnDocument: 'after',
},
);
}

async updateConnections(_id: IInstanceStatus['_id'], conns: number) {
return this.updateOne(
{
_id,
},
{
$set: {
'extraInformation.conns': conns,
},
},
);
}
}
17 changes: 11 additions & 6 deletions apps/meteor/server/startup/watchDb.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { api } from '@rocket.chat/core-services';
import { api, dbWatchersDisabled } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import { MongoInternals } from 'meteor/mongo';

Expand All @@ -19,12 +19,17 @@ watcher.watch().catch((err: Error) => {
process.exit(1);
});

setInterval(function _checkDatabaseWatcher() {
if (watcher.isLastDocDelayed()) {
SystemLogger.error('No real time data received recently');
}
}, 20000);
if (!dbWatchersDisabled) {
setInterval(function _checkDatabaseWatcher() {
if (watcher.isLastDocDelayed()) {
SystemLogger.error('No real time data received recently');
}
}, 20000);
}

export function isLastDocDelayed(): boolean {
if (dbWatchersDisabled) {
return true;
}
return watcher.isLastDocDelayed();
}
Loading

0 comments on commit 7edc3db

Please sign in to comment.