From 518b562261de87946dcd2508c6e72c26f250d382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 20 Dec 2024 14:50:52 +0100 Subject: [PATCH 1/4] refactor(core): Fix push message type inference --- packages/cli/src/push/index.ts | 4 +- .../cli/src/scaling/pubsub/pubsub-handler.ts | 3 +- packages/cli/src/webhooks/test-webhooks.ts | 5 +-- .../src/workflow-execute-additional-data.ts | 38 ++++++++++--------- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 0007001e33355..e2a85bb8ea247 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,4 +1,4 @@ -import type { PushPayload, PushType } from '@n8n/api-types'; +import type { PushMessage, PushPayload, PushType } from '@n8n/api-types'; import type { Application } from 'express'; import { ServerResponse } from 'http'; import type { Server } from 'http'; @@ -85,7 +85,7 @@ export class Push extends TypedEmitter { this.backend.sendToAll(type, data); } - send(type: Type, data: PushPayload, pushRef: string) { + send({ type, data }: PushMessage, pushRef: string) { /** * Multi-main setup: In a manual webhook execution, the main process that * handles a webhook might not be the same as the main process that created diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index deeed5b584dbb..49ee7b083ca23 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,3 +1,4 @@ +import type { PushMessage } from '@n8n/api-types'; import { InstanceSettings } from 'n8n-core'; import { ensureError } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -156,7 +157,7 @@ export class PubSubHandler { 'relay-execution-lifecycle-event': async ({ type, args, pushRef }) => { if (!this.push.getBackend().hasPushRef(pushRef)) return; - this.push.send(type, args, pushRef); + this.push.send({ type, data: args } as PushMessage, pushRef); }, 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { if (!this.push.getBackend().hasPushRef(pushRef)) return; diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index ad642a17c33c2..b90b1db59d985 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -142,8 +142,7 @@ export class TestWebhooks implements IWebhookManager { // Inform editor-ui that webhook got received if (pushRef !== undefined) { this.push.send( - 'testWebhookReceived', - { workflowId: webhook?.workflowId, executionId }, + { type: 'testWebhookReceived', data: { workflowId: webhook?.workflowId, executionId } }, pushRef, ); } @@ -354,7 +353,7 @@ export class TestWebhooks implements IWebhookManager { if (pushRef !== undefined) { try { - this.push.send('testWebhookDeleted', { workflowId }, pushRef); + this.push.send({ type: 'testWebhookDeleted', data: { workflowId } }, pushRef); } catch { // Could not inform editor, probably is not connected anymore. So simply go on. } diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index a97bb3d3faf8f..d748a8e065e38 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/no-use-before-define */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import type { PushType } from '@n8n/api-types'; +import type { PushMessage, PushType } from '@n8n/api-types'; import { GlobalConfig } from '@n8n/config'; import { stringify } from 'flatted'; import { ErrorReporter, WorkflowExecute } from 'n8n-core'; @@ -262,7 +262,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); - pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, pushRef); + pushInstance.send({ type: 'nodeExecuteBefore', data: { executionId, nodeName } }, pushRef); }, ], nodeExecuteAfter: [ @@ -279,7 +279,10 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); - pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, pushRef); + pushInstance.send( + { type: 'nodeExecuteAfter', data: { executionId, nodeName, data } }, + pushRef, + ); }, ], workflowExecuteBefore: [ @@ -296,17 +299,19 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { return; } pushInstance.send( - 'executionStarted', { - executionId, - mode: this.mode, - startedAt: new Date(), - retryOf: this.retryOf, - workflowId, - workflowName, - flattedRunData: data?.resultData.runData - ? stringify(data.resultData.runData) - : stringify({}), + type: 'executionStarted', + data: { + executionId, + mode: this.mode, + startedAt: new Date(), + retryOf: this.retryOf, + workflowId, + workflowName, + flattedRunData: data?.resultData.runData + ? stringify(data.resultData.runData) + : stringify({}), + }, }, pushRef, ); @@ -326,12 +331,11 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { const { status } = fullRunData; if (status === 'waiting') { - pushInstance.send('executionWaiting', { executionId }, pushRef); + pushInstance.send({ type: 'executionWaiting', data: { executionId } }, pushRef); } else { const rawData = stringify(fullRunData.data); pushInstance.send( - 'executionFinished', - { executionId, workflowId, status, rawData }, + { type: 'executionFinished', data: { executionId, workflowId, status, rawData } }, pushRef, ); } @@ -974,7 +978,7 @@ export function sendDataToUI(type: PushType, data: IDataObject | IDataObject[]) // Push data to session which started workflow try { const pushInstance = Container.get(Push); - pushInstance.send(type, data, pushRef); + pushInstance.send({ type, data } as PushMessage, pushRef); } catch (error) { const logger = Container.get(Logger); logger.warn(`There was a problem sending message to UI: ${error.message}`); From 926435b7ff05944c7b6521eba5114d8a3041a664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 20 Dec 2024 15:09:02 +0100 Subject: [PATCH 2/4] Fix test --- packages/cli/src/scaling/__tests__/pubsub-handler.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index 314ded0b8b953..cc5bd81dc6d2a 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -814,7 +814,7 @@ describe('PubSubHandler', () => { eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef }); - expect(push.send).toHaveBeenCalledWith(type, args, pushRef); + expect(push.send).toHaveBeenCalledWith({ type, data: args }, pushRef); }); it('should handle `clear-test-webhooks` event', () => { From 75b9e71ce4241fa88834d79439fdafab9cd551b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 20 Dec 2024 16:59:58 +0100 Subject: [PATCH 3/4] Update pubsub sevent map and downstream --- .../collaboration/collaboration.service.ts | 2 +- .../community-packages.controller.ts | 45 ++++++++++++------- .../cli/src/controllers/e2e.controller.ts | 13 +++--- .../cli/src/events/maps/pub-sub.event-map.ts | 6 +-- .../executions/execution-recovery.service.ts | 2 +- .../cli/src/load-nodes-and-credentials.ts | 2 +- .../src/push/__tests__/websocket.push.test.ts | 6 +-- packages/cli/src/push/abstract.push.ts | 20 ++++----- packages/cli/src/push/index.ts | 20 ++++----- .../scaling/__tests__/pubsub-handler.test.ts | 12 +++-- .../src/scaling/pubsub/publisher.service.ts | 4 +- .../cli/src/scaling/pubsub/pubsub-handler.ts | 29 +++++++----- .../src/scaling/pubsub/subscriber.service.ts | 4 +- 13 files changed, 90 insertions(+), 75 deletions(-) diff --git a/packages/cli/src/collaboration/collaboration.service.ts b/packages/cli/src/collaboration/collaboration.service.ts index ece93bd5b294e..a6e957f510be2 100644 --- a/packages/cli/src/collaboration/collaboration.service.ts +++ b/packages/cli/src/collaboration/collaboration.service.ts @@ -99,6 +99,6 @@ export class CollaborationService { collaborators: activeCollaborators, }; - this.push.sendToUsers('collaboratorsChanged', msgData, userIds); + this.push.sendToUsers({ type: 'collaboratorsChanged', data: msgData }, userIds); } } diff --git a/packages/cli/src/controllers/community-packages.controller.ts b/packages/cli/src/controllers/community-packages.controller.ts index 918f1cdf74598..ab2134b7e010c 100644 --- a/packages/cli/src/controllers/community-packages.controller.ts +++ b/packages/cli/src/controllers/community-packages.controller.ts @@ -115,9 +115,12 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { - this.push.broadcast('reloadNodeType', { - name: node.type, - version: node.latestVersion, + this.push.broadcast({ + type: 'reloadNodeType', + data: { + name: node.type, + version: node.latestVersion, + }, }); }); @@ -206,9 +209,12 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { - this.push.broadcast('removeNodeType', { - name: node.type, - version: node.latestVersion, + this.push.broadcast({ + type: 'removeNodeType', + data: { + name: node.type, + version: node.latestVersion, + }, }); }); @@ -246,16 +252,22 @@ export class CommunityPackagesController { // broadcast to connected frontends that node list has been updated previouslyInstalledPackage.installedNodes.forEach((node) => { - this.push.broadcast('removeNodeType', { - name: node.type, - version: node.latestVersion, + this.push.broadcast({ + type: 'removeNodeType', + data: { + name: node.type, + version: node.latestVersion, + }, }); }); newInstalledPackage.installedNodes.forEach((node) => { - this.push.broadcast('reloadNodeType', { - name: node.name, - version: node.latestVersion, + this.push.broadcast({ + type: 'reloadNodeType', + data: { + name: node.name, + version: node.latestVersion, + }, }); }); @@ -272,9 +284,12 @@ export class CommunityPackagesController { return newInstalledPackage; } catch (error) { previouslyInstalledPackage.installedNodes.forEach((node) => { - this.push.broadcast('removeNodeType', { - name: node.type, - version: node.latestVersion, + this.push.broadcast({ + type: 'removeNodeType', + data: { + name: node.type, + version: node.latestVersion, + }, }); }); diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index 9c5a1ff36d386..a61342320dd04 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -1,4 +1,4 @@ -import type { PushPayload, PushType } from '@n8n/api-types'; +import type { PushMessage } from '@n8n/api-types'; import { Request } from 'express'; import Container from 'typedi'; import { v4 as uuid } from 'uuid'; @@ -58,14 +58,12 @@ type ResetRequest = Request< } >; -type PushRequest = Request< +type PushRequest = Request< {}, {}, { - type: T; pushRef: string; - data: PushPayload; - } + } & PushMessage >; @RestController('/e2e') @@ -144,8 +142,9 @@ export class E2EController { } @Post('/push', { skipAuth: true }) - async pushSend(req: PushRequest) { - this.push.broadcast(req.body.type, req.body.data); + async pushSend(req: PushRequest) { + const { pushRef: _, ...pushMsg } = req.body; + this.push.broadcast(pushMsg); } @Patch('/feature', { skipAuth: true }) diff --git a/packages/cli/src/events/maps/pub-sub.event-map.ts b/packages/cli/src/events/maps/pub-sub.event-map.ts index ff27741b9b772..0d71fcff91510 100644 --- a/packages/cli/src/events/maps/pub-sub.event-map.ts +++ b/packages/cli/src/events/maps/pub-sub.event-map.ts @@ -1,4 +1,4 @@ -import type { PushType, WorkerStatus } from '@n8n/api-types'; +import type { PushMessage, WorkerStatus } from '@n8n/api-types'; import type { IWorkflowDb } from '@/interfaces'; @@ -64,9 +64,7 @@ export type PubSubCommandMap = { errorMessage: string; }; - 'relay-execution-lifecycle-event': { - type: PushType; - args: Record; + 'relay-execution-lifecycle-event': PushMessage & { pushRef: string; }; diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 33576d1368d0a..a10fc995a472d 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -49,7 +49,7 @@ export class ExecutionRecoveryService { this.push.once('editorUiConnected', async () => { await sleep(1000); - this.push.broadcast('executionRecovered', { executionId }); + this.push.broadcast({ type: 'executionRecovered', data: { executionId } }); }); return amendedExecution; diff --git a/packages/cli/src/load-nodes-and-credentials.ts b/packages/cli/src/load-nodes-and-credentials.ts index f6f66d024b162..db62e8415ea71 100644 --- a/packages/cli/src/load-nodes-and-credentials.ts +++ b/packages/cli/src/load-nodes-and-credentials.ts @@ -520,7 +520,7 @@ export class LoadNodesAndCredentials { loader.reset(); await loader.loadAll(); await this.postProcessLoaders(); - push.broadcast('nodeDescriptionUpdated', {}); + push.broadcast({ type: 'nodeDescriptionUpdated', data: {} }); }, 100); const toWatch = loader.isLazyLoaded diff --git a/packages/cli/src/push/__tests__/websocket.push.test.ts b/packages/cli/src/push/__tests__/websocket.push.test.ts index 2362e5a0c6854..fd1e2f27a0b52 100644 --- a/packages/cli/src/push/__tests__/websocket.push.test.ts +++ b/packages/cli/src/push/__tests__/websocket.push.test.ts @@ -73,7 +73,7 @@ describe('WebSocketPush', () => { it('sends data to one connection', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); - webSocketPush.sendToOne(pushMessage.type, pushMessage.data, pushRef1); + webSocketPush.sendToOne(pushMessage, pushRef1); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).not.toHaveBeenCalled(); @@ -82,7 +82,7 @@ describe('WebSocketPush', () => { it('sends data to all connections', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); - webSocketPush.sendToAll(pushMessage.type, pushMessage.data); + webSocketPush.sendToAll(pushMessage); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); @@ -101,7 +101,7 @@ describe('WebSocketPush', () => { it('sends data to all users connections', () => { webSocketPush.add(pushRef1, userId, mockWebSocket1); webSocketPush.add(pushRef2, userId, mockWebSocket2); - webSocketPush.sendToUsers(pushMessage.type, pushMessage.data, [userId]); + webSocketPush.sendToUsers(pushMessage, [userId]); expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg); expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 83a859fc75d54..574f8a0def056 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,4 +1,4 @@ -import type { PushPayload, PushType } from '@n8n/api-types'; +import type { PushMessage } from '@n8n/api-types'; import { ErrorReporter } from 'n8n-core'; import { assert, jsonStringify } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -69,7 +69,7 @@ export abstract class AbstractPush extends TypedEmitter(type: Type, data: PushPayload, pushRefs: string[]) { + private sendTo({ type, data }: PushMessage, pushRefs: string[]) { this.logger.debug(`Pushed to frontend: ${type}`, { dataType: type, pushRefs: pushRefs.join(', '), @@ -90,30 +90,26 @@ export abstract class AbstractPush extends TypedEmitter(type: Type, data: PushPayload) { - this.sendTo(type, data, Object.keys(this.connections)); + sendToAll(pushMsg: PushMessage) { + this.sendTo(pushMsg, Object.keys(this.connections)); } - sendToOne(type: Type, data: PushPayload, pushRef: string) { + sendToOne(pushMsg: PushMessage, pushRef: string) { if (this.connections[pushRef] === undefined) { this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef }); return; } - this.sendTo(type, data, [pushRef]); + this.sendTo(pushMsg, [pushRef]); } - sendToUsers( - type: Type, - data: PushPayload, - userIds: Array, - ) { + sendToUsers(pushMsg: PushMessage, userIds: Array) { const { connections } = this; const userPushRefs = Object.keys(connections).filter((pushRef) => userIds.includes(this.userIdByPushRef[pushRef]), ); - this.sendTo(type, data, userPushRefs); + this.sendTo(pushMsg, userPushRefs); } closeAllConnections() { diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index e2a85bb8ea247..7325981d0bba5 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,4 +1,4 @@ -import type { PushMessage, PushPayload, PushType } from '@n8n/api-types'; +import type { PushMessage } from '@n8n/api-types'; import type { Application } from 'express'; import { ServerResponse } from 'http'; import type { Server } from 'http'; @@ -81,11 +81,11 @@ export class Push extends TypedEmitter { this.emit('editorUiConnected', pushRef); } - broadcast(type: Type, data: PushPayload) { - this.backend.sendToAll(type, data); + broadcast(pushMsg: PushMessage) { + this.backend.sendToAll(pushMsg); } - send({ type, data }: PushMessage, pushRef: string) { + send(pushMsg: PushMessage, pushRef: string) { /** * Multi-main setup: In a manual webhook execution, the main process that * handles a webhook might not be the same as the main process that created @@ -95,20 +95,16 @@ export class Push extends TypedEmitter { if (this.instanceSettings.isMultiMain && !this.backend.hasPushRef(pushRef)) { void this.publisher.publishCommand({ command: 'relay-execution-lifecycle-event', - payload: { type, args: data, pushRef }, + payload: { ...pushMsg, pushRef }, }); return; } - this.backend.sendToOne(type, data, pushRef); + this.backend.sendToOne(pushMsg, pushRef); } - sendToUsers( - type: Type, - data: PushPayload, - userIds: Array, - ) { - this.backend.sendToUsers(type, data, userIds); + sendToUsers(pushMsg: PushMessage, userIds: Array) { + this.backend.sendToUsers(pushMsg, userIds); } @OnShutdown() diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index cc5bd81dc6d2a..d24339887d45e 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -806,15 +806,21 @@ describe('PubSubHandler', () => { const pushRef = 'test-push-ref'; const type = 'executionStarted'; - const args = { testArg: 'value' }; + const data = { + executionId: '123', + mode: 'webhook' as const, + startedAt: new Date(), + workflowId: '456', + flattedRunData: '[]', + }; push.getBackend.mockReturnValue( mock({ hasPushRef: jest.fn().mockReturnValue(true) }), ); - eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef }); + eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef }); - expect(push.send).toHaveBeenCalledWith({ type, data: args }, pushRef); + expect(push.send).toHaveBeenCalledWith({ type, data }, pushRef); }); it('should handle `clear-test-webhooks` event', () => { diff --git a/packages/cli/src/scaling/pubsub/publisher.service.ts b/packages/cli/src/scaling/pubsub/publisher.service.ts index fc007f76c0b81..4723b1d37dea7 100644 --- a/packages/cli/src/scaling/pubsub/publisher.service.ts +++ b/packages/cli/src/scaling/pubsub/publisher.service.ts @@ -65,10 +65,10 @@ export class Publisher { const metadata: LogMetadata = { msg: msg.command, channel: 'n8n.commands' }; if (msg.command === 'relay-execution-lifecycle-event') { - const { args, type } = msg.payload; + const { data, type } = msg.payload; msgName += ` (${type})`; metadata.type = type; - metadata.executionId = args.executionId; + if ('executionId' in data) metadata.executionId = data.executionId; } this.logger.debug(`Published pubsub msg: ${msgName}`, metadata); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts index 49ee7b083ca23..70b5f67f72c37 100644 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ b/packages/cli/src/scaling/pubsub/pubsub-handler.ts @@ -1,4 +1,3 @@ -import type { PushMessage } from '@n8n/api-types'; import { InstanceSettings } from 'n8n-core'; import { ensureError } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -60,9 +59,12 @@ export class PubSubHandler { ...this.commonHandlers, ...this.multiMainHandlers, 'response-to-get-worker-status': async (payload) => - this.push.broadcast('sendWorkerStatusMessage', { - workerId: payload.senderId, - status: payload, + this.push.broadcast({ + type: 'sendWorkerStatusMessage', + data: { + workerId: payload.senderId, + status: payload, + }, }), }); @@ -114,7 +116,7 @@ export class PubSubHandler { shouldPublish: false, // prevent leader from re-publishing message }); - this.push.broadcast('workflowActivated', { workflowId }); + this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }); await this.publisher.publishCommand({ command: 'display-workflow-activation', @@ -126,7 +128,10 @@ export class PubSubHandler { await this.workflowRepository.update(workflowId, { active: false }); - this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage: message }); + this.push.broadcast({ + type: 'workflowFailedToActivate', + data: { workflowId, errorMessage: message }, + }); await this.publisher.publishCommand({ command: 'display-workflow-activation-error', @@ -140,7 +145,7 @@ export class PubSubHandler { await this.activeWorkflowManager.removeActivationError(workflowId); await this.activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId); - this.push.broadcast('workflowDeactivated', { workflowId }); + this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }); // instruct followers to show workflow deactivation in UI await this.publisher.publishCommand({ @@ -149,15 +154,15 @@ export class PubSubHandler { }); }, 'display-workflow-activation': async ({ workflowId }) => - this.push.broadcast('workflowActivated', { workflowId }), + this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }), 'display-workflow-deactivation': async ({ workflowId }) => - this.push.broadcast('workflowDeactivated', { workflowId }), + this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }), 'display-workflow-activation-error': async ({ workflowId, errorMessage }) => - this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage }), - 'relay-execution-lifecycle-event': async ({ type, args, pushRef }) => { + this.push.broadcast({ type: 'workflowFailedToActivate', data: { workflowId, errorMessage } }), + 'relay-execution-lifecycle-event': async ({ pushRef, ...pushMsg }) => { if (!this.push.getBackend().hasPushRef(pushRef)) return; - this.push.send({ type, data: args } as PushMessage, pushRef); + this.push.send(pushMsg, pushRef); }, 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { if (!this.push.getBackend().hasPushRef(pushRef)) return; diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 248c1198d2eb0..0ce343c139b0a 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -95,10 +95,10 @@ export class Subscriber { const metadata: LogMetadata = { msg: msgName, channel }; if ('command' in msg && msg.command === 'relay-execution-lifecycle-event') { - const { args, type } = msg.payload; + const { data, type } = msg.payload; msgName += ` (${type})`; metadata.type = type; - metadata.executionId = args.executionId; + if ('executionId' in data) metadata.executionId = data.executionId; } this.logger.debug(`Received pubsub msg: ${msgName}`, metadata); From 0cab6f97a42803cfa8c60fcba5dc823ea97696a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 20 Dec 2024 17:40:20 +0100 Subject: [PATCH 4/4] Fix tests --- .../scaling/__tests__/pubsub-handler.test.ts | 38 ++++++++--- .../collaboration.service.test.ts | 68 ++++++++++--------- 2 files changed, 65 insertions(+), 41 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts index d24339887d45e..4f8c8af85956f 100644 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts @@ -620,7 +620,10 @@ describe('PubSubHandler', () => { expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, { shouldPublish: false, }); - expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId }); + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'workflowActivated', + data: { workflowId }, + }); expect(publisher.publishCommand).toHaveBeenCalledWith({ command: 'display-workflow-activation', payload: { workflowId }, @@ -680,7 +683,10 @@ describe('PubSubHandler', () => { expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith( workflowId, ); - expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId }); + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'workflowDeactivated', + data: { workflowId }, + }); expect(publisher.publishCommand).toHaveBeenCalledWith({ command: 'display-workflow-deactivation', payload: { workflowId }, @@ -735,7 +741,10 @@ describe('PubSubHandler', () => { eventService.emit('display-workflow-activation', { workflowId }); - expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId }); + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'workflowActivated', + data: { workflowId }, + }); }); it('should handle `display-workflow-deactivation` event', () => { @@ -758,7 +767,10 @@ describe('PubSubHandler', () => { eventService.emit('display-workflow-deactivation', { workflowId }); - expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId }); + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'workflowDeactivated', + data: { workflowId }, + }); }); it('should handle `display-workflow-activation-error` event', () => { @@ -782,9 +794,12 @@ describe('PubSubHandler', () => { eventService.emit('display-workflow-activation-error', { workflowId, errorMessage }); - expect(push.broadcast).toHaveBeenCalledWith('workflowFailedToActivate', { - workflowId, - errorMessage, + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'workflowFailedToActivate', + data: { + workflowId, + errorMessage, + }, }); }); @@ -874,9 +889,12 @@ describe('PubSubHandler', () => { eventService.emit('response-to-get-worker-status', workerStatus); - expect(push.broadcast).toHaveBeenCalledWith('sendWorkerStatusMessage', { - workerId: workerStatus.senderId, - status: workerStatus, + expect(push.broadcast).toHaveBeenCalledWith({ + type: 'sendWorkerStatusMessage', + data: { + workerId: workerStatus.senderId, + status: workerStatus, + }, }); }); }); diff --git a/packages/cli/test/integration/collaboration/collaboration.service.test.ts b/packages/cli/test/integration/collaboration/collaboration.service.test.ts index df5f901f2817d..ab7a8314b3a55 100644 --- a/packages/cli/test/integration/collaboration/collaboration.service.test.ts +++ b/packages/cli/test/integration/collaboration/collaboration.service.test.ts @@ -78,37 +78,41 @@ describe('CollaborationService', () => { // Assert expect(sendToUsersSpy).toHaveBeenNthCalledWith( 1, - 'collaboratorsChanged', { - collaborators: [ - { - lastSeen: expect.any(String), - user: owner.toIUser(), - }, - ], - workflowId: workflow.id, + type: 'collaboratorsChanged', + data: { + collaborators: [ + { + lastSeen: expect.any(String), + user: owner.toIUser(), + }, + ], + workflowId: workflow.id, + }, }, [owner.id], ); expect(sendToUsersSpy).toHaveBeenNthCalledWith( 2, - 'collaboratorsChanged', { - collaborators: expect.arrayContaining([ - expect.objectContaining({ - lastSeen: expect.any(String), - user: expect.objectContaining({ - id: owner.id, + type: 'collaboratorsChanged', + data: { + collaborators: expect.arrayContaining([ + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: owner.id, + }), }), - }), - expect.objectContaining({ - lastSeen: expect.any(String), - user: expect.objectContaining({ - id: memberWithAccess.id, + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: memberWithAccess.id, + }), }), - }), - ]), - workflowId: workflow.id, + ]), + workflowId: workflow.id, + }, }, [owner.id, memberWithAccess.id], ); @@ -151,17 +155,19 @@ describe('CollaborationService', () => { // Assert expect(sendToUsersSpy).toHaveBeenCalledWith( - 'collaboratorsChanged', { - collaborators: expect.arrayContaining([ - expect.objectContaining({ - lastSeen: expect.any(String), - user: expect.objectContaining({ - id: memberWithAccess.id, + type: 'collaboratorsChanged', + data: { + collaborators: expect.arrayContaining([ + expect.objectContaining({ + lastSeen: expect.any(String), + user: expect.objectContaining({ + id: memberWithAccess.id, + }), }), - }), - ]), - workflowId: workflow.id, + ]), + workflowId: workflow.id, + }, }, [memberWithAccess.id], );