Skip to content

Commit

Permalink
feat: delete connections after 24h of inactivity (#70)
Browse files Browse the repository at this point in the history
* feat: delete connections after 24h of inactivity

* fix: JS-0327 and JS-0116

* final touchups and testing done

* rename files

* chore: fix condition for running tasks in production build

* hi

* chore: update recurring task interval for syncing botlist stats

* feat: disconnect idle connections after 24 hours
  • Loading branch information
dev-737 authored May 24, 2024
1 parent 2036974 commit 43476dc
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 64 deletions.
14 changes: 8 additions & 6 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ model blacklistedServers {
}

model connectedList {
id String @id @default(auto()) @map("_id") @db.ObjectId
channelId String @unique // channel can be thread, or a normal channel
id String @id @default(auto()) @map("_id") @db.ObjectId
channelId String @unique // channel can be thread, or a normal channel
parentId String? // ID of the parent channel, if it's a thread @map("parentChannelId")
serverId String
connected Boolean
Expand All @@ -85,9 +85,10 @@ model connectedList {
profFilter Boolean
embedColor String?
webhookURL String
date DateTime @default(now())
hub hubs? @relation(fields: [hubId], references: [id])
hubId String @db.ObjectId
lastActive DateTime? @default(now())
date DateTime @default(now())
hub hubs? @relation(fields: [hubId], references: [id])
hubId String @db.ObjectId
}

model hubs {
Expand Down Expand Up @@ -125,6 +126,7 @@ model originalMessages {
serverId String
authorId String
reactions Json? // eg. {"👎": ["9893820930928", "39283902803982"]} "emoji": userId[] basically
createdAt DateTime
broadcastMsgs broadcastedMessages[] // Ids of messages that were broadcasted to other hubs
messageReference String? @db.String // id of the original message this message is replying to
hub hubs? @relation(fields: [hubId], references: [id])
Expand All @@ -134,7 +136,7 @@ model originalMessages {
model broadcastedMessages {
messageId String @id @map("_id")
channelId String
createdAt Int
createdAt DateTime
originalMsg originalMessages @relation(fields: [originalMsgId], references: [messageId])
originalMsgId String @db.String
}
Expand Down
38 changes: 22 additions & 16 deletions src/cluster.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
import db from './utils/Db.js';
import Logger from './utils/Logger.js';
import Scheduler from './services/SchedulerService.js';
import syncBotlistStats from './scripts/tasks/syncBotlistStats.js';
import updateBlacklists from './scripts/tasks/updateBlacklists.js';
import deleteExpiredInvites from './scripts/tasks/deleteExpiredInvites.js';
import pauseIdleConnections from './scripts/tasks/pauseIdleConnections.js';
import { startApi } from './api/index.js';
import { isDevBuild } from './utils/Constants.js';
import { VoteManager } from './managers/VoteManager.js';
import { ClusterManager } from 'discord-hybrid-sharding';
import syncBotlistStats from './scripts/tasks/syncBotlistStats.js';
import deleteExpiredInvites from './scripts/tasks/deleteExpiredInvites.js';
import updateBlacklists from './scripts/tasks/updateBlacklists.js';
import deleteOldMessages from './scripts/tasks/deleteOldMessages.js';
import 'dotenv/config';
import { getUsername, wait } from './utils/Utils.js';
import Logger from './utils/Logger.js';
import 'dotenv/config';

const clusterManager = new ClusterManager('build/index.js', {
token: process.env.TOKEN,
shardsPerClusters: 2,
totalClusters: 'auto',
});

clusterManager.on('clusterCreate', async (cluster) => {
// if it is the last cluster
if (cluster.id === clusterManager.totalClusters - 1) {
const scheduler = new Scheduler();

// remove expired blacklists or set new timers for them
const serverQuery = { where: { hubs: { some: { expires: { isSet: true } } } } };
const userQuery = { where: { blacklistedFrom: { some: { expires: { isSet: true } } } } };
updateBlacklists(await db.blacklistedServers.findMany(serverQuery), scheduler).catch(
Logger.error,
);
const serverQuery = await db.blacklistedServers.findMany({
where: { hubs: { some: { expires: { isSet: true } } } },
});
const userQuery = await db.userData.findMany({
where: { blacklistedFrom: { some: { expires: { isSet: true } } } },
});

updateBlacklists(await db.userData.findMany(userQuery), scheduler).catch(Logger.error);
updateBlacklists(serverQuery, scheduler).catch(Logger.error);
updateBlacklists(userQuery, scheduler).catch(Logger.error);

// code must be in production to run these tasks
if (isDevBuild) return;
Expand All @@ -37,12 +41,14 @@ clusterManager.on('clusterCreate', async (cluster) => {

// perform start up tasks
syncBotlistStats(clusterManager).catch(Logger.error);
deleteOldMessages().catch(Logger.error);
deleteExpiredInvites().catch(Logger.error);
pauseIdleConnections(clusterManager).catch(Logger.error);

scheduler.addRecurringTask('deleteExpiredInvites', 60 * 60 * 1_000, deleteExpiredInvites);
scheduler.addRecurringTask('deleteOldMessages', 60 * 60 * 12_000, deleteOldMessages);
scheduler.addRecurringTask('syncBotlistStats', 60 * 10_000, () =>
scheduler.addRecurringTask('deleteExpiredInvites', 60 * 60 * 1000, deleteExpiredInvites);
scheduler.addRecurringTask('pauseIdleConnections', 60 * 60 * 1000, () =>
pauseIdleConnections(clusterManager),
);
scheduler.addRecurringTask('syncBotlistStats', 10 * 60 * 10_000, () =>
syncBotlistStats(clusterManager),
);
}
Expand Down
9 changes: 9 additions & 0 deletions src/core/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { loadLocales } from '../utils/Locale.js';
import loadCommandFiles from '../utils/LoadCommands.js';
import {
connectionCache as _connectionCache,
messageTimestamps,
storeMsgTimestamps,
syncConnectionCache,
} from '../utils/ConnectedList.js';
import { PROJECT_VERSION } from '../utils/Constants.js';
Expand Down Expand Up @@ -108,6 +110,13 @@ export default class SuperClient extends Client {
syncConnectionCache,
);

// store network message timestamps to connectedList every minute
this.scheduler.addRecurringTask('storeMsgTimestamps', 60 * 1_000, () => {
storeMsgTimestamps(messageTimestamps);
messageTimestamps.clear();
});


await this.login(process.env.TOKEN);
}

Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import './instrument.js';
import Logger from './utils/Logger.js';
import SuperClient from './core/Client.js';
import { eventMethods } from './decorators/GatewayEvent.js';
import ReactionUpdater from './utils/ReactionUpdater.js';
import { RandomComponents } from './utils/RandomComponents.js';
import EventManager from './managers/EventManager.js';

const client = new SuperClient();

// dum classes
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const _reactionUpdater = ReactionUpdater;
const _randomComponentHandlers = RandomComponents;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const _eventManager = EventManager;

Expand Down
23 changes: 13 additions & 10 deletions src/managers/EventManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@ import {
Interaction,
Client,
} from 'discord.js';
import db from '../utils/Db.js';
import Logger from '../utils/Logger.js';
import SuperClient from '../core/Client.js';
import GatewayEvent from '../decorators/GatewayEvent.js';
import { stripIndents } from 'common-tags';
import sendBroadcast from '../scripts/network/sendBroadcast.js';
import storeMessageData from '../scripts/network/storeMessageData.js';
import getWelcomeTargets from '../scripts/guilds/getWelcomeTarget.js';
import { t } from '../utils/Locale.js';
import { check } from '../utils/Profanity.js';
import { runChecks } from '../scripts/network/runChecks.js';
import { stripIndents } from 'common-tags';
import { logGuildJoin, logGuildLeave } from '../scripts/guilds/goals.js';
import { channels, emojis, colors, LINKS } from '../utils/Constants.js';
import { check } from '../utils/Profanity.js';
import db from '../utils/Db.js';
import { t } from '../utils/Locale.js';
import storeMessageData from '../scripts/network/storeMessageData.js';
import { getReferredMsgData, sendWelcomeMsg } from '../scripts/network/helpers.js';
import { HubSettingsBitField } from '../utils/BitFields.js';
import { getAttachmentURL, getUserLocale, handleError, simpleEmbed, wait } from '../utils/Utils.js';
import { runChecks } from '../scripts/network/runChecks.js';
import { addReaction, updateReactions } from '../scripts/reaction/actions.js';
import { checkBlacklists } from '../scripts/reaction/helpers.js';
import { CustomID } from '../utils/CustomID.js';
import SuperClient from '../core/Client.js';
import sendBroadcast from '../scripts/network/sendBroadcast.js';
import { logServerLeave } from '../utils/HubLogger/JoinLeave.js';
import { deleteConnections, modifyConnection } from '../utils/ConnectedList.js';

Expand Down Expand Up @@ -235,9 +235,12 @@ export default abstract class EventManager {

const { connectionCache, cachePopulated } = message.client;

while (!cachePopulated) {
Logger.debug('[InterChat]: Cache not populated, retrying in 5 seconds...');
if (!cachePopulated) {
Logger.debug('[InterChat]: Connection cache not populated, 5 secs until retry...');
await wait(5000);

EventManager.onMessageCreate(message);
return;
}

const locale = await getUserLocale(message.author.id);
Expand Down
4 changes: 3 additions & 1 deletion src/scripts/network/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ type extraOpts = {
disconnectEmoji?: string;
connectEmoji?: string;
userId?: string;
/** set custom prefix for customId and handle it urself, eg: `epik_reconnect` */
customCustomId?: string;
};

/**
Expand All @@ -25,7 +27,7 @@ export const buildConnectionButtons = (
new ButtonBuilder()
.setCustomId(
new CustomID()
.setIdentifier('connection', 'toggle')
.setIdentifier(opts.customCustomId ?? 'connection', 'toggle')
.addArgs(channelId)
.addArgs(opts?.userId ?? '')
.toString(),
Expand Down
15 changes: 9 additions & 6 deletions src/scripts/network/storeMessageData.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import db from '../../utils/Db.js';
import { originalMessages } from '@prisma/client';
import { APIMessage, Message } from 'discord.js';
import { parseTimestampFromId } from '../../utils/Utils.js';
import db from '../../utils/Db.js';
import { modifyConnections } from '../../utils/ConnectedList.js';
import { messageTimestamps, modifyConnections } from '../../utils/ConnectedList.js';
import { handleError, parseTimestampFromId } from '../../utils/Utils.js';

export interface NetworkWebhookSendResult {
messageOrError: APIMessage | string;
Expand All @@ -20,7 +20,7 @@ export default async (
hubId: string,
dbReference?: originalMessages | null,
) => {
const messageDataObj: { channelId: string; messageId: string; createdAt: number }[] = [];
const messageDataObj: { channelId: string; messageId: string, createdAt: Date }[] = [];
const invalidWebhookURLs: string[] = [];
const validErrors = ['Invalid Webhook Token', 'Unknown Webhook', 'Missing Permissions'];

Expand All @@ -30,7 +30,7 @@ export default async (
messageDataObj.push({
channelId: result.messageOrError.channel_id,
messageId: result.messageOrError.id,
createdAt: parseTimestampFromId(result.messageOrError.id),
createdAt: new Date(parseTimestampFromId(result.messageOrError.id)),
});
}
else if (validErrors.some((e) => (result.messageOrError as string).includes(e))) {
Expand All @@ -48,13 +48,16 @@ export default async (
authorId: message.author.id,
serverId: message.guild.id,
messageReference: dbReference?.messageId,
createdAt: message.createdAt,
broadcastMsgs: { createMany: { data: messageDataObj } },
hub: { connect: { id: hubId } },
reactions: {},
},
});
}).catch(handleError);
}

// store message timestamps to push to db later
messageTimestamps.set(message.channel.id, message.createdAt);
// disconnect network if, webhook does not exist/bot cannot access webhook
if (invalidWebhookURLs.length > 0) {
await modifyConnections({ webhookURL: { in: invalidWebhookURLs } }, { connected: false });
Expand Down
15 changes: 0 additions & 15 deletions src/scripts/tasks/deleteOldMessages.ts

This file was deleted.

68 changes: 68 additions & 0 deletions src/scripts/tasks/pauseIdleConnections.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import db from '../../utils/Db.js';
import Logger from '../../utils/Logger.js';
import { ClusterManager } from 'discord-hybrid-sharding';
import { modifyConnection } from '../../utils/ConnectedList.js';
import { APIActionRowComponent, APIButtonComponent, Snowflake } from 'discord.js';
import { buildConnectionButtons } from '../network/components.js';
import { simpleEmbed } from '../../utils/Utils.js';
import { stripIndents } from 'common-tags';
import { emojis } from '../../utils/Constants.js';
import 'dotenv/config';

export default async (manager: ClusterManager) => {
const connections = await db.connectedList.findMany({
where: {
connected: true,
lastActive: { not: null, lte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
},
});

if (!connections) return;

const reconnectButtonArr: {
channelId: Snowflake;
button: APIActionRowComponent<APIButtonComponent>;
}[] = [];

// Loop through the data
connections.forEach(async ({ channelId, lastActive }) => {
Logger.debug(
`[InterChat]: Channel ${channelId} is older than 24 hours: ${lastActive?.toLocaleString()} - ${new Date().toLocaleString()}`,
);

// Create the button
reconnectButtonArr.push({
channelId,
button: buildConnectionButtons(false, channelId, {
customCustomId: 'inactiveConnect',
}).toJSON(),
});

// disconnect the channel
await modifyConnection({ channelId }, { lastActive: null, connected: false });
});

const embed = simpleEmbed(
stripIndents`
## ${emojis.timeout} Paused Due to Inactivity
Connection to this hub has been stopped. **Click the button** below to resume chatting (or alternatively, \`/connection\`).
`,
).toJSON();

await manager.broadcastEval(
(client, { _connections, _embed, buttons }) => {
_connections.forEach(async (connection) => {
const channel = await client.channels.fetch(connection.channelId).catch(() => null);
const button = buttons.find((b) => b.channelId === connection.channelId)?.button;

if (!channel?.isTextBased() || !button) return;

// remove it since we are done with it
_connections.splice(_connections.indexOf(connection), 1);

await channel.send({ embeds: [_embed], components: [button] }).catch(() => null);
});
},
{ context: { _connections: connections, _embed: embed, buttons: reconnectButtonArr } },
);
};
5 changes: 5 additions & 0 deletions src/services/SchedulerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ export default class Scheduler {
this.tasks.forEach((_, taskName) => this.stopTask(taskName));
}


hasTask(taskName: string): boolean {
return this.tasks.has(taskName);
}

/**
* Returns an array of currently running task names.
*/
Expand Down
14 changes: 11 additions & 3 deletions src/utils/ConnectedList.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Prisma, connectedList } from '@prisma/client';
import db from './Db.js';
import { Collection } from 'discord.js';
import Logger from './Logger.js';
import { Prisma, connectedList } from '@prisma/client';
import { Collection } from 'discord.js';
import { captureException } from '@sentry/node';

/** 📡 Contains all the **connected** channels from all hubs. */
export const connectionCache = new Collection<string, connectedList>();
export const messageTimestamps = new Collection<string, Date>();

export const syncConnectionCache = async () => {
Logger.debug('[InterChat]: Populating connection cache.');
Expand Down Expand Up @@ -40,7 +42,6 @@ export const modifyConnection = async (
data: Prisma.connectedListUpdateInput,
) => {
const connection = await db.connectedList.update({ where, data });

connectionCache.set(connection.channelId, connection);
return connection;
};
Expand All @@ -57,3 +58,10 @@ export const modifyConnections = async (

return connections;
};

export const storeMsgTimestamps = (data: Collection<string, Date>): void => {
data.forEach(
async (lastActive, channelId) =>
await modifyConnection({ channelId }, { lastActive }).catch(captureException),
);
};
Loading

0 comments on commit 43476dc

Please sign in to comment.