diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 1722ff56afdf1..990c9a78c39c9 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -39,6 +39,7 @@ import { IOAuth2Options, IPollFunctions, IRunExecutionData, + ISourceData, ITaskDataConnections, ITriggerFunctions, IWebhookData, @@ -2320,6 +2321,13 @@ export function getExecuteFunctions( return inputData[inputName][inputIndex] as INodeExecutionData[]; }, + getInputSourceData: (inputIndex = 0, inputName = 'main') => { + if (executeData?.source === null) { + // Should never happen as n8n sets it automatically + throw new Error('Source data is missing!'); + } + return executeData.source[inputName][inputIndex]; + }, getNodeParameter: ( parameterName: string, itemIndex: number, @@ -2580,6 +2588,13 @@ export function getExecuteSingleFunctions( return allItems[itemIndex]; }, + getInputSourceData: (inputIndex = 0, inputName = 'main') => { + if (executeData?.source === null) { + // Should never happen as n8n sets it automatically + throw new Error('Source data is missing!'); + } + return executeData.source[inputName][inputIndex] as ISourceData; + }, getItemIndex() { return itemIndex; }, diff --git a/packages/nodes-base/nodes/SplitInBatches/SplitInBatches.node.ts b/packages/nodes-base/nodes/SplitInBatches/SplitInBatches.node.ts index e9ad33c774fdb..510f954d07f93 100644 --- a/packages/nodes-base/nodes/SplitInBatches/SplitInBatches.node.ts +++ b/packages/nodes-base/nodes/SplitInBatches/SplitInBatches.node.ts @@ -1,5 +1,11 @@ import { IExecuteFunctions } from 'n8n-core'; -import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow'; +import { + deepCopy, + INodeExecutionData, + INodeType, + INodeTypeDescription, + IPairedItemData, +} from 'n8n-workflow'; export class SplitInBatches implements INodeType { description: INodeTypeDescription = { @@ -69,18 +75,56 @@ export class SplitInBatches implements INodeType { if (nodeContext.items === undefined || options.reset === true) { // Is the first time the node runs + const sourceData = this.getInputSourceData(); + nodeContext.currentRunIndex = 0; nodeContext.maxRunIndex = Math.ceil(items.length / batchSize); + nodeContext.sourceData = deepCopy(sourceData); // Get the items which should be returned returnItems.push.apply(returnItems, items.splice(0, batchSize)); // Set the other items to be saved in the context to return at later runs - nodeContext.items = items; + nodeContext.items = [...items]; } else { // The node has been called before. So return the next batch of items. nodeContext.currentRunIndex += 1; returnItems.push.apply(returnItems, nodeContext.items.splice(0, batchSize)); + + const addSourceOverwrite = (pairedItem: IPairedItemData | number): IPairedItemData => { + if (typeof pairedItem === 'number') { + return { + item: pairedItem, + sourceOverwrite: nodeContext.sourceData, + }; + } + + return { + ...pairedItem, + sourceOverwrite: nodeContext.sourceData, + }; + }; + + function getPairedItemInformation( + item: INodeExecutionData, + ): IPairedItemData | IPairedItemData[] { + if (item.pairedItem === undefined) { + return { + item: 0, + sourceOverwrite: nodeContext.sourceData, + }; + } + + if (Array.isArray(item.pairedItem)) { + return item.pairedItem.map(addSourceOverwrite); + } + + return addSourceOverwrite(item.pairedItem); + } + + returnItems.map((item) => { + item.pairedItem = getPairedItemInformation(item); + }); } nodeContext.noItemsLeft = nodeContext.items.length === 0; @@ -90,12 +134,6 @@ export class SplitInBatches implements INodeType { return null; } - returnItems.map((item, index) => { - item.pairedItem = { - item: index, - }; - }); - - return this.prepareOutputData(returnItems); + return [returnItems]; } } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 4e9ac5fbc2529..7d441b748e2f6 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -619,6 +619,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & { getContext(type: string): IContextObject; getCredentials(type: string, itemIndex?: number): Promise; getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[]; + getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getMode(): WorkflowExecuteMode; getNode(): INode; getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData; @@ -654,6 +655,7 @@ export interface IExecuteSingleFunctions { getContext(type: string): IContextObject; getCredentials(type: string): Promise; getInputData(inputIndex?: number, inputName?: string): INodeExecutionData; + getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; getItemIndex(): number; getMode(): WorkflowExecuteMode; getNode(): INode; @@ -926,6 +928,7 @@ export interface IBinaryKeyData { export interface IPairedItemData { item: number; input?: number; // If undefined "0" gets used + sourceOverwrite?: ISourceData; } export interface INodeExecutionData { diff --git a/packages/workflow/src/WorkflowDataProxy.ts b/packages/workflow/src/WorkflowDataProxy.ts index 45cb7775d60de..c809e7c616e21 100644 --- a/packages/workflow/src/WorkflowDataProxy.ts +++ b/packages/workflow/src/WorkflowDataProxy.ts @@ -710,6 +710,10 @@ export class WorkflowDataProxy { let sourceData: ISourceData | null = incomingSourceData; + if (pairedItem.sourceOverwrite) { + sourceData = pairedItem.sourceOverwrite; + } + if (typeof pairedItem === 'number') { pairedItem = { item: pairedItem, @@ -871,6 +875,10 @@ export class WorkflowDataProxy { nodeBeforeLast = sourceData.previousNode; sourceData = taskData.source[pairedItem.input || 0] || null; + + if (pairedItem.sourceOverwrite) { + sourceData = pairedItem.sourceOverwrite; + } } if (sourceData === null) {