Skip to content

Commit

Permalink
refactor: IntegrationHistory out of DB Watcher (#32502)
Browse files Browse the repository at this point in the history
Co-authored-by: Diego Sampaio <8591547+sampaiodiego@users.noreply.github.com>
  • Loading branch information
ricardogarim and sampaiodiego authored May 27, 2024
1 parent cd96032 commit b21d32b
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 10 deletions.
21 changes: 15 additions & 6 deletions apps/meteor/app/integrations/server/lib/updateHistory.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { IIntegrationHistory, OutgoingIntegrationEvent, IIntegration, IMessage, AtLeast } from '@rocket.chat/core-typings';
import { IntegrationHistory } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';

import { omit } from '../../../../lib/utils/omit';
import { notifyOnIntegrationHistoryChangedById, notifyOnIntegrationHistoryChanged } from '../../../lib/server/lib/notifyListener';

export const updateHistory = async ({
historyId,
Expand Down Expand Up @@ -77,7 +77,12 @@ export const updateHistory = async ({
};

if (historyId) {
await IntegrationHistory.updateOne({ _id: historyId }, { $set: history });
// Projecting just integration field to comply with existing listener behaviour
const integrationHistory = await IntegrationHistory.updateById(historyId, history, { projection: { 'integration._id': 1 } });
if (!integrationHistory) {
throw new Error('error-updating-integration-history');
}
void notifyOnIntegrationHistoryChanged(integrationHistory, 'updated', history);
return historyId;
}

Expand All @@ -86,11 +91,15 @@ export const updateHistory = async ({
throw new Error('error-invalid-integration');
}

history._createdAt = new Date();
// TODO: Had to force type cast here because of function's signature
// It would be easier if we separate into create and update functions
const { insertedId } = await IntegrationHistory.create(history as IIntegrationHistory);

const _id = Random.id();
if (!insertedId) {
throw new Error('error-creating-integration-history');
}

await IntegrationHistory.insertOne({ _id, ...history } as IIntegrationHistory);
void notifyOnIntegrationHistoryChangedById(insertedId, 'inserted');

return _id;
return insertedId;
};
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Meteor.methods<ServerMethods>({
});
}

// Don't sending to IntegrationHistory listener since it don't waits for 'removed' events.
await IntegrationHistory.removeByIntegrationId(integrationId);

notifications.streamIntegrationHistory.emit(integrationId, { type: 'removed', id: integrationId });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const deleteOutgoingIntegration = async (integrationId: string, userId: s
}

await Integrations.removeById(integrationId);
// Don't sending to IntegrationHistory listener since it don't waits for 'removed' events.
await IntegrationHistory.removeByIntegrationId(integrationId);
void notifyOnIntegrationChangedById(integrationId, 'removed');
};
Expand Down
43 changes: 42 additions & 1 deletion apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ import type {
IPbxEvent,
LoginServiceConfiguration as LoginServiceConfigurationData,
ILivechatPriority,
IIntegrationHistory,
AtLeast,
} from '@rocket.chat/core-typings';
import { Rooms, Permissions, Settings, PbxEvents, Roles, Integrations, LoginServiceConfiguration } from '@rocket.chat/models';
import {
Rooms,
Permissions,
Settings,
PbxEvents,
Roles,
Integrations,
LoginServiceConfiguration,
IntegrationHistory,
} from '@rocket.chat/models';

type ClientAction = 'inserted' | 'updated' | 'removed';

Expand Down Expand Up @@ -254,3 +265,33 @@ export async function notifyOnIntegrationChangedByChannels<T extends IIntegratio
void api.broadcast('watch.integrations', { clientAction, id: item._id, data: item });
}
}

export async function notifyOnIntegrationHistoryChanged<T extends IIntegrationHistory>(
data: AtLeast<T, '_id'>,
clientAction: ClientAction = 'updated',
diff: Partial<T> = {},
): Promise<void> {
if (!dbWatchersDisabled) {
return;
}

void api.broadcast('watch.integrationHistory', { clientAction, id: data._id, data, diff });
}

export async function notifyOnIntegrationHistoryChangedById<T extends IIntegrationHistory>(
id: T['_id'],
clientAction: ClientAction = 'updated',
diff: Partial<T> = {},
): Promise<void> {
if (!dbWatchersDisabled) {
return;
}

const item = await IntegrationHistory.findOneById(id);

if (!item) {
return;
}

void api.broadcast('watch.integrationHistory', { clientAction, id: item._id, data: item, diff });
}
2 changes: 1 addition & 1 deletion apps/meteor/server/database/watchCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export function getWatchCollections(): string[] {
LivechatInquiry.getCollectionName(),
LivechatDepartmentAgents.getCollectionName(),
InstanceStatus.getCollectionName(),
IntegrationHistory.getCollectionName(),
EmailInbox.getCollectionName(),
Settings.getCollectionName(),
Subscriptions.getCollectionName(),
Expand All @@ -50,6 +49,7 @@ export function getWatchCollections(): string[] {
collections.push(Permissions.getCollectionName());
collections.push(LivechatPriority.getCollectionName());
collections.push(LoginServiceConfiguration.getCollectionName());
collections.push(IntegrationHistory.getCollectionName());
}

if (onlyCollections.length > 0) {
Expand Down
15 changes: 14 additions & 1 deletion apps/meteor/server/models/raw/IntegrationHistory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IIntegrationHistory } from '@rocket.chat/core-typings';
import type { IIntegrationHistoryModel } from '@rocket.chat/model-typings';
import type { Db, IndexDescription } from 'mongodb';
import type { Db, IndexDescription, InsertOneResult, FindOneAndUpdateOptions } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand All @@ -23,4 +23,17 @@ export class IntegrationHistoryRaw extends BaseRaw<IIntegrationHistory> implemen
findOneByIntegrationIdAndHistoryId(integrationId: string, historyId: string): Promise<IIntegrationHistory | null> {
return this.findOne({ 'integration._id': integrationId, '_id': historyId });
}

async create(integrationHistory: IIntegrationHistory): Promise<InsertOneResult<IIntegrationHistory>> {
return this.insertOne({ ...integrationHistory, _createdAt: new Date() });
}

async updateById(
_id: IIntegrationHistory['_id'],
data: Partial<IIntegrationHistory>,
options?: FindOneAndUpdateOptions,
): Promise<IIntegrationHistory | null> {
const response = await this.findOneAndUpdate({ _id }, { $set: data }, { returnDocument: 'after', ...options });
return response.value;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import type { IIntegrationHistory } from '@rocket.chat/core-typings';
import type { FindOneAndUpdateOptions, InsertOneResult } from 'mongodb';

import type { IBaseModel } from './IBaseModel';

export interface IIntegrationHistoryModel extends IBaseModel<IIntegrationHistory> {
removeByIntegrationId(integrationId: string): ReturnType<IBaseModel<IIntegrationHistory>['deleteMany']>;

findOneByIntegrationIdAndHistoryId(integrationId: string, historyId: string): Promise<IIntegrationHistory | null>;
create(integrationHistory: IIntegrationHistory): Promise<InsertOneResult<IIntegrationHistory>>;
updateById(
_id: IIntegrationHistory['_id'],
data: Partial<IIntegrationHistory>,
options?: FindOneAndUpdateOptions,
): Promise<IIntegrationHistory | null>;
}

0 comments on commit b21d32b

Please sign in to comment.