Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Email inbox memory leak on connection failure #26850

Merged
merged 25 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1b0c72d
[Regression] Invalid Email Inbox no longer leaks event listeners, and…
cauefcr Sep 12, 2022
9a4712e
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 12, 2022
17f1d19
OC-184 jira linking
cauefcr Sep 12, 2022
76d94d9
Merge ssh://github.com/RocketChat/Rocket.Chat into regression/email-m…
cauefcr Sep 12, 2022
fa26b4b
forgotten comment
cauefcr Sep 12, 2022
f473e6a
Merge branch 'regression/email-memory-leak-and-attachment-issues' of …
cauefcr Sep 12, 2022
7191b2b
Update apps/meteor/server/email/IMAPInterceptor.ts
cauefcr Sep 13, 2022
60a5e94
Update apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts
cauefcr Sep 13, 2022
c76a2f5
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 13, 2022
d0cacb2
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 13, 2022
25b20f4
promisifying most callbacks
cauefcr Sep 14, 2022
7ec4deb
Merge github.com:RocketChat/Rocket.Chat into regression/email-memory-…
cauefcr Sep 14, 2022
6249880
Merge branch 'regression/email-memory-leak-and-attachment-issues' of …
cauefcr Sep 14, 2022
24448ec
type error
cauefcr Sep 14, 2022
5c46461
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 14, 2022
fe5f059
linter complaint
cauefcr Sep 14, 2022
886be36
code review changes
cauefcr Sep 20, 2022
b21d2ea
forgotten paren
cauefcr Sep 20, 2022
6b0c057
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Sep 20, 2022
00b9388
review fixes
KevLehman Sep 21, 2022
dc7af33
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
KevLehman Sep 27, 2022
84caaae
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
a35086e
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
470b035
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
dae0e56
Merge branch 'develop' into regression/email-memory-leak-and-attachme…
cauefcr Oct 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/meteor/app/models/server/models/LivechatRooms.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ export class LivechatRooms extends Base {
const query = {
't': 'l',
'v.token': visitorToken,
'$or': [{ 'email.thread': { $elemMatch: { $in: emailThread } } }, { 'email.thread': new RegExp(emailThread.join('|')) }],
'$or': [
{ 'email.thread': { $elemMatch: { $in: emailThread } } },
{ 'email.thread': new RegExp(emailThread.map((t) => `"${t}"`).join('|')) },
],
...(departmentId && { departmentId }),
};

Expand Down
232 changes: 141 additions & 91 deletions apps/meteor/server/email/IMAPInterceptor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { EventEmitter } from 'events';

import type { ImapMessage, ImapMessageBodyInfo } from 'imap';
import IMAP from 'imap';
import type Connection from 'imap';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';
import { EmailInbox } from '@rocket.chat/models';

import { logger } from '../features/EmailInbox/logger';

Expand All @@ -24,12 +25,14 @@ export class IMAPInterceptor extends EventEmitter {

private config: IMAP.Config;

private initialBackoffDurationMS = 30000;
private backoffDurationMS = 3000;

private backoff: NodeJS.Timeout;

private retries = 0;

private inboxId: string;

constructor(
imapConfig: IMAP.Config,
private options: IMAPOptions = {
Expand All @@ -38,6 +41,7 @@ export class IMAPInterceptor extends EventEmitter {
markSeen: true,
maxRetries: 10,
},
id: string,
) {
super();

Expand All @@ -49,132 +53,178 @@ export class IMAPInterceptor extends EventEmitter {
...(imapConfig.tls && { tlsOptions: { servername: imapConfig.host } }),
...imapConfig,
});
this.retries = 0;
this.inboxId = id;
this.start();
}

openInbox(): Promise<IMAP.Box> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mailbox: IMAP.Box) => {
if (err) {
reject(err);
} else {
resolve(mailbox);
}
};
this.imap.openBox('INBOX', false, cb);
});
}

