-
Notifications
You must be signed in to change notification settings - Fork 8.9k
/
WaitingWebhooks.ts
143 lines (118 loc) · 4.41 KB
/
WaitingWebhooks.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import { NodeHelpers, Workflow } from 'n8n-workflow';
import { Service } from 'typedi';
import type express from 'express';
import * as WebhookHelpers from '@/WebhookHelpers';
import { NodeTypes } from '@/NodeTypes';
import type {
IExecutionResponse,
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
WaitingWebhookRequest,
} from '@/Interfaces';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ConflictError } from './errors/response-errors/conflict.error';
import { NotFoundError } from './errors/response-errors/not-found.error';
/**
 * Webhook manager for executions that are paused and can be continued by an
 * incoming HTTP request.
 *
 * The request path carries the execution id. The stored execution is loaded,
 * prepared for continuation, and handed to `WebhookHelpers.executeWebhook`
 * together with its previously recorded run data.
 */
@Service()
export class WaitingWebhooks implements IWebhookManager {
  /**
   * Selects which restart webhooks this manager matches: only those whose
   * `isForm` flag (treated as `false` when unset) equals this value.
   * Protected, presumably so that a subclass can flip it — confirm against subclasses.
   */
  protected includeForms = false;

  constructor(
    protected readonly logger: Logger,
    private readonly nodeTypes: NodeTypes,
    private readonly executionRepository: ExecutionRepository,
  ) {}

  // TODO: implement `getWebhookMethods` for CORS support

  /**
   * Writes a debug log entry for an incoming waiting-webhook request.
   *
   * @param method HTTP method of the request.
   * @param executionId Id of the execution the request targets.
   */
  protected logReceivedWebhook(method: string, executionId: string) {
    this.logger.debug(`Received waiting-webhook "${method}" for execution "${executionId}"`);
  }

  /**
   * Marks the node at the top of the execution stack as disabled so that it is
   * not executed again when the execution continues.
   *
   * @param execution Execution whose first stacked node gets disabled (mutated in place).
   * @param _method HTTP method of the request; unused here (NOTE(review): likely
   *   kept for overriding subclasses — confirm).
   */
  protected disableNode(execution: IExecutionResponse, _method?: string) {
    execution.data.executionData!.nodeExecutionStack[0].node.disabled = true;
  }

  /**
   * Continues a stored execution in response to a waiting-webhook request.
   *
   * @param req Incoming request; `req.params.path` is used as the execution id and
   *   `req.params.suffix` as the webhook path to match. `req.params` is reset
   *   to an empty object before the request is passed on.
   * @param res Express response forwarded to `WebhookHelpers.executeWebhook`.
   * @returns The data passed to the response callback by `WebhookHelpers.executeWebhook`.
   * @throws NotFoundError If the execution does not exist, the last executed node
   *   is not part of the workflow, or no matching restart webhook is found.
   * @throws ConflictError If the execution is currently running, has already
   *   finished, or has a recorded error.
   */
  async executeWebhook(
    req: WaitingWebhookRequest,
    res: express.Response,
  ): Promise<IResponseCallbackData> {
    const { path: executionId, suffix } = req.params;
    this.logReceivedWebhook(req.method, executionId);

    // Reset request parameters
    req.params = {} as WaitingWebhookRequest['params'];

    const execution = await this.executionRepository.findSingleExecution(executionId, {
      includeData: true,
      unflattenData: true,
    });

    if (!execution) {
      throw new NotFoundError(`The execution "${executionId}" does not exist.`);
    }

    if (execution.status === 'running') {
      throw new ConflictError(`The execution "${executionId}" is running already.`);
    }

    if (execution.finished || execution.data.resultData.error) {
      throw new ConflictError(`The execution "${executionId}" has finished already.`);
    }

    const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;

    // Set the node as disabled so that the data does not get executed again as it would result
    // in starting the wait all over again
    this.disableNode(execution, req.method);

    // Remove waitTill information else the execution would stop
    execution.data.waitTill = undefined;

    // Remove the data of the node execution again else it will display the node as executed twice
    execution.data.resultData.runData[lastNodeExecuted].pop();

    const { workflowData } = execution;

    // Rebuild a Workflow instance from the snapshot stored with the execution.
    const workflow = new Workflow({
      id: workflowData.id,
      name: workflowData.name,
      nodes: workflowData.nodes,
      connections: workflowData.connections,
      active: workflowData.active,
      nodeTypes: this.nodeTypes,
      staticData: workflowData.staticData,
      settings: workflowData.settings,
    });

    const workflowStartNode = workflow.getNode(lastNodeExecuted);
    if (workflowStartNode === null) {
      throw new NotFoundError('Could not find node to process webhook.');
    }

    const additionalData = await WorkflowExecuteAdditionalData.getBase();

    // Pick the restart webhook of the last executed node that matches the request's
    // method and path suffix, and whose form flag agrees with `includeForms`.
    const webhookData = NodeHelpers.getNodeWebhooks(
      workflow,
      workflowStartNode,
      additionalData,
    ).find(
      (webhook) =>
        webhook.httpMethod === req.method &&
        webhook.path === (suffix ?? '') &&
        webhook.webhookDescription.restartWebhook === true &&
        (webhook.webhookDescription.isForm || false) === this.includeForms,
    );

    if (webhookData === undefined) {
      // If no data got found it means that the execution can not be started via a webhook.
      // Return 404 because we do not want to give any data if the execution exists or not.
      const errorMessage = `The workflow for execution "${executionId}" does not contain a waiting webhook with a matching path/method.`;
      throw new NotFoundError(errorMessage);
    }

    const runExecutionData = execution.data;

    // `WebhookHelpers.executeWebhook` reports its outcome through a callback, so it is
    // wrapped in a promise that settles when that callback fires.
    return await new Promise((resolve, reject) => {
      const executionMode = 'webhook';
      void WebhookHelpers.executeWebhook(
        workflow,
        webhookData,
        workflowData as IWorkflowDb,
        workflowStartNode,
        executionMode,
        undefined,
        runExecutionData,
        execution.id,
        req,
        res,
        (error: Error | null, data: object) => {
          if (error !== null) {
            return reject(error);
          }
          resolve(data);
        },
      );
    });
  }
}