Skip to content

Commit

Permalink
fix(core): Scheduler tasks should not trigger on follower instances (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored and ShireenMissi committed Aug 23, 2024
1 parent bcc4bb3 commit 819499c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
13 changes: 12 additions & 1 deletion packages/core/src/ScheduledTaskManager.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import { Service } from 'typedi';
import { CronJob } from 'cron';
import type { CronExpression, Workflow } from 'n8n-workflow';
import { InstanceSettings } from './InstanceSettings';

@Service()
export class ScheduledTaskManager {
constructor(private readonly instanceSettings: InstanceSettings) {}

readonly cronJobs = new Map<string, CronJob[]>();

registerCron(workflow: Workflow, cronExpression: CronExpression, onTick: () => void) {
const cronJob = new CronJob(cronExpression, onTick, undefined, true, workflow.timezone);
const cronJob = new CronJob(
cronExpression,
() => {
if (this.instanceSettings.isLeader) onTick();
},
undefined,
true,
workflow.timezone,
);
const cronJobsForWorkflow = this.cronJobs.get(workflow.id);
if (cronJobsForWorkflow) {
cronJobsForWorkflow.push(cronJob);
Expand Down
13 changes: 12 additions & 1 deletion packages/core/test/ScheduledTaskManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Workflow } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';

import type { InstanceSettings } from '@/InstanceSettings';
import { ScheduledTaskManager } from '@/ScheduledTaskManager';

describe('ScheduledTaskManager', () => {
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const workflow = mock<Workflow>({ timezone: 'GMT' });
const everyMinute = '0 * * * * *';
const onTick = jest.fn();
Expand All @@ -13,7 +15,7 @@ describe('ScheduledTaskManager', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
scheduledTaskManager = new ScheduledTaskManager();
scheduledTaskManager = new ScheduledTaskManager(instanceSettings);
});

it('should throw when workflow timezone is invalid', () => {
Expand Down Expand Up @@ -41,6 +43,15 @@ describe('ScheduledTaskManager', () => {
expect(onTick).toHaveBeenCalledTimes(10);
});

it('should should not invoke on follower instances', async () => {
scheduledTaskManager = new ScheduledTaskManager(mock<InstanceSettings>({ isLeader: false }));
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);

expect(onTick).not.toHaveBeenCalled();
jest.advanceTimersByTime(10 * 60 * 1000); // 10 minutes
expect(onTick).not.toHaveBeenCalled();
});

it('should deregister CronJobs for a workflow', async () => {
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
scheduledTaskManager.registerCron(workflow, everyMinute, onTick);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as n8nWorkflow from 'n8n-workflow';
import type { INode, ITriggerFunctions, Workflow } from 'n8n-workflow';
import { returnJsonArray } from 'n8n-core';
import { type InstanceSettings, returnJsonArray } from 'n8n-core';
import { ScheduledTaskManager } from 'n8n-core/dist/ScheduledTaskManager';
import { mock } from 'jest-mock-extended';
import { ScheduleTrigger } from '../ScheduleTrigger.node';
Expand All @@ -18,7 +18,8 @@ describe('ScheduleTrigger', () => {

const node = mock<INode>({ typeVersion: 1 });
const workflow = mock<Workflow>({ timezone });
const scheduledTaskManager = new ScheduledTaskManager();
const instanceSettings = mock<InstanceSettings>({ isLeader: true });
const scheduledTaskManager = new ScheduledTaskManager(instanceSettings);
const helpers = mock<ITriggerFunctions['helpers']>({
returnJsonArray,
registerCron: (cronExpression, onTick) =>
Expand Down

0 comments on commit 819499c

Please sign in to comment.