diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 9acb96cfc082a..4e1bcb7585387 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -7,8 +7,9 @@ import { Container, Service } from 'typedi'; import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { WorkflowRunner } from '@/WorkflowRunner'; import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { OwnershipService } from './services/ownership.service'; +import { OwnershipService } from '@/services/ownership.service'; import { Logger } from '@/Logger'; +import { OrchestrationService } from '@/services/orchestration.service'; @Service() export class WaitTracker { @@ -26,7 +27,22 @@ export class WaitTracker { private readonly executionRepository: ExecutionRepository, private readonly ownershipService: OwnershipService, private readonly workflowRunner: WorkflowRunner, + readonly orchestrationService: OrchestrationService, ) { + const { isLeader, isMultiMainSetupEnabled, multiMainSetup } = orchestrationService; + + if (isLeader) this.startTracking(); + + if (isMultiMainSetupEnabled) { + multiMainSetup + .on('leader-takeover', () => this.startTracking()) + .on('leader-stepdown', () => this.stopTracking()); + } + } + + startTracking() { + this.logger.debug('Wait tracker started tracking waiting executions'); + // Poll every 60 seconds a list of upcoming executions this.mainTimer = setInterval(() => { void this.getWaitingExecutions(); @@ -174,7 +190,9 @@ export class WaitTracker { }); } - shutdown() { + stopTracking() { + this.logger.debug('Wait tracker shutting down'); + clearInterval(this.mainTimer); Object.keys(this.waitingExecutions).forEach((executionId) => { clearTimeout(this.waitingExecutions[executionId].timer); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index ad09299089644..3b7ee763a3244 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -94,7 +94,7 @@ export class Start extends BaseCommand { // Stop with trying to activate workflows that could not be activated this.activeWorkflowRunner.removeAllQueuedWorkflowActivations(); - Container.get(WaitTracker).shutdown(); + Container.get(WaitTracker).stopTracking(); await this.externalHooks?.run('n8n.stop', []); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index d80f4dee19d71..cc0724aaf14dc 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -39,6 +39,9 @@ export class OrchestrationService { return config.getEnv('redis.queueModeId'); } + /** + * Whether this instance is the leader in a multi-main setup. Always `true` in single-main setup. + */ get isLeader() { return config.getEnv('multiMainSetup.instanceType') === 'leader'; } diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 8d9cd5da23ef2..eda788ae670ce 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -62,7 +62,7 @@ export class MultiMainSetup extends EventEmitter { if (config.getEnv('multiMainSetup.instanceType') === 'leader') { config.set('multiMainSetup.instanceType', 'follower'); - this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning + this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking EventReporter.info('[Multi-main setup] Leader failed to renew leader key'); } @@ -97,7 +97,7 @@ export class MultiMainSetup extends EventEmitter { await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); - this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning + this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning, wait-tracking } else { config.set('multiMainSetup.instanceType', 'follower'); } diff --git a/packages/cli/test/unit/WaitTracker.test.ts b/packages/cli/test/unit/WaitTracker.test.ts index 4bf43bb94059c..a3e26826e276b 100644 --- a/packages/cli/test/unit/WaitTracker.test.ts +++ b/packages/cli/test/unit/WaitTracker.test.ts @@ -2,11 +2,17 @@ import { WaitTracker } from '@/WaitTracker'; import { mock } from 'jest-mock-extended'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutionResponse } from '@/Interfaces'; +import type { OrchestrationService } from '@/services/orchestration.service'; +import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; jest.useFakeTimers(); describe('WaitTracker', () => { const executionRepository = mock(); + const orchestrationService = mock({ + isLeader: true, + isMultiMainSetupEnabled: false, + }); const execution = mock({ id: '123', @@ -21,7 +27,7 @@ describe('WaitTracker', () => { it('should query DB for waiting executions', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([execution]); - new WaitTracker(mock(), executionRepository, mock(), mock()); + new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService); expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); }); @@ -29,7 +35,7 @@ describe('WaitTracker', () => { it('if no executions to start, should do nothing', () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - new WaitTracker(mock(), executionRepository, mock(), mock()); + new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService); expect(executionRepository.findSingleExecution).not.toHaveBeenCalled(); }); @@ -37,7 +43,13 @@ describe('WaitTracker', () => { describe('if execution to start', () => { it('if not enough time passed, should not start execution', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([execution]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); + const waitTracker = new WaitTracker( + mock(), + executionRepository, + mock(), + mock(), + orchestrationService, + ); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); await waitTracker.getWaitingExecutions(); @@ -51,7 +63,13 @@ describe('WaitTracker', () => { it('if enough time passed, should start execution', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); + const waitTracker = new WaitTracker( + mock(), + executionRepository, + mock(), + mock(), + orchestrationService, + ); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); await waitTracker.getWaitingExecutions(); @@ -68,7 +86,13 @@ describe('WaitTracker', () => { describe('startExecution()', () => { it('should query for execution to start', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); + const waitTracker = new WaitTracker( + mock(), + executionRepository, + mock(), + mock(), + orchestrationService, + ); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.startExecution(execution.id); @@ -80,4 +104,34 @@ describe('WaitTracker', () => { }); }); }); + + describe('multi-main setup', () => { + it('should start tracking if leader', () => { + const orchestrationService = mock({ + isLeader: true, + isMultiMainSetupEnabled: true, + multiMainSetup: mock({ on: jest.fn().mockReturnThis() }), + }); + + executionRepository.getWaitingExecutions.mockResolvedValue([]); + + new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService); + + expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); + }); + + it('should not start tracking if follower', () => { + const orchestrationService = mock({ + isLeader: false, + isMultiMainSetupEnabled: true, + multiMainSetup: mock({ on: jest.fn().mockReturnThis() }), + }); + + executionRepository.getWaitingExecutions.mockResolvedValue([]); + + new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService); + + expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled(); + }); + }); });