Skip to content

Commit

Permalink
revert(core): Add command to trigger license refresh on workers (#7184)
Browse files Browse the repository at this point in the history
This reverts commit 9f797b9.
  • Loading branch information
netroy committed Sep 21, 2023
1 parent 0a31cfd commit 6ac19a5
Show file tree
Hide file tree
Showing 22 changed files with 139 additions and 294 deletions.
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
},
"dependencies": {
"@n8n/client-oauth2": "workspace:*",
"@n8n_io/license-sdk": "~2.6.0",
"@n8n_io/license-sdk": "~2.5.1",
"@oclif/command": "^1.8.16",
"@oclif/core": "^1.16.4",
"@oclif/errors": "^1.3.6",
Expand Down
40 changes: 5 additions & 35 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import {
SETTINGS_LICENSE_CERT_KEY,
UNLIMITED_LICENSE_QUOTA,
} from './constants';
import Container, { Service } from 'typedi';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher';
import { RedisService } from './services/redis.service';
import { Service } from 'typedi';
import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces';

type FeatureReturnType = Partial<
{
Expand All @@ -28,28 +26,18 @@ export class License {

private manager: LicenseManager | undefined;

instanceId: string | undefined;

private redisPublisher: RedisServicePubSubPublisher;

constructor() {
this.logger = getLogger();
}

async init(instanceId: string, instanceType: N8nInstanceType = 'main') {
async init(instanceId: string) {
if (this.manager) {
return;
}

this.instanceId = instanceId;
const isMainInstance = instanceType === 'main';
const server = config.getEnv('license.serverUrl');
const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled');
const offlineMode = !isMainInstance;
const autoRenewEnabled = config.getEnv('license.autoRenewEnabled');
const autoRenewOffset = config.getEnv('license.autoRenewOffset');
const saveCertStr = isMainInstance
? async (value: TLicenseBlock) => this.saveCertStr(value)
: async () => {};

try {
this.manager = new LicenseManager({
Expand All @@ -59,10 +47,9 @@ export class License {
autoRenewEnabled,
renewOnInit: autoRenewEnabled,
autoRenewOffset,
offlineMode,
logger: this.logger,
loadCertStr: async () => this.loadCertStr(),
saveCertStr,
saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value),
deviceFingerprint: () => instanceId,
});

Expand Down Expand Up @@ -100,15 +87,6 @@ export class License {
},
['key'],
);
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
}
}

async activate(activationKey: string): Promise<void> {
Expand All @@ -119,14 +97,6 @@ export class License {
await this.manager.activate(activationKey);
}

async reload(): Promise<void> {
if (!this.manager) {
return;
}
this.logger.debug('Reloading license');
await this.manager.reload();
}

async renew() {
if (!this.manager) {
return;
Expand Down
4 changes: 1 addition & 3 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1470,9 +1470,7 @@ export class Server extends AbstractServer {
// ----------------------------------------

if (!eventBus.isInitialized) {
await eventBus.initialize({
uniqueInstanceId: this.uniqueInstanceId,
});
await eventBus.initialize();
}

if (this.endpointPresetCredentials !== '') {
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting';
import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces';
import type { IExternalHooksClass } from '@/Interfaces';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
import { License } from '@/License';
Expand Down Expand Up @@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command {
await this.externalHooks.init();
}

async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
async initLicense(): Promise<void> {
const license = Container.get(License);
await license.init(this.instanceId, instanceType);
await license.init(this.instanceId);

const activationKey = config.getEnv('license.activationKey');

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class Start extends BaseCommand {
this.logger.info('Initializing n8n process');
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);

await this.initLicense('main');
await this.initLicense();
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class Webhook extends BaseCommand {
await this.initCrashJournal();
await super.init();

await this.initLicense('webhook');
await this.initLicense();
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
4 changes: 1 addition & 3 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export class Worker extends BaseCommand {
this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`);
this.logger.debug('Starting n8n worker...');

await this.initLicense('worker');
await this.initLicense();
await this.initBinaryManager();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand All @@ -268,7 +268,6 @@ export class Worker extends BaseCommand {
async initEventBus() {
await eventBus.initialize({
workerId: this.uniqueInstanceId,
uniqueInstanceId: this.uniqueInstanceId,
});
}

Expand Down Expand Up @@ -296,7 +295,6 @@ export class Worker extends BaseCommand {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId,
instanceId: this.instanceId,
redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
}),
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class E2EController {

private async resetLogStreaming() {
for (const id in eventBus.destinations) {
await eventBus.removeDestination(id, false);
await eventBus.removeDestination(id);
}
}

Expand Down
5 changes: 4 additions & 1 deletion packages/cli/src/controllers/orchestration.controller.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import config from '@/config';
import { Authorized, Get, RestController } from '@/decorators';
import { OrchestrationRequest } from '@/requests';
import { Service } from 'typedi';
import { OrchestrationService } from '@/services/orchestration.service';
import { OrchestrationService } from '../services/orchestration.service';

@Authorized(['global', 'owner'])
@RestController('/orchestration')
@Service()
export class OrchestrationController {
private config = config;

constructor(private readonly orchestrationService: OrchestrationService) {}

/**
Expand Down
104 changes: 10 additions & 94 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LoggerProxy, jsonParse } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm';
import type {
Expand Down Expand Up @@ -27,18 +27,9 @@ import {
} from '../EventMessageClasses/EventMessageGeneric';
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container, { Service } from 'typedi';
import Container from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
} from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers';
import { OrchestrationService } from '../../services/orchestration.service';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand All @@ -50,21 +41,13 @@ export interface MessageWithCallback {
export interface MessageEventBusInitializeOptions {
skipRecoveryPass?: boolean;
workerId?: string;
uniqueInstanceId?: string;
}

@Service()
export class MessageEventBus extends EventEmitter {
private static instance: MessageEventBus;

isInitialized: boolean;

uniqueInstanceId: string;

redisPublisher: RedisServicePubSubPublisher;

redisSubscriber: RedisServicePubSubSubscriber;

logWriter: MessageEventBusLogWriter;

destinations: {
Expand Down Expand Up @@ -93,30 +76,11 @@ export class MessageEventBus extends EventEmitter {
*
* Sets `isInitialized` to `true` once finished.
*/
async initialize(options: MessageEventBusInitializeOptions): Promise<void> {
async initialize(options?: MessageEventBusInitializeOptions): Promise<void> {
if (this.isInitialized) {
return;
}

this.uniqueInstanceId = options?.uniqueInstanceId ?? '';

if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
await this.redisSubscriber.subscribeToCommandChannel();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await this.handleRedisCommandMessage(messageString);
}
},
);
}

LoggerProxy.debug('Initializing event bus...');

const savedEventDestinations = await Db.collections.EventDestinations.find({});
Expand All @@ -125,7 +89,7 @@ export class MessageEventBus extends EventEmitter {
try {
const destination = messageEventBusDestinationFromDb(this, destinationData);
if (destination) {
await this.addDestination(destination, false);
await this.addDestination(destination);
}
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand Down Expand Up @@ -218,13 +182,10 @@ export class MessageEventBus extends EventEmitter {
this.isInitialized = true;
}

async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) {
await this.removeDestination(destination.getId(), false);
async addDestination(destination: MessageEventBusDestination) {
await this.removeDestination(destination.getId());
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return destination;
}

Expand All @@ -238,62 +199,19 @@ export class MessageEventBus extends EventEmitter {
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
}

async removeDestination(
id: string,
notifyWorkers: boolean = true,
): Promise<DeleteResult | undefined> {
async removeDestination(id: string): Promise<DeleteResult | undefined> {
let result;
if (Object.keys(this.destinations).includes(id)) {
await this.destinations[id].close();
result = await this.destinations[id].deleteFromDb();
delete this.destinations[id];
}
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
}
return result;
}

async handleRedisEventBusMessage(messageString: string) {
const eventData = jsonParse<AbstractEventMessageOptions>(messageString);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await Container.get(MessageEventBus).send(eventMessage);
}
}
return eventData;
}

async handleRedisCommandMessage(messageString: string) {
const message = messageToRedisServiceCommandObject(messageString);
if (message) {
if (
message.senderId === this.uniqueInstanceId ||
(message.targets && !message.targets.includes(this.uniqueInstanceId))
) {
LoggerProxy.debug(
`Skipping command message ${message.command} because it's not for this instance.`,
);
return message;
}
switch (message.command) {
case 'restartEventBus':
await this.restart();
default:
break;
}
return message;
}
return;
}

async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await this.redisPublisher.publishToCommandChannel({
senderId: this.uniqueInstanceId,
command: 'restartEventBus',
});
await Container.get(OrchestrationService).restartEventBus();
}
}

Expand All @@ -317,8 +235,6 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
await this.redisSubscriber?.unSubscribeFromCommandChannel();
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}
Expand Down Expand Up @@ -501,4 +417,4 @@ export class MessageEventBus extends EventEmitter {
}
}

export const eventBus = Container.get(MessageEventBus);
export const eventBus = MessageEventBus.getInstance();
Loading

0 comments on commit 6ac19a5

Please sign in to comment.