async start(): Promise<void> {
// On successfully connected.
this.imap.on('ready', () => {
if (this.imap.state !== 'disconnected') {
this.imap.on('ready', async () => {
KevLehman marked this conversation as resolved.
Show resolved Hide resolved
if (this.isActive()) {
logger.info(`IMAP connected to ${this.config.user}`);
clearTimeout(this.backoff);
this.retries = 0;
this.openInbox((err) => {
if (err) {
logger.error(`Error occurred during imap on inbox ${this.config.user}: `, err);
throw err;
}
// fetch new emails & wait [IDLE]
this.getEmails();

// If new message arrived, fetch them
this.imap.on('mail', () => {
this.getEmails();
});
});
this.backoffDurationMS = 3000;
await this.openInbox();
this.imap.on('mail', () => this.getEmails().catch((err: Error) => logger.debug('Error on getEmails: ', err.message)));
} else {
logger.error(`IMAP did not connect on inbox ${this.config.user}`);
this.imap.end();
this.reconnect();
logger.error("Can't connect to IMAP server");
}
});

this.imap.on('error', (err: Error) => {
logger.error(`Error occurred on inbox ${this.config.user}: `, err);
this.stop(() => this.reconnect());
this.imap.on('error', async (err: Error) => {
logger.error({ err });
logger.error(`IMAP error: ${err.message}`);
this.retries++;
await this.reconnect();
});

this.imap.on('close', () => {
this.reconnect();
this.imap.on('close', async () => {
await this.reconnect();
});
}

openInbox(cb: (error: Error, mailbox: Connection.Box) => void): void {
this.imap.openBox('INBOX', false, cb);
}

start(): void {
this.imap.connect();
this.retries += 1;
return this.imap.connect();
}

isActive(): boolean {
if (this.imap?.state && this.imap.state === 'disconnected') {
return false;
}

return true;
return !!(this.imap?.state && this.imap.state !== 'disconnected');
}

stop(callback = new Function()): void {
logger.info('IMAP stop called');
this.imap.end();
logger.debug('IMAP stop called');
this.imap.removeAllListeners();
KevLehman marked this conversation as resolved.
Show resolved Hide resolved
this.imap.once('end', () => {
logger.info('IMAP stopped');
logger.debug('IMAP stopped');
callback?.();
});
this.imap.end();
}

reconnect(): void {
async reconnect(): Promise<void> {
if (!this.isActive() && !this.canRetry()) {
KevLehman marked this conversation as resolved.
Show resolved Hide resolved
logger.info(`Max retries reached for ${this.config.user}`);
this.stop();
return this.selfDisable();
}
if (this.backoff) {
clearTimeout(this.backoff);
this.backoffDurationMS = 3000;
}
const loop = (): void => {
this.start();
if (this.retries < this.options.maxRetries) {
this.retries += 1;
this.initialBackoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
logger.debug(`Reconnecting to ${this.config.user}: ${this.retries}`);
if (this.canRetry()) {
this.backoffDurationMS *= 2;
this.backoff = setTimeout(loop, this.backoffDurationMS);
} else {
logger.error(`IMAP reconnection failed on inbox ${this.config.user}`);
logger.info(`IMAP reconnection failed on inbox ${this.config.user}`);
clearTimeout(this.backoff);
this.stop();
this.selfDisable();
return;
}
this.stop();
this.start();
};
this.backoff = setTimeout(loop, this.initialBackoffDurationMS);
this.backoff = setTimeout(loop, this.backoffDurationMS);
}

// Fetch all UNSEEN messages and pass them for further processing
getEmails(): void {
this.imap.search(this.options.filter, (err, newEmails) => {
logger.debug(`IMAP search on inbox ${this.config.user} returned ${newEmails.length} new emails: `, newEmails);
if (err) {
logger.error(err);
throw err;
}
// newEmails => array containing serials of unseen messages
if (newEmails.length > 0) {
const fetch = this.imap.fetch(newEmails, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});

fetch.on('message', (msg, seqno) => {
logger.debug('E-mail received', seqno, msg);

msg.on('body', (stream, type) => {
if (type.which !== '') {
return;
}
imapSearch(): Promise<number[]> {
return new Promise((resolve, reject) => {
const cb = (err: Error, results: number[]) => {
if (err) {
reject(err);
} else {
resolve(results);
}
};
this.imap.search(this.options.filter, cb);
});
}

simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}
this.emit('email', email);
});
});
parseEmails(stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): Promise<ParsedMail> {
return new Promise((resolve, reject) => {
const cb = (err: Error, mail: ParsedMail) => {
if (err) {
reject(err);
} else {
resolve(mail);
}
};
simpleParser(stream, cb);
});
}

// On fetched each message, pass it further
msg.once('end', () => {
// delete message from inbox
imapFetch(emailIds: number[]): Promise<number[]> {
return new Promise((resolve, reject) => {
const out: number[] = [];
const messagecb = (msg: ImapMessage, seqno: number) => {
out.push(seqno);
const bodycb = (stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): void => {
simpleParser(stream, (_err, email) => {
if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
logger.error(`Rejecting email on inbox ${this.config.user}`, email.subject);
return;
}
this.emit('email', email);
if (this.options.deleteAfterRead) {
this.imap.seq.addFlags(seqno, 'Deleted', (err) => {
this.imap.seq.addFlags(email, 'Deleted', (err) => {
if (err) {
logger.error(`Mark deleted error: ${err}`);
logger.warn(`Mark deleted error: ${err}`);
}
});
}
});
});

fetch.once('error', (err) => {
logger.error(`Fetch error: ${err}`);
});
}
};
msg.once('body', bodycb);
};
const errorcb = (err: Error): void => {
logger.warn(`Fetch error: ${err}`);
reject(err);
};
const endcb = (): void => {
resolve(out);
};
const fetch = this.imap.fetch(emailIds, {
bodies: ['HEADER', 'TEXT', ''],
struct: true,
markSeen: this.options.markSeen,
});

fetch.on('message', messagecb);
fetch.on('error', errorcb);
fetch.on('end', endcb);
});
}

// Fetch all UNSEEN messages and pass them for further processing
async getEmails(): Promise<void> {
const emailIds = await this.imapSearch();
await this.imapFetch(emailIds);
}

canRetry(): boolean {
return this.retries < this.options.maxRetries || this.options.maxRetries === -1;
}

async selfDisable(): Promise<void> {
logger.info(`Disabling inbox ${this.inboxId}`);
// Again, if there's 2 inboxes with the same email, this will prevent looping over the already disabled one
// Active filter is just in case :)
await EmailInbox.findOneAndUpdate({ _id: this.inboxId, active: true }, { $set: { active: false } });
logger.info(`IMAP inbox ${this.inboxId} automatically disabled`);
}
}
5 changes: 3 additions & 2 deletions apps/meteor/server/features/EmailInbox/EmailInbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ export async function configureEmailInboxes(): Promise<void> {
},
{
deleteAfterRead: false,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._updatedAt]],
rejectBeforeTS: emailInboxRecord._updatedAt,
filter: [['UNSEEN'], ['SINCE', emailInboxRecord._createdAt]],
KevLehman marked this conversation as resolved.
Show resolved Hide resolved
rejectBeforeTS: emailInboxRecord._createdAt,
markSeen: true,
maxRetries: emailInboxRecord.imap.maxRetries,
},
emailInboxRecord._id,
);

imap.on(
Expand Down
20 changes: 13 additions & 7 deletions apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async function uploadAttachment(attachment: Attachment, rid: string, visitorToke
}

export async function onEmailReceived(email: ParsedMail, inbox: string, department = ''): Promise<void> {
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`, email);
logger.debug(`New email conversation received on inbox ${inbox}. Will be assigned to department ${department}`);
if (!email.from?.value?.[0]?.address) {
return;
}
Expand Down Expand Up @@ -161,12 +161,18 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme
room = await QueueManager.unarchiveRoom(room);
}

let msg = email.text;

if (email.html) {
// Try to remove the signature and history
msg = stripHtml(email.html.replace(/<div name="messageSignatureSection.+/s, '')).result;
}
// TODO: html => md with turndown
const msg = email.html
? stripHtml(email.html, {
dumpLinkHrefsNearby: {
enabled: true,
putOnNewLine: false,
wrapHeads: '(',
wrapTails: ')',
},
skipHtmlDecoding: false,
}).result
: email.text || '';
KevLehman marked this conversation as resolved.
Show resolved Hide resolved

const rid = room?._id ?? Random.id();
const msgId = Random.id();
Expand Down
Loading