diff --git a/packages/nodes-base/credentials/Amqp.credentials.ts b/packages/nodes-base/credentials/Amqp.credentials.ts index 1b20baf38eb4d..8afc480137347 100644 --- a/packages/nodes-base/credentials/Amqp.credentials.ts +++ b/packages/nodes-base/credentials/Amqp.credentials.ts @@ -12,6 +12,7 @@ export class Amqp implements ICredentialType { displayName: 'Hostname', name: 'hostname', type: 'string', + placeholder: 'e.g. localhost', default: '', }, { @@ -24,12 +25,14 @@ export class Amqp implements ICredentialType { displayName: 'User', name: 'username', type: 'string', + placeholder: 'e.g. guest', default: '', }, { displayName: 'Password', name: 'password', type: 'string', + placeholder: 'e.g. guest', typeOptions: { password: true, }, @@ -39,8 +42,9 @@ export class Amqp implements ICredentialType { displayName: 'Transport Type', name: 'transportType', type: 'string', + placeholder: 'e.g. tcp', default: '', - description: 'Optional Transport Type to use. Either tcp or tls.', + hint: 'Optional transport type to use, either tcp or tls', }, ]; } diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 83ab5253bb70f..cf2952a984b64 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,4 +1,4 @@ -import type { Connection, ContainerOptions, Dictionary, EventContext } from 'rhea'; +import type { Connection, ContainerOptions, Dictionary, EventContext, Sender } from 'rhea'; import { create_container } from 'rhea'; import type { @@ -14,6 +14,46 @@ import type { } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; +async function checkIfCredentialsValid( + credentials: IDataObject, +): Promise { + const connectOptions: ContainerOptions = { + reconnect: false, + host: credentials.hostname as string, + hostname: credentials.hostname as string, + port: credentials.port as number, + username: credentials.username ? (credentials.username as string) : undefined, + password: credentials.password ? (credentials.password as string) : undefined, + transport: credentials.transportType ? (credentials.transportType as string) : undefined, + }; + + let conn: Connection | undefined = undefined; + try { + const container = create_container(); + await new Promise((resolve, reject) => { + container.on('connection_open', function (_context: EventContext) { + resolve(); + }); + container.on('disconnected', function (context: EventContext) { + reject(context.error ?? new Error('unknown error')); + }); + conn = container.connect(connectOptions); + }); + } catch (error) { + return { + status: 'Error', + message: (error as Error).message, + }; + } finally { + if (conn) (conn as Connection).close(); + } + + return { + status: 'OK', + message: 'Connection successful!', + }; +} + export class Amqp implements INodeType { description: INodeTypeDescription = { displayName: 'AMQP Sender', @@ -40,7 +80,7 @@ export class Amqp implements INodeType { name: 'sink', type: 'string', default: '', - placeholder: 'topic://sourcename.something', + placeholder: 'e.g. topic://sourcename.something', description: 'Name of the queue of topic to publish to', }, // Header Parameters @@ -106,49 +146,27 @@ export class Amqp implements INodeType { credential: ICredentialsDecrypted, ): Promise { const credentials = credential.data as ICredentialDataDecryptedObject; - const connectOptions: ContainerOptions = { - reconnect: false, - host: credentials.hostname as string, - hostname: credentials.hostname as string, - port: credentials.port as number, - username: credentials.username ? (credentials.username as string) : undefined, - password: credentials.password ? (credentials.password as string) : undefined, - transport: credentials.transportType ? (credentials.transportType as string) : undefined, - }; - - let conn: Connection | undefined = undefined; - try { - const container = create_container(); - await new Promise((resolve, reject) => { - container.on('connection_open', function (_contex: EventContext) { - resolve(); - }); - container.on('disconnected', function (context: EventContext) { - reject(context.error ?? new Error('unknown error')); - }); - conn = container.connect(connectOptions); - }); - } catch (error) { - return { - status: 'Error', - message: (error as Error).message, - }; - } finally { - if (conn) (conn as Connection).close(); - } - - return { - status: 'OK', - message: 'Connection successful!', - }; + return await checkIfCredentialsValid(credentials); }, }, }; async execute(this: IExecuteFunctions): Promise { + const container = create_container(); + let connection: Connection | undefined = undefined; + let sender: Sender | undefined = undefined; + try { const credentials = await this.getCredentials('amqp'); + // check if credentials are valid to avoid unnecessary reconnects + const credentialsTestResult = await checkIfCredentialsValid(credentials); + if (credentialsTestResult.status === 'Error') { + throw new NodeOperationError(this.getNode(), credentialsTestResult.message, { + description: 'Check your credentials and try again', + }); + } + const sink = this.getNodeParameter('sink', 0, '') as string; const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as | string @@ -169,30 +187,50 @@ export class Amqp implements INodeType { throw new NodeOperationError(this.getNode(), 'Queue or Topic required!'); } - const container = create_container(); - /* - Values are documentet here: https://github.com/amqp/rhea#container + Values are documented here: https://github.com/amqp/rhea#container */ const connectOptions: ContainerOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: containerReconnect, - reconnect_limit: containerReconnectLimit, username: credentials.username ? credentials.username : undefined, password: credentials.password ? credentials.password : undefined, transport: credentials.transportType ? credentials.transportType : undefined, container_id: containerId ? containerId : undefined, id: containerId ? containerId : undefined, + reconnect: containerReconnect, + reconnect_limit: containerReconnectLimit, }; - const conn = container.connect(connectOptions); - const sender = conn.open_sender(sink); + const node = this.getNode(); + + const responseData: INodeExecutionData[] = await new Promise((resolve, reject) => { + connection = container.connect(connectOptions); + sender = connection.open_sender(sink); + let limit = containerReconnectLimit; + + container.on('disconnected', function (context: EventContext) { + //handling this manually as container, despite reconnect_limit, does reconnect on disconnect + if (limit <= 0) { + connection!.options.reconnect = false; + const error = new NodeOperationError( + node, + ((context.error as Error) ?? {}).message ?? 'Disconnected', + { + description: `Check your credentials${options.reconnect ? '' : ', and consider enabling reconnect in the options'}`, + itemIndex: 0, + }, + ); + + reject(error); + } + + limit--; + }); - const responseData: IDataObject[] = await new Promise((resolve) => { container.once('sendable', (context: EventContext) => { - const returnData = []; + const returnData: INodeExecutionData[] = []; const items = this.getInputData(); for (let i = 0; i < items.length; i++) { @@ -214,23 +252,23 @@ export class Amqp implements INodeType { body, }); - returnData.push({ id: result?.id }); + returnData.push({ json: { id: result?.id }, pairedItems: { item: i } }); } resolve(returnData); }); }); - sender.close(); - conn.close(); - - return [this.helpers.returnJsonArray(responseData)]; + return [responseData]; } catch (error) { if (this.continueOnFail(error)) { - return [this.helpers.returnJsonArray({ error: error.message })]; + return [[{ json: { error: error.message }, pairedItems: { item: 0 } }]]; } else { throw error; } + } finally { + if (sender) (sender as Sender).close(); + if (connection) (connection as Connection).close(); } } }