Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Fix push message type inference #12331

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/src/collaboration/collaboration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ export class CollaborationService {
collaborators: activeCollaborators,
};

this.push.sendToUsers('collaboratorsChanged', msgData, userIds);
this.push.sendToUsers({ type: 'collaboratorsChanged', data: msgData }, userIds);
}
}
45 changes: 30 additions & 15 deletions packages/cli/src/controllers/community-packages.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
this.push.broadcast('reloadNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'reloadNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down Expand Up @@ -206,9 +209,12 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down Expand Up @@ -246,16 +252,22 @@ export class CommunityPackagesController {

// broadcast to connected frontends that node list has been updated
previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

newInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('reloadNodeType', {
name: node.name,
version: node.latestVersion,
this.push.broadcast({
type: 'reloadNodeType',
data: {
name: node.name,
version: node.latestVersion,
},
});
});

Expand All @@ -272,9 +284,12 @@ export class CommunityPackagesController {
return newInstalledPackage;
} catch (error) {
previouslyInstalledPackage.installedNodes.forEach((node) => {
this.push.broadcast('removeNodeType', {
name: node.type,
version: node.latestVersion,
this.push.broadcast({
type: 'removeNodeType',
data: {
name: node.type,
version: node.latestVersion,
},
});
});

Expand Down
13 changes: 6 additions & 7 deletions packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import { Request } from 'express';
import Container from 'typedi';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -58,14 +58,12 @@ type ResetRequest = Request<
}
>;

type PushRequest<T extends PushType> = Request<
type PushRequest = Request<
{},
{},
{
type: T;
pushRef: string;
data: PushPayload<T>;
}
} & PushMessage
>;

@RestController('/e2e')
Expand Down Expand Up @@ -144,8 +142,9 @@ export class E2EController {
}

@Post('/push', { skipAuth: true })
async pushSend(req: PushRequest<any>) {
this.push.broadcast(req.body.type, req.body.data);
async pushSend(req: PushRequest) {
const { pushRef: _, ...pushMsg } = req.body;
this.push.broadcast(pushMsg);
}

@Patch('/feature', { skipAuth: true })
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/src/events/maps/pub-sub.event-map.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushType, WorkerStatus } from '@n8n/api-types';
import type { PushMessage, WorkerStatus } from '@n8n/api-types';

import type { IWorkflowDb } from '@/interfaces';

Expand Down Expand Up @@ -64,9 +64,7 @@ export type PubSubCommandMap = {
errorMessage: string;
};

'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
'relay-execution-lifecycle-event': PushMessage & {
pushRef: string;
};

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class ExecutionRecoveryService {

this.push.once('editorUiConnected', async () => {
await sleep(1000);
this.push.broadcast('executionRecovered', { executionId });
this.push.broadcast({ type: 'executionRecovered', data: { executionId } });
});

return amendedExecution;
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/load-nodes-and-credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ export class LoadNodesAndCredentials {
loader.reset();
await loader.loadAll();
await this.postProcessLoaders();
push.broadcast('nodeDescriptionUpdated', {});
push.broadcast({ type: 'nodeDescriptionUpdated', data: {} });
}, 100);

const toWatch = loader.isLazyLoaded
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/push/__tests__/websocket.push.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('WebSocketPush', () => {
it('sends data to one connection', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToOne(pushMessage.type, pushMessage.data, pushRef1);
webSocketPush.sendToOne(pushMessage, pushRef1);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).not.toHaveBeenCalled();
Expand All @@ -82,7 +82,7 @@ describe('WebSocketPush', () => {
it('sends data to all connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToAll(pushMessage.type, pushMessage.data);
webSocketPush.sendToAll(pushMessage);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
Expand All @@ -101,7 +101,7 @@ describe('WebSocketPush', () => {
it('sends data to all users connections', () => {
webSocketPush.add(pushRef1, userId, mockWebSocket1);
webSocketPush.add(pushRef2, userId, mockWebSocket2);
webSocketPush.sendToUsers(pushMessage.type, pushMessage.data, [userId]);
webSocketPush.sendToUsers(pushMessage, [userId]);

expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
Expand Down
20 changes: 8 additions & 12 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import { ErrorReporter } from 'n8n-core';
import { assert, jsonStringify } from 'n8n-workflow';
import { Service } from 'typedi';
Expand Down Expand Up @@ -69,7 +69,7 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
delete this.userIdByPushRef[pushRef];
}

private sendTo<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRefs: string[]) {
private sendTo({ type, data }: PushMessage, pushRefs: string[]) {
this.logger.debug(`Pushed to frontend: ${type}`, {
dataType: type,
pushRefs: pushRefs.join(', '),
Expand All @@ -90,30 +90,26 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
}
}

sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.sendTo(type, data, Object.keys(this.connections));
sendToAll(pushMsg: PushMessage) {
this.sendTo(pushMsg, Object.keys(this.connections));
}

sendToOne<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
sendToOne(pushMsg: PushMessage, pushRef: string) {
if (this.connections[pushRef] === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
}

this.sendTo(type, data, [pushRef]);
this.sendTo(pushMsg, [pushRef]);
}

sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
sendToUsers(pushMsg: PushMessage, userIds: Array<User['id']>) {
const { connections } = this;
const userPushRefs = Object.keys(connections).filter((pushRef) =>
userIds.includes(this.userIdByPushRef[pushRef]),
);

this.sendTo(type, data, userPushRefs);
this.sendTo(pushMsg, userPushRefs);
}

closeAllConnections() {
Expand Down
20 changes: 8 additions & 12 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { PushPayload, PushType } from '@n8n/api-types';
import type { PushMessage } from '@n8n/api-types';
import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server } from 'http';
Expand Down Expand Up @@ -81,11 +81,11 @@ export class Push extends TypedEmitter<PushEvents> {
this.emit('editorUiConnected', pushRef);
}

broadcast<Type extends PushType>(type: Type, data: PushPayload<Type>) {
this.backend.sendToAll(type, data);
broadcast(pushMsg: PushMessage) {
this.backend.sendToAll(pushMsg);
}

send<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
send(pushMsg: PushMessage, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
Expand All @@ -95,20 +95,16 @@ export class Push extends TypedEmitter<PushEvents> {
if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) {
void this.publisher.publishCommand({
command: 'relay-execution-lifecycle-event',
payload: { type, args: data, pushRef },
payload: { ...pushMsg, pushRef },
});
return;
}

this.backend.sendToOne(type, data, pushRef);
this.backend.sendToOne(pushMsg, pushRef);
}

sendToUsers<Type extends PushType>(
type: Type,
data: PushPayload<Type>,
userIds: Array<User['id']>,
) {
this.backend.sendToUsers(type, data, userIds);
sendToUsers(pushMsg: PushMessage, userIds: Array<User['id']>) {
this.backend.sendToUsers(pushMsg, userIds);
}

@OnShutdown()
Expand Down
50 changes: 37 additions & 13 deletions packages/cli/src/scaling/__tests__/pubsub-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,10 @@ describe('PubSubHandler', () => {
expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, {
shouldPublish: false,
});
expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowActivated',
data: { workflowId },
});
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-activation',
payload: { workflowId },
Expand Down Expand Up @@ -680,7 +683,10 @@ describe('PubSubHandler', () => {
expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith(
workflowId,
);
expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowDeactivated',
data: { workflowId },
});
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-deactivation',
payload: { workflowId },
Expand Down Expand Up @@ -735,7 +741,10 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-activation', { workflowId });

expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowActivated',
data: { workflowId },
});
});

it('should handle `display-workflow-deactivation` event', () => {
Expand All @@ -758,7 +767,10 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-deactivation', { workflowId });

expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowDeactivated',
data: { workflowId },
});
});

it('should handle `display-workflow-activation-error` event', () => {
Expand All @@ -782,9 +794,12 @@ describe('PubSubHandler', () => {

eventService.emit('display-workflow-activation-error', { workflowId, errorMessage });

expect(push.broadcast).toHaveBeenCalledWith('workflowFailedToActivate', {
workflowId,
errorMessage,
expect(push.broadcast).toHaveBeenCalledWith({
type: 'workflowFailedToActivate',
data: {
workflowId,
errorMessage,
},
});
});

Expand All @@ -806,15 +821,21 @@ describe('PubSubHandler', () => {

const pushRef = 'test-push-ref';
const type = 'executionStarted';
const args = { testArg: 'value' };
const data = {
executionId: '123',
mode: 'webhook' as const,
startedAt: new Date(),
workflowId: '456',
flattedRunData: '[]',
};

push.getBackend.mockReturnValue(
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);

eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef });
eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef });

expect(push.send).toHaveBeenCalledWith(type, args, pushRef);
expect(push.send).toHaveBeenCalledWith({ type, data }, pushRef);
});

it('should handle `clear-test-webhooks` event', () => {
Expand Down Expand Up @@ -868,9 +889,12 @@ describe('PubSubHandler', () => {

eventService.emit('response-to-get-worker-status', workerStatus);

expect(push.broadcast).toHaveBeenCalledWith('sendWorkerStatusMessage', {
workerId: workerStatus.senderId,
status: workerStatus,
expect(push.broadcast).toHaveBeenCalledWith({
type: 'sendWorkerStatusMessage',
data: {
workerId: workerStatus.senderId,
status: workerStatus,
},
});
});
});
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ export class Publisher {
const metadata: LogMetadata = { msg: msg.command, channel: 'n8n.commands' };

if (msg.command === 'relay-execution-lifecycle-event') {
const { args, type } = msg.payload;
const { data, type } = msg.payload;
msgName += ` (${type})`;
metadata.type = type;
metadata.executionId = args.executionId;
if ('executionId' in data) metadata.executionId = data.executionId;
}

this.logger.debug(`Published pubsub msg: ${msgName}`, metadata);
Expand Down
Loading
Loading