From c255d093dbf2d365c48ca70e4d0d996cb4078ee1 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 9 Oct 2020 03:55:42 -0700 Subject: [PATCH 1/7] fix MaxListenersExceeded for management client --- .../service-bus/src/core/managementClient.ts | 62 ++++++++++++------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 2c185ca36d8c..41506bc0f207 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -228,18 +228,20 @@ export class ManagementClient extends LinkEntity { target: { address: this.replyTo }, onSessionError: (context: EventContext) => { const id = context.connection.options.id; - const ehError = translate(context.session!.error!); + const sbError = translate(context.session!.error!); logError( - ehError, + sbError, "[%s] An error occurred on the session for request/response links for " + "$management: %O", id, - ehError + sbError ); } }; const sropt: SenderOptions = { target: { address: this.address } }; + // If multiple parallel requests reach here, the initLink secures a lock + // which ensures that there won't be multiple initializations await this.initLink( { senderOptions: sropt, @@ -248,31 +250,43 @@ export class ManagementClient extends LinkEntity { abortSignal ); - this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => { - const id = context.connection.options.id; - const ehError = translate(context.sender!.error!); - logError( - ehError, - "[%s] An error occurred on the $management sender link.. %O", - id, - ehError - ); - }); - this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { - const id = context.connection.options.id; - const ehError = translate(context.receiver!.error!); - logError( - ehError, - "[%s] An error occurred on the $management receiver link.. %O", - id, - ehError - ); - }); + // Attach listeners for the `sender_error` and `receiver_error` events to log the errors. + // - It is possible that the previous "_init" call had already added the listeners + // (example: parallel _init calls can cause this), + // hence checking the count of the listeners and adding them only if they're not present. + const senderErrorListenerCount = this.link?.sender.listenerCount(SenderEvents.senderError); + const receiverErrorListenerCount = this.link?.receiver.listenerCount( + ReceiverEvents.receiverError + ); + if (senderErrorListenerCount && senderErrorListenerCount < 1) { + this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => { + const id = context.connection.options.id; + const sbError = translate(context.sender!.error!); + logError( + sbError, + "[%s] An error occurred on the $management sender link.. %O", + id, + sbError + ); + }); + } + if (receiverErrorListenerCount && receiverErrorListenerCount < 1) { + this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { + const id = context.connection.options.id; + const sbError = translate(context.receiver!.error!); + logError( + sbError, + "[%s] An error occurred on the $management receiver link.. %O", + id, + sbError + ); + }); + } } catch (err) { err = translate(err); logError( err, - "[%s] An error occured while establishing the $management links: %O", + "[%s] An error occurred while establishing the $management links: %O", this._context.connectionId, err ); From 5788a2b5ab63573ca84348fe548291ddc217424c Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Fri, 9 Oct 2020 03:57:28 -0700 Subject: [PATCH 2/7] comment update --- sdk/servicebus/service-bus/src/core/managementClient.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 41506bc0f207..675ce74174b5 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -240,8 +240,8 @@ export class ManagementClient extends LinkEntity { }; const sropt: SenderOptions = { target: { address: this.address } }; - // If multiple parallel requests reach here, the initLink secures a lock - // which ensures that there won't be multiple initializations + // Even if multiple parallel requests reach here, the initLink secures a lock + // to ensure there won't be multiple initializations await this.initLink( { senderOptions: sropt, From 211f4409ce4e85fdbbeb7b80c0d3714e35803da6 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Wed, 14 Oct 2020 15:42:55 -0700 Subject: [PATCH 3/7] Add listener for receiver_error inside createRheaLink instead of checking for the count before adding the listener --- .../service-bus/src/core/managementClient.ts | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index d569f3b833b2..3598adad9e92 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -4,15 +4,14 @@ import Long from "long"; import { EventContext, - ReceiverEvents, ReceiverOptions, message as RheaMessageUtil, - SenderEvents, SenderOptions, generate_uuid, string_to_uuid, types, - Typed + Typed, + ReceiverEvents } from "rhea-promise"; import { AmqpMessage, @@ -233,7 +232,16 @@ export class ManagementClient extends LinkEntity { ); } }; - const sropt: SenderOptions = { target: { address: this.address } }; + const sropt: SenderOptions = { + target: { address: this.address }, + onError: (context: EventContext) => { + const ehError = translate(context.sender!.error!); + managementClientLogger.logError( + ehError, + `${this.logPrefix} An error occurred on the $management sender link` + ); + } + }; // Even if multiple parallel requests reach here, the initLink secures a lock // to ensure there won't be multiple initializations @@ -244,33 +252,6 @@ export class ManagementClient extends LinkEntity { }, abortSignal ); - - // Attach listeners for the `sender_error` and `receiver_error` events to log the errors. - // - It is possible that the previous "_init" call had already added the listeners - // (example: parallel _init calls can cause this), - // hence checking the count of the listeners and adding them only if they're not present. - const senderErrorListenerCount = this.link?.sender.listenerCount(SenderEvents.senderError); - const receiverErrorListenerCount = this.link?.receiver.listenerCount( - ReceiverEvents.receiverError - ); - if (senderErrorListenerCount && senderErrorListenerCount < 1) { - this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => { - const ehError = translate(context.sender!.error!); - managementClientLogger.logError( - ehError, - `${this.logPrefix} An error occurred on the $management sender link` - ); - }); - } - if (receiverErrorListenerCount && receiverErrorListenerCount < 1) { - this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { - const ehError = translate(context.receiver!.error!); - managementClientLogger.logError( - ehError, - `${this.logPrefix} An error occurred on the $management receiver link` - ); - }); - } } catch (err) { err = translate(err); managementClientLogger.logError( @@ -281,12 +262,27 @@ export class ManagementClient extends LinkEntity { } } - protected createRheaLink(options: RequestResponseLinkOptions): Promise { - return RequestResponseLink.create( + protected async createRheaLink( + options: RequestResponseLinkOptions + ): Promise { + const rheaLink = await RequestResponseLink.create( this._context.connection, options.senderOptions, options.receiverOptions ); + // Attach listener for the `receiver_error` events to log the errors. + + // "message" event listener is added in core-amqp. + // "rhea" doesn't allow setting only the "onError" handler in the options if it is not accompanied by an "onMessage" handler. + // Hence, not passing onError handler in the receiver options, adding a handler below. + rheaLink.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { + const ehError = translate(context.receiver!.error!); + managementClientLogger.logError( + ehError, + `${this.logPrefix} An error occurred on the $management receiver link` + ); + }); + return rheaLink; } /** From 34317290e641743c9154753b7251002aebaf30bc Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Mon, 19 Oct 2020 11:27:34 -0700 Subject: [PATCH 4/7] Typo --- sdk/servicebus/service-bus/CHANGELOG.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 2e6b699dd6ff..5a160ccf5c59 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,8 +2,27 @@ ## 7.0.0-preview.8 (Unreleased) -- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing. +- Tracing, using [@azure/core-tracing](https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/core/core-tracing/README.md), has been added for sending and receiving of messages. [PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651) + and + [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) + +- Added new "userId" property to `ServiceBusMessage` interface. [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) + +- `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) + +### Breaking changes + +- The `createBatch` method on the sender is renamed to `createMessageBatch` +- The interface `CreateBatchOptions` followed by the options that are passed to the `createBatch` method is renamed to `CreateMessageBatchOptions` +- The `tryAdd` method on the message batch object is renamed to `tryAddMessage` +- `ServiceBusMessage` interface updates: + - "properties" renamed to "applicationProperties" + - "label" renamed to "subject" +- `CorrelationRuleFilter` interface updates: + - "properties" renamed to "applicationProperties" + - "label" renamed to "subject" +- `SqlRuleFilter` interface "sqlExpression" changed from optional to required ## 7.0.0-preview.7 (2020-10-07) From b6a36b2934568e61812cd1c07c37ec6a7c67c22d Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Mon, 19 Oct 2020 11:37:38 -0700 Subject: [PATCH 5/7] changelog --- sdk/servicebus/service-bus/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 35d1b69a0e8b..0fdf650ebdbd 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -13,6 +13,9 @@ - `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) +- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized. This has been improved such that the listeners are reused. +[PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738) + ### Breaking changes - The `createBatch` method on the sender is renamed to `createMessageBatch` From f5fcbd2be871cb409ba3a70708122b0f0e5107be Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Mon, 19 Oct 2020 11:37:58 -0700 Subject: [PATCH 6/7] formatting --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 0fdf650ebdbd..a82d2cc1901c 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -14,7 +14,7 @@ - `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) - Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized. This has been improved such that the listeners are reused. -[PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738) + [PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738) ### Breaking changes From c0d7fd41081c2bca0fedce6b56566b55e2cf9136 Mon Sep 17 00:00:00 2001 From: HarshaNalluru Date: Mon, 19 Oct 2020 12:13:57 -0700 Subject: [PATCH 7/7] changelog - mention the warning --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index a82d2cc1901c..5765fe559299 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -13,7 +13,7 @@ - `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) -- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized. This has been improved such that the listeners are reused. +- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized which would have resulted in too many listeners and a warning such as `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 sender_error listeners added to [Sender]. Use emittr.setMaxListeners() to increase limit`(same for `receiver_error`). This has been improved such that the listeners are reused. [PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738) ### Breaking changes