diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 590307c26f4a3..90b495fe791b4 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -239,6 +239,14 @@ export class Start extends Command { const { flags } = this.parse(Start); try { + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + await loadNodesAndCredentials.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(loadNodesAndCredentials); + const credentialTypes = CredentialTypes(loadNodesAndCredentials); + // Start directly with the init of the database to improve startup time await Db.init().catch(async (error: Error) => exitWithCrash('There was an error initializing DB', error), @@ -265,18 +273,10 @@ export class Start extends Command { await Start.generateStaticAssets(); } - // Load all node and credential types - const loadNodesAndCredentials = LoadNodesAndCredentials(); - await loadNodesAndCredentials.init(); - // Load all external hooks const externalHooks = ExternalHooks(); await externalHooks.init(); - // Add the found types to an instance other parts of the application can use - const nodeTypes = NodeTypes(loadNodesAndCredentials); - const credentialTypes = CredentialTypes(loadNodesAndCredentials); - // Load the credentials overwrites if any exist CredentialsOverwrites(credentialTypes); diff --git a/packages/cli/src/databases/migrations/mysqldb/1675940580449-PurgeInvalidWorkflowConnections.ts b/packages/cli/src/databases/migrations/mysqldb/1675940580449-PurgeInvalidWorkflowConnections.ts new file mode 100644 index 0000000000000..654858b422c9e --- /dev/null +++ b/packages/cli/src/databases/migrations/mysqldb/1675940580449-PurgeInvalidWorkflowConnections.ts @@ -0,0 +1,91 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import { NodeTypes } from '@/NodeTypes'; +import { IConnections, INode } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { + name = 'PurgeInvalidWorkflowConnections1675940580449'; + + async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + + const tablePrefix = getTablePrefix(); + + const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> = + await queryRunner.query(` + SELECT id, nodes, connections + FROM \`${tablePrefix}workflow_entity\` + `); + + const nodeTypes = NodeTypes(); + + workflows.forEach(async (workflow) => { + let connections: IConnections = workflow.connections; + const nodes: INode[] = workflow.nodes; + + const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { + try { + const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + if ((nodeType.description.inputs?.length ?? []) === 0) { + acc.push(node.name); + } + } catch (error) { + getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); + } + return acc; + }, [] as string[]); + + Object.keys(connections).forEach((sourceNodeName) => { + const connection = connections[sourceNodeName]; + const outputs = Object.keys(connection); + + outputs.forEach((outputConnectionName /* Like `main` */, idx) => { + const outputConnection = connection[outputConnectionName]; + + // It filters out all connections that are connected to a node that cannot receive input + outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { + outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( + (outgoingConnections) => + !nodesThatCannotReceiveInput.includes(outgoingConnections.node), + ); + }); + + // Filter out output connection items that are empty + connection[outputConnectionName] = connection[outputConnectionName].filter( + (item) => item.length > 0, + ); + + // Delete the output connection container if it is empty + if (connection[outputConnectionName].length === 0) { + delete connection[outputConnectionName]; + } + }); + + // Finally delete the source node if it has no output connections + if (Object.keys(connection).length === 0) { + delete connections[sourceNodeName]; + } + }); + + // Update database with new connections + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE \`${tablePrefix}workflow_entity\` + SET connections = :connections + WHERE id = '${workflow.id}' + `, + { connections: JSON.stringify(connections) }, + {}, + ); + + await queryRunner.query(updateQuery, updateParams); + }); + + logMigrationEnd(this.name); + } + + async down(queryRunner: QueryRunner): Promise { + // No need to revert this migration + } +} diff --git a/packages/cli/src/databases/migrations/mysqldb/index.ts b/packages/cli/src/databases/migrations/mysqldb/index.ts index 8892ddcbe72d7..2c25db44b1588 100644 --- a/packages/cli/src/databases/migrations/mysqldb/index.ts +++ b/packages/cli/src/databases/migrations/mysqldb/index.ts @@ -30,6 +30,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148420 } from './1671726148420-Remov import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations'; import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; +import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; export const mysqlMigrations = [ InitialMigration1588157391238, @@ -64,4 +65,5 @@ export const mysqlMigrations = [ MessageEventBusDestinations1671535397530, DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, + PurgeInvalidWorkflowConnections1675940580449 ]; diff --git a/packages/cli/src/databases/migrations/postgresdb/1675940580449-PurgeInvalidWorkflowConnections.ts b/packages/cli/src/databases/migrations/postgresdb/1675940580449-PurgeInvalidWorkflowConnections.ts new file mode 100644 index 0000000000000..7534742ce401b --- /dev/null +++ b/packages/cli/src/databases/migrations/postgresdb/1675940580449-PurgeInvalidWorkflowConnections.ts @@ -0,0 +1,90 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import { NodeTypes } from '@/NodeTypes'; +import { IConnections, INode } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; +export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { + name = 'PurgeInvalidWorkflowConnections1675940580449'; + + async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + + const tablePrefix = getTablePrefix(); + + const workflows: Array<{ id: number; nodes: INode[]; connections: IConnections }> = + await queryRunner.query(` + SELECT id, nodes, connections + FROM "${tablePrefix}workflow_entity" + `); + + const nodeTypes = NodeTypes(); + + workflows.forEach(async (workflow) => { + let connections: IConnections = workflow.connections; + const nodes: INode[] = workflow.nodes; + + const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { + try { + const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + if ((nodeType.description.inputs?.length ?? []) === 0) { + acc.push(node.name); + } + } catch (error) { + getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); + } + return acc; + }, [] as string[]); + + Object.keys(connections).forEach((sourceNodeName) => { + const connection = connections[sourceNodeName]; + const outputs = Object.keys(connection); + + outputs.forEach((outputConnectionName /* Like `main` */, idx) => { + const outputConnection = connection[outputConnectionName]; + + // It filters out all connections that are connected to a node that cannot receive input + outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { + outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( + (outgoingConnections) => + !nodesThatCannotReceiveInput.includes(outgoingConnections.node), + ); + }); + + // Filter out output connection items that are empty + connection[outputConnectionName] = connection[outputConnectionName].filter( + (item) => item.length > 0, + ); + + // Delete the output connection container if it is empty + if (connection[outputConnectionName].length === 0) { + delete connection[outputConnectionName]; + } + }); + + // Finally delete the source node if it has no output connections + if (Object.keys(connection).length === 0) { + delete connections[sourceNodeName]; + } + }); + + // Update database with new connections + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}workflow_entity" + SET connections = :connections + WHERE id = '${workflow.id}' + `, + { connections: JSON.stringify(connections) }, + {}, + ); + + await queryRunner.query(updateQuery, updateParams); + }); + + logMigrationEnd(this.name); + } + + async down(queryRunner: QueryRunner): Promise { + // No need to revert this migration + } +} diff --git a/packages/cli/src/databases/migrations/postgresdb/index.ts b/packages/cli/src/databases/migrations/postgresdb/index.ts index 6138f89d5641c..dde85c808bec7 100644 --- a/packages/cli/src/databases/migrations/postgresdb/index.ts +++ b/packages/cli/src/databases/migrations/postgresdb/index.ts @@ -28,6 +28,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148421 } from './1671726148421-Remov import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations'; import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; +import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; export const postgresMigrations = [ InitialMigration1587669153312, @@ -60,4 +61,5 @@ export const postgresMigrations = [ MessageEventBusDestinations1671535397530, DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, + PurgeInvalidWorkflowConnections1675940580449 ]; diff --git a/packages/cli/src/databases/migrations/sqlite/1675940580449-PurgeInvalidWorkflowConnections.ts b/packages/cli/src/databases/migrations/sqlite/1675940580449-PurgeInvalidWorkflowConnections.ts new file mode 100644 index 0000000000000..459c4782140c5 --- /dev/null +++ b/packages/cli/src/databases/migrations/sqlite/1675940580449-PurgeInvalidWorkflowConnections.ts @@ -0,0 +1,91 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import { NodeTypes } from '@/NodeTypes'; +import { IConnections, INode } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; + +export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface { + name = 'PurgeInvalidWorkflowConnections1675940580449'; + + async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + + const tablePrefix = getTablePrefix(); + + const workflows: Array<{ id: number; nodes: string; connections: string }> = + await queryRunner.query(` + SELECT id, nodes, connections + FROM "${tablePrefix}workflow_entity" + `); + + const nodeTypes = NodeTypes(); + + workflows.forEach(async (workflow) => { + let connections: IConnections = JSON.parse(workflow.connections); + const nodes: INode[] = JSON.parse(workflow.nodes); + + const nodesThatCannotReceiveInput: string[] = nodes.reduce((acc, node) => { + try { + const nodeType = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + if ((nodeType.description.inputs?.length ?? []) === 0) { + acc.push(node.name); + } + } catch (error) { + getLogger().warn(`Migration ${this.name} failed with error: ${error.message}`); + } + return acc; + }, [] as string[]); + + Object.keys(connections).forEach((sourceNodeName) => { + const connection = connections[sourceNodeName]; + const outputs = Object.keys(connection); + + outputs.forEach((outputConnectionName /* Like `main` */, idx) => { + const outputConnection = connection[outputConnectionName]; + + // It filters out all connections that are connected to a node that cannot receive input + outputConnection.forEach((outputConnectionItem, outputConnectionItemIdx) => { + outputConnection[outputConnectionItemIdx] = outputConnectionItem.filter( + (outgoingConnections) => + !nodesThatCannotReceiveInput.includes(outgoingConnections.node), + ); + }); + + // Filter out output connection items that are empty + connection[outputConnectionName] = connection[outputConnectionName].filter( + (item) => item.length > 0, + ); + + // Delete the output connection container if it is empty + if (connection[outputConnectionName].length === 0) { + delete connection[outputConnectionName]; + } + }); + + // Finally delete the source node if it has no output connections + if (Object.keys(connection).length === 0) { + delete connections[sourceNodeName]; + } + }); + + // Update database with new connections + const [updateQuery, updateParams] = queryRunner.connection.driver.escapeQueryWithParameters( + ` + UPDATE "${tablePrefix}workflow_entity" + SET connections = :connections + WHERE id = '${workflow.id}' + `, + { connections: JSON.stringify(connections) }, + {}, + ); + + await queryRunner.query(updateQuery, updateParams); + }); + + logMigrationEnd(this.name); + } + + async down(queryRunner: QueryRunner): Promise { + // No need to revert this migration + } +} diff --git a/packages/cli/src/databases/migrations/sqlite/index.ts b/packages/cli/src/databases/migrations/sqlite/index.ts index 247daf12809c0..7325893d18f86 100644 --- a/packages/cli/src/databases/migrations/sqlite/index.ts +++ b/packages/cli/src/databases/migrations/sqlite/index.ts @@ -27,6 +27,7 @@ import { RemoveWorkflowDataLoadedFlag1671726148419 } from './1671726148419-Remov import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations'; import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; +import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; const sqliteMigrations = [ InitialMigration1588102412422, @@ -58,6 +59,7 @@ const sqliteMigrations = [ MessageEventBusDestinations1671535397530, DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, + PurgeInvalidWorkflowConnections1675940580449, ]; export { sqliteMigrations }; diff --git a/packages/editor-ui/src/views/NodeView.vue b/packages/editor-ui/src/views/NodeView.vue index 51c79d15e2341..700fc076996ed 100644 --- a/packages/editor-ui/src/views/NodeView.vue +++ b/packages/editor-ui/src/views/NodeView.vue @@ -1995,7 +1995,7 @@ export default mixins( }, ] as [IConnection, IConnection]; - this.__addConnection(connectionData, true); + this.__addConnection(connectionData); }, async addNode( nodeTypeName: string, @@ -2571,22 +2571,19 @@ export default mixins( return NodeViewUtils.getInputEndpointUUID(node.id, index); }, - __addConnection(connection: [IConnection, IConnection], addVisualConnection = false) { - if (addVisualConnection) { - const outputUuid = this.getOutputEndpointUUID(connection[0].node, connection[0].index); - const inputUuid = this.getInputEndpointUUID(connection[1].node, connection[1].index); - if (!outputUuid || !inputUuid) { - return; - } - - const uuid: [string, string] = [outputUuid, inputUuid]; - // Create connections in DOM - this.instance?.connect({ - uuids: uuid, - detachable: !this.isReadOnly, - }); + __addConnection(connection: [IConnection, IConnection]) { + const outputUuid = this.getOutputEndpointUUID(connection[0].node, connection[0].index); + const inputUuid = this.getInputEndpointUUID(connection[1].node, connection[1].index); + if (!outputUuid || !inputUuid) { + return; } - this.workflowsStore.addConnection({ connection }); + + const uuid: [string, string] = [outputUuid, inputUuid]; + // Create connections in DOM + this.instance?.connect({ + uuids: uuid, + detachable: !this.isReadOnly, + }); setTimeout(() => { this.addPinDataConnections(this.workflowsStore.pinData); @@ -3277,7 +3274,7 @@ export default mixins( }, ] as [IConnection, IConnection]; - this.__addConnection(connectionData, true); + this.__addConnection(connectionData); }); } } @@ -3763,7 +3760,7 @@ export default mixins( }, async onRevertRemoveConnection({ connection }: { connection: [IConnection, IConnection] }) { this.suspendRecordingDetachedConnections = true; - this.__addConnection(connection, true); + this.__addConnection(connection); this.suspendRecordingDetachedConnections = false; }, async onRevertNameChange({ currentName, newName }: { currentName: string; newName: string }) {