Skip to content

Commit

Permalink
fix(core): Ensure only leader handles waiting executions (#9014)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored and despairblue committed Apr 4, 2024
1 parent 0fba0bf commit 52b6947
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
22 changes: 20 additions & 2 deletions packages/cli/src/WaitTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import { Container, Service } from 'typedi';
import type { IExecutionsStopData, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { OwnershipService } from './services/ownership.service';
import { OwnershipService } from '@/services/ownership.service';
import { Logger } from '@/Logger';
import { OrchestrationService } from '@/services/orchestration.service';

@Service()
export class WaitTracker {
Expand All @@ -26,7 +27,22 @@ export class WaitTracker {
private readonly executionRepository: ExecutionRepository,
private readonly ownershipService: OwnershipService,
private readonly workflowRunner: WorkflowRunner,
readonly orchestrationService: OrchestrationService,
) {
const { isLeader, isMultiMainSetupEnabled, multiMainSetup } = orchestrationService;

if (isLeader) this.startTracking();

if (isMultiMainSetupEnabled) {
multiMainSetup
.on('leader-takeover', () => this.startTracking())
.on('leader-stepdown', () => this.stopTracking());
}
}

startTracking() {
this.logger.debug('Wait tracker started tracking waiting executions');

// Poll every 60 seconds a list of upcoming executions
this.mainTimer = setInterval(() => {
void this.getWaitingExecutions();
Expand Down Expand Up @@ -174,7 +190,9 @@ export class WaitTracker {
});
}

shutdown() {
stopTracking() {
this.logger.debug('Wait tracker shutting down');

clearInterval(this.mainTimer);
Object.keys(this.waitingExecutions).forEach((executionId) => {
clearTimeout(this.waitingExecutions[executionId].timer);
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class Start extends BaseCommand {
// Stop with trying to activate workflows that could not be activated
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();

Container.get(WaitTracker).shutdown();
Container.get(WaitTracker).stopTracking();

await this.externalHooks?.run('n8n.stop', []);

Expand Down
3 changes: 3 additions & 0 deletions packages/cli/src/services/orchestration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ export class OrchestrationService {
return config.getEnv('redis.queueModeId');
}

/**
* Whether this instance is the leader in a multi-main setup. Always `true` in single-main setup.
*/
get isLeader() {
return config.getEnv('multiMainSetup.instanceType') === 'leader';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class MultiMainSetup extends EventEmitter {
if (config.getEnv('multiMainSetup.instanceType') === 'leader') {
config.set('multiMainSetup.instanceType', 'follower');

this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning
this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking

EventReporter.info('[Multi-main setup] Leader failed to renew leader key');
}
Expand Down Expand Up @@ -97,7 +97,7 @@ export class MultiMainSetup extends EventEmitter {

await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl);

this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning
this.emit('leader-takeover'); // gained leadership - start triggers, pollers, pruning, wait-tracking
} else {
config.set('multiMainSetup.instanceType', 'follower');
}
Expand Down
64 changes: 59 additions & 5 deletions packages/cli/test/unit/WaitTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ import { WaitTracker } from '@/WaitTracker';
import { mock } from 'jest-mock-extended';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/Interfaces';
import type { OrchestrationService } from '@/services/orchestration.service';
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

jest.useFakeTimers();

describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const orchestrationService = mock<OrchestrationService>({
isLeader: true,
isMultiMainSetupEnabled: false,
});

const execution = mock<IExecutionResponse>({
id: '123',
Expand All @@ -21,23 +27,29 @@ describe('WaitTracker', () => {
it('should query DB for waiting executions', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);

new WaitTracker(mock(), executionRepository, mock(), mock());
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);

expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});

it('if no executions to start, should do nothing', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock());
new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);

expect(executionRepository.findSingleExecution).not.toHaveBeenCalled();
});

describe('if execution to start', () => {
it('if not enough time passed, should not start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);

executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
Expand All @@ -51,7 +63,13 @@ describe('WaitTracker', () => {

it('if enough time passed, should start execution', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);

executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
await waitTracker.getWaitingExecutions();
Expand All @@ -68,7 +86,13 @@ describe('WaitTracker', () => {
describe('startExecution()', () => {
it('should query for execution to start', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock());
const waitTracker = new WaitTracker(
mock(),
executionRepository,
mock(),
mock(),
orchestrationService,
);

executionRepository.findSingleExecution.mockResolvedValue(execution);
waitTracker.startExecution(execution.id);
Expand All @@ -80,4 +104,34 @@ describe('WaitTracker', () => {
});
});
});

describe('multi-main setup', () => {
it('should start tracking if leader', () => {
const orchestrationService = mock<OrchestrationService>({
isLeader: true,
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
});

executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);

expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});

it('should not start tracking if follower', () => {
const orchestrationService = mock<OrchestrationService>({
isLeader: false,
isMultiMainSetupEnabled: true,
multiMainSetup: mock<MultiMainSetup>({ on: jest.fn().mockReturnThis() }),
});

executionRepository.getWaitingExecutions.mockResolvedValue([]);

new WaitTracker(mock(), executionRepository, mock(), mock(), orchestrationService);

expect(executionRepository.getWaitingExecutions).not.toHaveBeenCalled();
});
});
});

0 comments on commit 52b6947

Please sign in to comment.