-
Notifications
You must be signed in to change notification settings - Fork 9.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(editor): Prevent creation of input connections for nodes without …
…input slot (#5425) * fix(editor): Prevent creation of input connections for nodes without input * WIP: Workflow checks service and controller * fix: Created SQLite migration to remove broken connections * Cleanup & add mysql/posgres migrations * Linter fixes * Unify the migration scripts * Escape migration workflow_entity * Wrap the migration in try/catch and do not parse nodes and connection if mysql/postgres * Do migration changes also fro mysql * refactor: Wrap only the necessary call in try catch block --------- Co-authored-by: Omar Ajoue <krynble@gmail.com>
- Loading branch information
Showing
8 changed files
with
301 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
...ges/cli/src/databases/migrations/mysqldb/1675940580449-PurgeInvalidWorkflowConnections.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import { MigrationInterface, QueryRunner } from 'typeorm'; | ||
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; | ||
import { NodeTypes } from '@/NodeTypes'; | ||
import { IConnections, INode } from 'n8n-workflow'; | ||
import { getLogger } from '@/Logger'; | ||
|
||
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { | ||
name = 'PurgeInvalidWorkflowConnections1675940580449'; | ||
|
||
async up(queryRunner: QueryRunner): Promise<void> { | ||
logMigrationStart(this.name); | ||
|
||
const tablePrefix = getTablePrefix(); | ||
|
||
const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> = | ||
await queryRunner.query(` | ||
SELECT id, nodes, connections | ||
FROM \`${tablePrefix}workflow_entity\` | ||
`); | ||
|
||
const nodeTypes = NodeTypes(); | ||
|
||
workflows.forEach(async (workflow) => { | ||
let connections: IConnections = workflow.connections; | ||
const nodes: INode[] = workflow.nodes; | ||
|
||
const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { | ||
try { | ||
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); | ||
if ((nodeType.description.inputs?.length ?? []) === 0) { | ||
acc.push(node.name); | ||
} | ||
} catch (error) { | ||
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); | ||
} | ||
return acc; | ||
}, [] as string[]); | ||
|
||
Object.keys(connections).forEach((sourceNodeName) => { | ||
const connection = connections[sourceNodeName]; | ||
const outputs = Object.keys(connection); | ||
|
||
outputs.forEach((outputConnectionName /* Like `main` */, idx) => { | ||
const outputConnection = connection[outputConnectionName]; | ||
|
||
// It filters out all connections that are connected to a node that cannot receive input | ||
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { | ||
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( | ||
(outgoingConnections) => | ||
!nodesThatCannotReceiveInput.includes(outgoingConnections.node), | ||
); | ||
}); | ||
|
||
// Filter out output connection items that are empty | ||
connection[outputConnectionName] = connection[outputConnectionName].filter( | ||
(item) => item.length > 0, | ||
); | ||
|
||
// Delete the output connection container if it is empty | ||
if (connection[outputConnectionName].length === 0) { | ||
delete connection[outputConnectionName]; | ||
} | ||
}); | ||
|
||
// Finally delete the source node if it has no output connections | ||
if (Object.keys(connection).length === 0) { | ||
delete connections[sourceNodeName]; | ||
} | ||
}); | ||
|
||
// Update database with new connections | ||
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( | ||
` | ||
UPDATE \`${tablePrefix}workflow_entity\` | ||
SET connections = :connections | ||
WHERE id = '${workflow.id}' | ||
`, | ||
{ connections: JSON.stringify(connections) }, | ||
{}, | ||
); | ||
|
||
await queryRunner.query(updateQuery, updateParams); | ||
}); | ||
|
||
logMigrationEnd(this.name); | ||
} | ||
|
||
async down(queryRunner: QueryRunner): Promise<void> { | ||
// No need to revert this migration | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
.../cli/src/databases/migrations/postgresdb/1675940580449-PurgeInvalidWorkflowConnections.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import { MigrationInterface, QueryRunner } from 'typeorm'; | ||
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; | ||
import { NodeTypes } from '@/NodeTypes'; | ||
import { IConnections, INode } from 'n8n-workflow'; | ||
import { getLogger } from '@/Logger'; | ||
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { | ||
name = 'PurgeInvalidWorkflowConnections1675940580449'; | ||
|
||
async up(queryRunner: QueryRunner): Promise<void> { | ||
logMigrationStart(this.name); | ||
|
||
const tablePrefix = getTablePrefix(); | ||
|
||
const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> = | ||
await queryRunner.query(` | ||
SELECT id, nodes, connections | ||
FROM "${tablePrefix}workflow_entity" | ||
`); | ||
|
||
const nodeTypes = NodeTypes(); | ||
|
||
workflows.forEach(async (workflow) => { | ||
let connections: IConnections = workflow.connections; | ||
const nodes: INode[] = workflow.nodes; | ||
|
||
const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { | ||
try { | ||
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); | ||
if ((nodeType.description.inputs?.length ?? []) === 0) { | ||
acc.push(node.name); | ||
} | ||
} catch (error) { | ||
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); | ||
} | ||
return acc; | ||
}, [] as string[]); | ||
|
||
Object.keys(connections).forEach((sourceNodeName) => { | ||
const connection = connections[sourceNodeName]; | ||
const outputs = Object.keys(connection); | ||
|
||
outputs.forEach((outputConnectionName /* Like `main` */, idx) => { | ||
const outputConnection = connection[outputConnectionName]; | ||
|
||
// It filters out all connections that are connected to a node that cannot receive input | ||
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { | ||
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( | ||
(outgoingConnections) => | ||
!nodesThatCannotReceiveInput.includes(outgoingConnections.node), | ||
); | ||
}); | ||
|
||
// Filter out output connection items that are empty | ||
connection[outputConnectionName] = connection[outputConnectionName].filter( | ||
(item) => item.length > 0, | ||
); | ||
|
||
// Delete the output connection container if it is empty | ||
if (connection[outputConnectionName].length === 0) { | ||
delete connection[outputConnectionName]; | ||
} | ||
}); | ||
|
||
// Finally delete the source node if it has no output connections | ||
if (Object.keys(connection).length === 0) { | ||
delete connections[sourceNodeName]; | ||
} | ||
}); | ||
|
||
// Update database with new connections | ||
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( | ||
` | ||
UPDATE "${tablePrefix}workflow_entity" | ||
SET connections = :connections | ||
WHERE id = '${workflow.id}' | ||
`, | ||
{ connections: JSON.stringify(connections) }, | ||
{}, | ||
); | ||
|
||
await queryRunner.query(updateQuery, updateParams); | ||
}); | ||
|
||
logMigrationEnd(this.name); | ||
} | ||
|
||
async down(queryRunner: QueryRunner): Promise<void> { | ||
// No need to revert this migration | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
...ages/cli/src/databases/migrations/sqlite/1675940580449-PurgeInvalidWorkflowConnections.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import { MigrationInterface, QueryRunner } from 'typeorm'; | ||
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; | ||
import { NodeTypes } from '@/NodeTypes'; | ||
import { IConnections, INode } from 'n8n-workflow'; | ||
import { getLogger } from '@/Logger'; | ||
|
||
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { | ||
name = 'PurgeInvalidWorkflowConnections1675940580449'; | ||
|
||
async up(queryRunner: QueryRunner): Promise<void> { | ||
logMigrationStart(this.name); | ||
|
||
const tablePrefix = getTablePrefix(); | ||
|
||
const workflows: Array<{ id: number; nodes: string; connections: string }> = | ||
await queryRunner.query(` | ||
SELECT id, nodes, connections | ||
FROM "${tablePrefix}workflow_entity" | ||
`); | ||
|
||
const nodeTypes = NodeTypes(); | ||
|
||
workflows.forEach(async (workflow) => { | ||
let connections: IConnections = JSON.parse(workflow.connections); | ||
const nodes: INode[] = JSON.parse(workflow.nodes); | ||
|
||
const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { | ||
try { | ||
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); | ||
if ((nodeType.description.inputs?.length ?? []) === 0) { | ||
acc.push(node.name); | ||
} | ||
} catch (error) { | ||
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); | ||
} | ||
return acc; | ||
}, [] as string[]); | ||
|
||
Object.keys(connections).forEach((sourceNodeName) => { | ||
const connection = connections[sourceNodeName]; | ||
const outputs = Object.keys(connection); | ||
|
||
outputs.forEach((outputConnectionName /* Like `main` */, idx) => { | ||
const outputConnection = connection[outputConnectionName]; | ||
|
||
// It filters out all connections that are connected to a node that cannot receive input | ||
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { | ||
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( | ||
(outgoingConnections) => | ||
!nodesThatCannotReceiveInput.includes(outgoingConnections.node), | ||
); | ||
}); | ||
|
||
// Filter out output connection items that are empty | ||
connection[outputConnectionName] = connection[outputConnectionName].filter( | ||
(item) => item.length > 0, | ||
); | ||
|
||
// Delete the output connection container if it is empty | ||
if (connection[outputConnectionName].length === 0) { | ||
delete connection[outputConnectionName]; | ||
} | ||
}); | ||
|
||
// Finally delete the source node if it has no output connections | ||
if (Object.keys(connection).length === 0) { | ||
delete connections[sourceNodeName]; | ||
} | ||
}); | ||
|
||
// Update database with new connections | ||
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( | ||
` | ||
UPDATE "${tablePrefix}workflow_entity" | ||
SET connections = :connections | ||
WHERE id = '${workflow.id}' | ||
`, | ||
{ connections: JSON.stringify(connections) }, | ||
{}, | ||
); | ||
|
||
await queryRunner.query(updateQuery, updateParams); | ||
}); | ||
|
||
logMigrationEnd(this.name); | ||
} | ||
|
||
async down(queryRunner: QueryRunner): Promise<void> { | ||
// No need to revert this migration | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.