Skip to content

Commit

Permalink
refactor(core): Clean up event relays (no-changelog) (#10284)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Aug 2, 2024
1 parent 55f2ffe commit aa0a470
Show file tree
Hide file tree
Showing 49 changed files with 490 additions and 400 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Logger } from '@/Logger';
import { jsonParse, type IDataObject, ApplicationError } from 'n8n-workflow';
import { EXTERNAL_SECRETS_INITIAL_BACKOFF, EXTERNAL_SECRETS_MAX_BACKOFF } from './constants';
import { License } from '@/License';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';
import { updateIntervalTime } from './externalSecretsHelper.ee';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { OrchestrationService } from '@/services/orchestration.service';
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { Telemetry } from '@/telemetry';
import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';

/**
* @deprecated Do not add to this class. To add audit or telemetry events, use
* `EventService` to emit the event and then use the `AuditEventRelay` or
* @deprecated Do not add to this class. To add log streaming or telemetry events, use
* `EventService` to emit the event and then use the `LogStreamingEventRelay` or
* `TelemetryEventRelay` to forward them to the event bus or telemetry.
*/
@Service()
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/Ldap/ldap.controller.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { NON_SENSIBLE_LDAP_CONFIG_PROPERTIES } from './constants';
import { getLdapSynchronizations } from './helpers.ee';
import { LdapConfiguration } from './types';
import { LdapService } from './ldap.service.ee';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController('/ldap')
export class LdapController {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/Ldap/ldap.service.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import {
LDAP_LOGIN_ENABLED,
LDAP_LOGIN_LABEL,
} from './constants';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@Service()
export class LdapService {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/PublicApi/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { UserRepository } from '@db/repositories/user.repository';
import { UrlService } from '@/services/url.service';
import type { AuthenticatedRequest } from '@/requests';
import { GlobalConfig } from '@n8n/config';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

async function createApiRouter(
version: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { Container } from 'typedi';
import { CredentialsRepository } from '@db/repositories/credentials.repository';
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export async function getCredentials(credentialId: string): Promise<ICredentialsDb | null> {
return await Container.get(CredentialsRepository).findOneBy({ id: credentialId });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
getTrackingInformationFromPullResult,
isSourceControlLicensed,
} from '@/environments/sourceControl/sourceControlHelper.ee';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export = {
pull: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { SharedWorkflowRepository } from '@/databases/repositories/sharedWorkflo
import { TagRepository } from '@/databases/repositories/tag.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { ProjectRepository } from '@/databases/repositories/project.repository';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';
import { z } from 'zod';
import { EnterpriseWorkflowService } from '@/workflows/workflow.service.ee';

Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { handleMfaDisable, isMfaFeatureEnabled } from '@/Mfa/helpers';
import type { FrontendService } from '@/services/frontend.service';
import { OrchestrationService } from '@/services/orchestration.service';
import { AuditEventRelay } from './eventbus/audit-event-relay.service';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';

import '@/controllers/activeWorkflows.controller';
import '@/controllers/auth.controller';
Expand Down Expand Up @@ -64,7 +64,7 @@ import '@/ExternalSecrets/ExternalSecrets.controller.ee';
import '@/license/license.controller';
import '@/workflows/workflowHistory/workflowHistory.controller.ee';
import '@/workflows/workflows.controller';
import { EventService } from './eventbus/event.service';
import { EventService } from './events/event.service';

const exec = promisify(callbackExec);

Expand Down Expand Up @@ -250,7 +250,7 @@ export class Server extends AbstractServer {
// ----------------------------------------
const eventBus = Container.get(MessageEventBus);
await eventBus.initialize();
Container.get(AuditEventRelay).init();
Container.get(LogStreamingEventRelay).init();

if (this.endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { toError } from '@/utils';

import type { InviteEmailData, PasswordResetData, SendEmailResult } from './Interfaces';
import { NodeMailer } from './NodeMailer';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

type Template = HandlebarsTemplateDelegate<unknown>;
type TemplateName = 'invite' | 'passwordReset' | 'workflowShared' | 'credentialsShared';
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import { WorkflowRepository } from './databases/repositories/workflow.repository
import { UrlService } from './services/url.service';
import { WorkflowExecutionService } from './workflows/workflowExecution.service';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { EventService } from './eventbus/event.service';
import { EventService } from './events/event.service';
import { GlobalConfig } from '@n8n/config';
import { SubworkflowPolicyChecker } from './subworkflows/subworkflow-policy-checker.service';

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { EventService } from './eventbus/event.service';
import { EventService } from './events/event.service';

@Service()
export class WorkflowRunner {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/auth/methods/email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Container } from 'typedi';
import { isLdapLoginEnabled } from '@/Ldap/helpers.ee';
import { UserRepository } from '@db/repositories/user.repository';
import { AuthError } from '@/errors/response-errors/auth.error';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export const handleEmailLogin = async (
email: string,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/auth/methods/ldap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
updateLdapUserOnLocalDb,
} from '@/Ldap/helpers.ee';
import type { User } from '@db/entities/User';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export const handleLdapLogin = async (
loginId: string,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { initExpressionEvaluator } from '@/ExpressionEvaluator';
import { generateHostInstanceId } from '@db/utils/generators';
import { WorkflowHistoryManager } from '@/workflows/workflowHistory/workflowHistoryManager.ee';
import { ShutdownService } from '@/shutdown/Shutdown.service';
import { TelemetryEventRelay } from '@/telemetry/telemetry-event-relay.service';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';

export abstract class BaseCommand extends Command {
protected logger = Container.get(Logger);
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import type { WorkerJobStatusSummary } from '@/services/orchestration/worker/typ
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { BaseCommand } from './BaseCommand';
import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
import { AuditEventRelay } from '@/eventbus/audit-event-relay.service';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';

export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
Expand Down Expand Up @@ -286,7 +286,7 @@ export class Worker extends BaseCommand {
await Container.get(MessageEventBus).initialize({
workerId: this.queueModeId,
});
Container.get(AuditEventRelay).init();
Container.get(LogStreamingEventRelay).init();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutingWorkflowData } from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
import type { EventService } from '@/eventbus/event.service';
import type { EventService } from '@/events/event.service';

describe('ConcurrencyControlService', () => {
const logger = mock<Logger>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
import type { IExecutingWorkflowData } from '@/Interfaces';
import { Telemetry } from '@/telemetry';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/auth.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { ApplicationError } from 'n8n-workflow';
import { UserRepository } from '@/databases/repositories/user.repository';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController()
export class AuthController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Push } from '@/push';
import { CommunityPackagesService } from '@/services/communityPackages.service';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

const {
PACKAGE_NOT_INSTALLED,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/invitation.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { InternalHooks } from '@/InternalHooks';
import { ExternalHooks } from '@/ExternalHooks';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController('/invitations')
export class InvitationController {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/me.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { InternalHooks } from '@/InternalHooks';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { UserRepository } from '@/databases/repositories/user.repository';
import { isApiEnabled } from '@/PublicApi';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

export const API_KEY_PREFIX = 'n8n_api_';

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/passwordReset.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import { UserRepository } from '@/databases/repositories/user.repository';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController()
export class PasswordResetController {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/project.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ProjectRepository } from '@/databases/repositories/project.repository';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In, Not } from '@n8n/typeorm';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController('/projects')
export class ProjectController {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/controllers/users.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { Project } from '@/databases/entities/Project';
import { WorkflowService } from '@/workflows/workflow.service';
import { CredentialsService } from '@/credentials/credentials.service';
import { ProjectService } from '@/services/project.service';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController('/users')
export class UsersController {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/credentials/credentials.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { SharedCredentialsRepository } from '@/databases/repositories/sharedCred
import { SharedCredentials } from '@/databases/entities/SharedCredentials';
import { ProjectRelationRepository } from '@/databases/repositories/projectRelation.repository';
import { z } from 'zod';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@RestController('/credentials')
export class CredentialsController {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/decorators/Redactable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RedactableError } from '@/errors/redactable.error';
import type { UserLike } from '@/eventbus/event.types';
import type { UserLike } from '@/events/relay-event-map';

function toRedactable(userLike: UserLike) {
return {
Expand All @@ -14,7 +14,7 @@ function toRedactable(userLike: UserLike) {
type FieldName = 'user' | 'inviter' | 'invitee';

/**
* Mark redactable properties in a `{ user: UserLike }` field in an `AuditEventRelay`
* Mark redactable properties in a `{ user: UserLike }` field in an `LogStreamingEventRelay`
* method arg. These properties will be later redacted by the log streaming
* destination based on user prefs. Only for `n8n.audit.*` logs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type { SourceControlPreferences } from './types/sourceControlPreferences'
import type { SourceControlledFile } from './types/sourceControlledFile';
import { SOURCE_CONTROL_DEFAULT_BRANCH } from './constants';
import type { ImportResult } from './types/importResult';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';
import { getRepoType } from './sourceControlHelper.ee';
import { SourceControlGetStatus } from './types/sourceControlGetStatus';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import type { TagEntity } from '@db/entities/TagEntity';
import type { Variables } from '@db/entities/Variables';
import type { SourceControlWorkflowVersionId } from './types/sourceControlWorkflowVersionId';
import type { ExportableCredential } from './types/exportableCredential';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';
import { TagRepository } from '@db/repositories/tag.repository';
import { Logger } from '@/Logger';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { CacheService } from '@/services/cache/cache.service';
import { VariablesRepository } from '@db/repositories/variables.repository';
import { VariableCountLimitReachedError } from '@/errors/variable-count-limit-reached.error';
import { VariableValidationError } from '@/errors/variable-validation.error';
import { EventService } from '@/eventbus/event.service';
import { EventService } from '@/events/event.service';

@Service()
export class VariablesService {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/errors/redactable.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ApplicationError } from 'n8n-workflow';
export class RedactableError extends ApplicationError {
constructor(fieldName: string, args: string) {
super(
`Failed to find "${fieldName}" property in argument "${args.toString()}". Please set the decorator \`@Redactable()\` only on \`AuditEventRelay\` methods where the argument contains a "${fieldName}" property.`,
`Failed to find "${fieldName}" property in argument "${args.toString()}". Please set the decorator \`@Redactable()\` only on \`LogStreamingEventRelay\` methods where the argument contains a "${fieldName}" property.`,
);
}
}
6 changes: 0 additions & 6 deletions packages/cli/src/eventbus/event.service.ts

This file was deleted.

Loading

0 comments on commit aa0a470

Please sign in to comment.