From 77d6e3fc0771872de895e61e0116142ae10b02b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 25 Sep 2023 18:04:52 +0200 Subject: [PATCH] refactor(core): Include workflow ID in binary data writes (no-changelog) (#7220) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Depends on: https://github.com/n8n-io/n8n/pull/7195 | Story: [PAY-837](https://linear.app/n8n/issue/PAY-837/implement-object-store-manager-for-binary-data) This PR includes `workflowId` in binary data writes so that the S3 manager can support this filepath structure `/workflows/{workflowId}/executions/{executionId}/binaryData/{binaryFilename}` to easily delete binary data for workflows. Also all binary data service and manager methods that take `workflowId` and `executionId` are made consistent in arg order. Note: `workflowId` is included in filesystem mode for compatibility with the common interface, but `workflowId` will remain unused by filesystem mode until we decide to restructure how this mode stores data. --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/cli/src/ActiveWebhooks.ts | 2 +- packages/cli/src/ActiveWorkflowRunner.ts | 2 +- .../cli/src/workflows/workflows.services.ts | 2 +- .../test/unit/ActiveWorkflowRunner.test.ts | 2 +- .../core/src/BinaryData/BinaryData.service.ts | 53 +++++++++++---- .../core/src/BinaryData/FileSystem.manager.ts | 38 ++++++----- packages/core/src/BinaryData/types.ts | 22 +++---- packages/core/src/NodeExecuteFunctions.ts | 66 ++++++++++++------- packages/workflow/src/Workflow.ts | 4 +- 9 files changed, 121 insertions(+), 70 deletions(-) diff --git a/packages/cli/src/ActiveWebhooks.ts b/packages/cli/src/ActiveWebhooks.ts index 42e2425634898..b594a1b795070 100644 --- a/packages/cli/src/ActiveWebhooks.ts +++ b/packages/cli/src/ActiveWebhooks.ts @@ -157,7 +157,7 @@ export class ActiveWebhooks { * */ async removeWorkflow(workflow: Workflow): Promise { - const workflowId = workflow.id!.toString(); + const workflowId = workflow.id; if (this.workflowWebhooks[workflowId] === undefined) { // If it did not exist then there is nothing to remove diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index f664d7d42348a..f60b3ffcdb28f 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -417,7 +417,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { } try { - await this.removeWorkflowWebhooks(workflow.id as string); + await this.removeWorkflowWebhooks(workflow.id); } catch (error1) { ErrorReporter.error(error1); Logger.error( diff --git a/packages/cli/src/workflows/workflows.services.ts b/packages/cli/src/workflows/workflows.services.ts index e8f566380245f..5a466e65ff57c 100644 --- a/packages/cli/src/workflows/workflows.services.ts +++ b/packages/cli/src/workflows/workflows.services.ts @@ -492,7 +492,7 @@ export class WorkflowsService { // Workflow is saved so update in database try { // eslint-disable-next-line @typescript-eslint/no-use-before-define - await WorkflowsService.saveStaticDataById(workflow.id!, workflow.staticData); + await WorkflowsService.saveStaticDataById(workflow.id, workflow.staticData); workflow.staticData.__dataChanged = false; } catch (error) { ErrorReporter.error(error); diff --git a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts index 0622dac9a2044..0ba430b820500 100644 --- a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts @@ -100,7 +100,7 @@ jest.mock('@/Db', () => { find: jest.fn(async () => generateWorkflows(databaseActiveWorkflowsCount)), findOne: jest.fn(async (searchParams) => { return databaseActiveWorkflowsList.find( - (workflow) => workflow.id.toString() === searchParams.where.id.toString(), + (workflow) => workflow.id === searchParams.where.id.toString(), ); }), update: jest.fn(), diff --git a/packages/core/src/BinaryData/BinaryData.service.ts b/packages/core/src/BinaryData/BinaryData.service.ts index 4f6b63b36768d..8b67993c407a7 100644 --- a/packages/core/src/BinaryData/BinaryData.service.ts +++ b/packages/core/src/BinaryData/BinaryData.service.ts @@ -35,21 +35,33 @@ export class BinaryDataService { } @LogCatch((error) => Logger.error('Failed to copy binary data file', { error })) - async copyBinaryFile(binaryData: IBinaryData, path: string, executionId: string) { + async copyBinaryFile( + workflowId: string, + executionId: string, + binaryData: IBinaryData, + filePath: string, + ) { const manager = this.managers[this.mode]; if (!manager) { - const { size } = await stat(path); + const { size } = await stat(filePath); binaryData.fileSize = prettyBytes(size); - binaryData.data = await readFile(path, { encoding: BINARY_ENCODING }); + binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING }); return binaryData; } - const { fileId, fileSize } = await manager.copyByFilePath(path, executionId, { + const metadata = { fileName: binaryData.fileName, mimeType: binaryData.mimeType, - }); + }; + + const { fileId, fileSize } = await manager.copyByFilePath( + workflowId, + executionId, + filePath, + metadata, + ); binaryData.id = this.createBinaryDataId(fileId); binaryData.fileSize = prettyBytes(fileSize); @@ -59,7 +71,12 @@ export class BinaryDataService { } @LogCatch((error) => Logger.error('Failed to write binary data file', { error })) - async store(binaryData: IBinaryData, bufferOrStream: Buffer | Readable, executionId: string) { + async store( + workflowId: string, + executionId: string, + bufferOrStream: Buffer | Readable, + binaryData: IBinaryData, + ) { const manager = this.managers[this.mode]; if (!manager) { @@ -70,10 +87,17 @@ export class BinaryDataService { return binaryData; } - const { fileId, fileSize } = await manager.store(bufferOrStream, executionId, { + const metadata = { fileName: binaryData.fileName, mimeType: binaryData.mimeType, - }); + }; + + const { fileId, fileSize } = await manager.store( + workflowId, + executionId, + bufferOrStream, + metadata, + ); binaryData.id = this.createBinaryDataId(fileId); binaryData.fileSize = prettyBytes(fileSize); @@ -128,7 +152,11 @@ export class BinaryDataService { @LogCatch((error) => Logger.error('Failed to copy all binary data files for execution', { error }), ) - async duplicateBinaryData(inputData: Array, executionId: string) { + async duplicateBinaryData( + workflowId: string, + executionId: string, + inputData: Array, + ) { if (inputData && this.managers[this.mode]) { const returnInputData = (inputData as INodeExecutionData[][]).map( async (executionDataArray) => { @@ -136,7 +164,7 @@ export class BinaryDataService { return Promise.all( executionDataArray.map(async (executionData) => { if (executionData.binary) { - return this.duplicateBinaryDataInExecData(executionData, executionId); + return this.duplicateBinaryDataInExecData(workflowId, executionId, executionData); } return executionData; @@ -174,8 +202,9 @@ export class BinaryDataService { } private async duplicateBinaryDataInExecData( - executionData: INodeExecutionData, + workflowId: string, executionId: string, + executionData: INodeExecutionData, ) { const manager = this.managers[this.mode]; @@ -193,7 +222,7 @@ export class BinaryDataService { const [_mode, fileId] = binaryDataId.split(':'); - return manager?.copyByFileId(fileId, executionId).then((newFileId) => ({ + return manager?.copyByFileId(workflowId, executionId, fileId).then((newFileId) => ({ newId: this.createBinaryDataId(newFileId), key, })); diff --git a/packages/core/src/BinaryData/FileSystem.manager.ts b/packages/core/src/BinaryData/FileSystem.manager.ts index d103a1b0c617d..ee2ce6b130f7b 100644 --- a/packages/core/src/BinaryData/FileSystem.manager.ts +++ b/packages/core/src/BinaryData/FileSystem.manager.ts @@ -1,3 +1,9 @@ +/** + * @tech_debt The `workflowId` arguments on write are for compatibility with the + * `BinaryData.Manager` interface. Unused in filesystem mode until we refactor + * how we store binary data files in the `/binaryData` dir. + */ + import { createReadStream } from 'fs'; import fs from 'fs/promises'; import path from 'path'; @@ -25,17 +31,6 @@ export class FileSystemManager implements BinaryData.Manager { return this.resolvePath(fileId); } - async getSize(fileId: string) { - const filePath = this.getPath(fileId); - - try { - const stats = await fs.stat(filePath); - return stats.size; - } catch (error) { - throw new Error('Failed to find binary data file in filesystem', { cause: error }); - } - } - async getAsStream(fileId: string, chunkSize?: number) { const filePath = this.getPath(fileId); @@ -59,14 +54,15 @@ export class FileSystemManager implements BinaryData.Manager { } async store( - binaryData: Buffer | Readable, + _workflowId: string, executionId: string, + bufferOrStream: Buffer | Readable, { mimeType, fileName }: BinaryData.PreWriteMetadata, ) { const fileId = this.createFileId(executionId); const filePath = this.getPath(fileId); - await fs.writeFile(filePath, binaryData); + await fs.writeFile(filePath, bufferOrStream); const fileSize = await this.getSize(fileId); @@ -102,8 +98,9 @@ export class FileSystemManager implements BinaryData.Manager { } async copyByFilePath( - filePath: string, + _workflowId: string, executionId: string, + filePath: string, { mimeType, fileName }: BinaryData.PreWriteMetadata, ) { const newFileId = this.createFileId(executionId); @@ -117,7 +114,7 @@ export class FileSystemManager implements BinaryData.Manager { return { fileId: newFileId, fileSize }; } - async copyByFileId(fileId: string, executionId: string) { + async copyByFileId(_workflowId: string, executionId: string, fileId: string) { const newFileId = this.createFileId(executionId); await fs.copyFile(this.resolvePath(fileId), this.resolvePath(newFileId)); @@ -158,4 +155,15 @@ export class FileSystemManager implements BinaryData.Manager { await fs.writeFile(filePath, JSON.stringify(metadata), { encoding: 'utf-8' }); } + + private async getSize(fileId: string) { + const filePath = this.getPath(fileId); + + try { + const stats = await fs.stat(filePath); + return stats.size; + } catch (error) { + throw new Error('Failed to find binary data file in filesystem', { cause: error }); + } + } } diff --git a/packages/core/src/BinaryData/types.ts b/packages/core/src/BinaryData/types.ts index 8bc628d777366..4538b4d7e3037 100644 --- a/packages/core/src/BinaryData/types.ts +++ b/packages/core/src/BinaryData/types.ts @@ -16,36 +16,34 @@ export namespace BinaryData { fileSize: number; }; + export type WriteResult = { fileId: string; fileSize: number }; + export type PreWriteMetadata = Omit; export interface Manager { init(): Promise; store( - binaryData: Buffer | Readable, + workflowId: string, executionId: string, - preStoreMetadata: PreWriteMetadata, - ): Promise<{ fileId: string; fileSize: number }>; + bufferOrStream: Buffer | Readable, + metadata: PreWriteMetadata, + ): Promise; getPath(fileId: string): string; getAsBuffer(fileId: string): Promise; getAsStream(fileId: string, chunkSize?: number): Promise; getMetadata(fileId: string): Promise; - // @TODO: Refactor to also use `workflowId` to support full path-like identifier: - // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` + copyByFileId(workflowId: string, executionId: string, fileId: string): Promise; copyByFilePath( - path: string, + workflowId: string, executionId: string, + filePath: string, metadata: PreWriteMetadata, - ): Promise<{ fileId: string; fileSize: number }>; - - copyByFileId(fileId: string, prefix: string): Promise; + ): Promise; deleteOne(fileId: string): Promise; - - // @TODO: Refactor to also receive `workflowId` to support full path-like identifier: - // `workflows/{workflowId}/executions/{executionId}/binary_data/{fileId}` deleteManyByExecutionIds(executionIds: string[]): Promise; rename(oldFileId: string, newFileId: string): Promise; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 3815f45f8f850..1bdd74d0038d2 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -999,19 +999,26 @@ export async function getBinaryDataBuffer( * Store an incoming IBinaryData & related buffer using the configured binary data manager. * * @export - * @param {IBinaryData} data - * @param {Buffer | Readable} binaryData + * @param {IBinaryData} binaryData + * @param {Buffer | Readable} bufferOrStream * @returns {Promise} */ export async function setBinaryDataBuffer( - data: IBinaryData, - binaryData: Buffer | Readable, + binaryData: IBinaryData, + bufferOrStream: Buffer | Readable, + workflowId: string, executionId: string, ): Promise { - return Container.get(BinaryDataService).store(data, binaryData, executionId); + return Container.get(BinaryDataService).store( + workflowId, + executionId, + bufferOrStream, + binaryData, + ); } export async function copyBinaryFile( + workflowId: string, executionId: string, filePath: string, fileName: string, @@ -1061,7 +1068,12 @@ export async function copyBinaryFile( returnData.fileName = path.parse(filePath).base; } - return Container.get(BinaryDataService).copyBinaryFile(returnData, filePath, executionId); + return Container.get(BinaryDataService).copyBinaryFile( + workflowId, + executionId, + returnData, + filePath, + ); } /** @@ -1071,6 +1083,7 @@ export async function copyBinaryFile( async function prepareBinaryData( binaryData: Buffer | Readable, executionId: string, + workflowId: string, filePath?: string, mimeType?: string, ): Promise { @@ -1152,7 +1165,7 @@ async function prepareBinaryData( } } - return setBinaryDataBuffer(returnData, binaryData, executionId); + return setBinaryDataBuffer(returnData, binaryData, workflowId, executionId); } /** @@ -2324,7 +2337,7 @@ export function getNodeWebhookUrl( undefined, false, ) as boolean; - return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id!, node, path.toString(), isFullPath); + return NodeHelpers.getNodeWebhookUrl(baseUrl, workflow.id, node, path.toString(), isFullPath); } /** @@ -2560,25 +2573,27 @@ const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => }, }); -const getNodeHelperFunctions = ({ - executionId, -}: IWorkflowExecuteAdditionalData): NodeHelperFunctions => ({ +const getNodeHelperFunctions = ( + { executionId }: IWorkflowExecuteAdditionalData, + workflowId: string, +): NodeHelperFunctions => ({ copyBinaryFile: async (filePath, fileName, mimeType) => - copyBinaryFile(executionId!, filePath, fileName, mimeType), + copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType), }); -const getBinaryHelperFunctions = ({ - executionId, -}: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({ +const getBinaryHelperFunctions = ( + { executionId }: IWorkflowExecuteAdditionalData, + workflowId: string, +): BinaryHelperFunctions => ({ getBinaryPath, getBinaryStream, getBinaryMetadata, binaryToBuffer: async (body: Buffer | Readable) => Container.get(BinaryDataService).binaryToBuffer(body), prepareBinaryData: async (binaryData, filePath, mimeType) => - prepareBinaryData(binaryData, executionId!, filePath, mimeType), + prepareBinaryData(binaryData, executionId!, workflowId, filePath, mimeType), setBinaryDataBuffer: async (data, binaryData) => - setBinaryDataBuffer(data, binaryData, executionId!), + setBinaryDataBuffer(data, binaryData, workflowId, executionId!), copyBinaryFile: async () => { throw new Error('copyBinaryFile has been removed. Please upgrade this node'); }, @@ -2638,7 +2653,7 @@ export function getExecutePollFunctions( helpers: { createDeferredPromise, ...getRequestHelperFunctions(workflow, node, additionalData), - ...getBinaryHelperFunctions(additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), returnJsonArray, }, }; @@ -2697,7 +2712,7 @@ export function getExecuteTriggerFunctions( helpers: { createDeferredPromise, ...getRequestHelperFunctions(workflow, node, additionalData), - ...getBinaryHelperFunctions(additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), returnJsonArray, }, }; @@ -2763,8 +2778,9 @@ export function getExecuteFunctions( }) .then(async (result) => Container.get(BinaryDataService).duplicateBinaryData( - result, + workflow.id, additionalData.executionId!, + result, ), ); }, @@ -2872,7 +2888,7 @@ export function getExecuteFunctions( createDeferredPromise, ...getRequestHelperFunctions(workflow, node, additionalData), ...getFileSystemHelperFunctions(node), - ...getBinaryHelperFunctions(additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), assertBinaryData: (itemIndex, propertyName) => assertBinaryData(inputData, node, itemIndex, propertyName, 0), getBinaryDataBuffer: async (itemIndex, propertyName) => @@ -2882,7 +2898,7 @@ export function getExecuteFunctions( normalizeItems, constructExecutionMetaData, }, - nodeHelpers: getNodeHelperFunctions(additionalData), + nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), }; })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; } @@ -3014,7 +3030,7 @@ export function getExecuteSingleFunctions( helpers: { createDeferredPromise, ...getRequestHelperFunctions(workflow, node, additionalData), - ...getBinaryHelperFunctions(additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), assertBinaryData: (propertyName, inputIndex = 0) => assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex), @@ -3271,10 +3287,10 @@ export function getExecuteWebhookFunctions( helpers: { createDeferredPromise, ...getRequestHelperFunctions(workflow, node, additionalData), - ...getBinaryHelperFunctions(additionalData), + ...getBinaryHelperFunctions(additionalData, workflow.id), returnJsonArray, }, - nodeHelpers: getNodeHelperFunctions(additionalData), + nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), }; })(workflow, node); } diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 06ff5606b0057..6ce26ce302e79 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -56,7 +56,7 @@ function dedupe(arr: T[]): T[] { } export class Workflow { - id: string | undefined; + id: string; name: string | undefined; @@ -92,7 +92,7 @@ export class Workflow { settings?: IWorkflowSettings; pinData?: IPinData; }) { - this.id = parameters.id; + this.id = parameters.id as string; this.name = parameters.name; this.nodeTypes = parameters.nodeTypes; this.pinData = parameters.pinData;