Skip to content

Commit

Permalink
fix(editor): Prevent creation of input connections for nodes without …
Browse files Browse the repository at this point in the history
…input slot (#5425)

* fix(editor): Prevent creation of input connections for nodes without input

* WIP: Workflow checks service and controller

* fix: Created SQLite migration to remove broken connections

* Cleanup & add mysql/posgres migrations

* Linter fixes

* Unify the migration scripts

* Escape migration workflow_entity

* Wrap the migration in try/catch and do not parse nodes and connection if mysql/postgres

* Do migration changes also fro mysql

* refactor: Wrap only the necessary call in try catch block

---------

Co-authored-by: Omar Ajoue <krynble@gmail.com>
  • Loading branch information
OlegIvaniv and krynble authored Feb 9, 2023
1 parent d9a4c2c commit 018f8a3
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 26 deletions.
16 changes: 8 additions & 8 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ export class Start extends Command {
const { flags } = this.parse(Start);

try {
// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();

// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);

// Start directly with the init of the database to improve startup time
await Db.init().catch(async (error: Error) =>
exitWithCrash('There was an error initializing DB', error),
Expand All @@ -265,18 +273,10 @@ export class Start extends Command {
await Start.generateStaticAssets();
}

// Load all node and credential types
const loadNodesAndCredentials = LoadNodesAndCredentials();
await loadNodesAndCredentials.init();

// Load all external hooks
const externalHooks = ExternalHooks();
await externalHooks.init();

// Add the found types to an instance other parts of the application can use
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);

// Load the credentials overwrites if any exist
CredentialsOverwrites(credentialTypes);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';

export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';

async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);

const tablePrefix = getTablePrefix();

const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> =
await queryRunner.query(`
SELECT id, nodes, connections
FROM \`${tablePrefix}workflow_entity\`
`);

const nodeTypes = NodeTypes();

workflows.forEach(async (workflow) => {
let connections: IConnections = workflow.connections;
const nodes: INode[] = workflow.nodes;

const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => {
try {
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if ((nodeType.description.inputs?.length ?? []) === 0) {
acc.push(node.name);
}
} catch (error) {
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`);
}
return acc;
}, [] as string[]);

Object.keys(connections).forEach((sourceNodeName) => {
const connection = connections[sourceNodeName];
const outputs = Object.keys(connection);

outputs.forEach((outputConnectionName /* Like `main` */, idx) => {
const outputConnection = connection[outputConnectionName];

// It filters out all connections that are connected to a node that cannot receive input
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => {
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter(
(outgoingConnections) =>
!nodesThatCannotReceiveInput.includes(outgoingConnections.node),
);
});

// Filter out output connection items that are empty
connection[outputConnectionName] = connection[outputConnectionName].filter(
(item) => item.length > 0,
);

// Delete the output connection container if it is empty
if (connection[outputConnectionName].length === 0) {
delete connection[outputConnectionName];
}
});

// Finally delete the source node if it has no output connections
if (Object.keys(connection).length === 0) {
delete connections[sourceNodeName];
}
});

// Update database with new connections
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE \`${tablePrefix}workflow_entity\`
SET connections = :connections
WHERE id = '${workflow.id}'
`,
{ connections: JSON.stringify(connections) },
{},
);

await queryRunner.query(updateQuery, updateParams);
});

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner): Promise<void> {
// No need to revert this migration
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148420 } from './1671726148420-Remov
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows';
import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities';
import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections';

export const mysqlMigrations = [
InitialMigration1588157391238,
Expand Down Expand Up @@ -64,4 +65,5 @@ export const mysqlMigrations = [
MessageEventBusDestinations1671535397530,
DeleteExecutionsWithWorkflows1673268682475,
CreateLdapEntities1674509946020,
PurgeInvalidWorkflowConnections1675940580449
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';

async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);

const tablePrefix = getTablePrefix();

const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> =
await queryRunner.query(`
SELECT id, nodes, connections
FROM "${tablePrefix}workflow_entity"
`);

const nodeTypes = NodeTypes();

workflows.forEach(async (workflow) => {
let connections: IConnections = workflow.connections;
const nodes: INode[] = workflow.nodes;

const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => {
try {
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if ((nodeType.description.inputs?.length ?? []) === 0) {
acc.push(node.name);
}
} catch (error) {
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`);
}
return acc;
}, [] as string[]);

Object.keys(connections).forEach((sourceNodeName) => {
const connection = connections[sourceNodeName];
const outputs = Object.keys(connection);

outputs.forEach((outputConnectionName /* Like `main` */, idx) => {
const outputConnection = connection[outputConnectionName];

// It filters out all connections that are connected to a node that cannot receive input
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => {
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter(
(outgoingConnections) =>
!nodesThatCannotReceiveInput.includes(outgoingConnections.node),
);
});

// Filter out output connection items that are empty
connection[outputConnectionName] = connection[outputConnectionName].filter(
(item) => item.length > 0,
);

// Delete the output connection container if it is empty
if (connection[outputConnectionName].length === 0) {
delete connection[outputConnectionName];
}
});

// Finally delete the source node if it has no output connections
if (Object.keys(connection).length === 0) {
delete connections[sourceNodeName];
}
});

// Update database with new connections
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET connections = :connections
WHERE id = '${workflow.id}'
`,
{ connections: JSON.stringify(connections) },
{},
);

await queryRunner.query(updateQuery, updateParams);
});

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner): Promise<void> {
// No need to revert this migration
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148421 } from './1671726148421-Remov
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows';
import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities';
import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections';

export const postgresMigrations = [
InitialMigration1587669153312,
Expand Down Expand Up @@ -60,4 +61,5 @@ export const postgresMigrations = [
MessageEventBusDestinations1671535397530,
DeleteExecutionsWithWorkflows1673268682475,
CreateLdapEntities1674509946020,
PurgeInvalidWorkflowConnections1675940580449
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';

export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';

async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);

const tablePrefix = getTablePrefix();

const workflows: Array<{ id: number; nodes: string; connections: string }> =
await queryRunner.query(`
SELECT id, nodes, connections
FROM "${tablePrefix}workflow_entity"
`);

const nodeTypes = NodeTypes();

workflows.forEach(async (workflow) => {
let connections: IConnections = JSON.parse(workflow.connections);
const nodes: INode[] = JSON.parse(workflow.nodes);

const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => {
try {
const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
if ((nodeType.description.inputs?.length ?? []) === 0) {
acc.push(node.name);
}
} catch (error) {
getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`);
}
return acc;
}, [] as string[]);

Object.keys(connections).forEach((sourceNodeName) => {
const connection = connections[sourceNodeName];
const outputs = Object.keys(connection);

outputs.forEach((outputConnectionName /* Like `main` */, idx) => {
const outputConnection = connection[outputConnectionName];

// It filters out all connections that are connected to a node that cannot receive input
outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => {
outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter(
(outgoingConnections) =>
!nodesThatCannotReceiveInput.includes(outgoingConnections.node),
);
});

// Filter out output connection items that are empty
connection[outputConnectionName] = connection[outputConnectionName].filter(
(item) => item.length > 0,
);

// Delete the output connection container if it is empty
if (connection[outputConnectionName].length === 0) {
delete connection[outputConnectionName];
}
});

// Finally delete the source node if it has no output connections
if (Object.keys(connection).length === 0) {
delete connections[sourceNodeName];
}
});

// Update database with new connections
const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
UPDATE "${tablePrefix}workflow_entity"
SET connections = :connections
WHERE id = '${workflow.id}'
`,
{ connections: JSON.stringify(connections) },
{},
);

await queryRunner.query(updateQuery, updateParams);
});

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner): Promise<void> {
// No need to revert this migration
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148419 } from './1671726148419-Remov
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';
import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows';
import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities';
import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections';

const sqliteMigrations = [
InitialMigration1588102412422,
Expand Down Expand Up @@ -58,6 +59,7 @@ const sqliteMigrations = [
MessageEventBusDestinations1671535397530,
DeleteExecutionsWithWorkflows1673268682475,
CreateLdapEntities1674509946020,
PurgeInvalidWorkflowConnections1675940580449,
];

export { sqliteMigrations };
Loading

0 comments on commit 018f8a3

Please sign in to comment.