diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 8f0faced798f..5765fe559299 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -13,9 +13,12 @@ - `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) +- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized which would have resulted in too many listeners and a warning such as `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 sender_error listeners added to [Sender]. Use emittr.setMaxListeners() to increase limit`(same for `receiver_error`). This has been improved such that the listeners are reused. + [PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738) + ### Breaking changes -- The `createBatch` method on the sender is renamed to `createMesageBatch` +- The `createBatch` method on the sender is renamed to `createMessageBatch` - The interface `CreateBatchOptions` followed by the options that are passed to the `createBatch` method is renamed to `CreateMessageBatchOptions` - The `tryAdd` method on the message batch object is renamed to `tryAddMessage` - `ServiceBusMessage` interface updates: diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 2e9417f1c63e..570835373d2a 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -4,15 +4,14 @@ import Long from "long"; import { EventContext, - ReceiverEvents, ReceiverOptions, message as RheaMessageUtil, - SenderEvents, SenderOptions, generate_uuid, string_to_uuid, types, - Typed + Typed, + ReceiverEvents } from "rhea-promise"; import { AmqpMessage, @@ -226,15 +225,26 @@ export class ManagementClient extends LinkEntity { name: this.replyTo, target: { address: this.replyTo }, onSessionError: (context: EventContext) => { - const ehError = translate(context.session!.error!); + const sbError = translate(context.session!.error!); managementClientLogger.logError( - ehError, + sbError, `${this.logPrefix} An error occurred on the session for request/response links for $management` ); } }; - const sropt: SenderOptions = { target: { address: this.address } }; + const sropt: SenderOptions = { + target: { address: this.address }, + onError: (context: EventContext) => { + const ehError = translate(context.sender!.error!); + managementClientLogger.logError( + ehError, + `${this.logPrefix} An error occurred on the $management sender link` + ); + } + }; + // Even if multiple parallel requests reach here, the initLink secures a lock + // to ensure there won't be multiple initializations await this.initLink( { senderOptions: sropt, @@ -242,37 +252,37 @@ export class ManagementClient extends LinkEntity { }, abortSignal ); - - this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => { - const ehError = translate(context.sender!.error!); - managementClientLogger.logError( - ehError, - `${this.logPrefix} An error occurred on the $management sender link` - ); - }); - this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { - const ehError = translate(context.receiver!.error!); - managementClientLogger.logError( - ehError, - `${this.logPrefix} An error occurred on the $management receiver link` - ); - }); } catch (err) { err = translate(err); managementClientLogger.logError( err, - `${this.logPrefix} An error occured while establishing the $management links` + `${this.logPrefix} An error occurred while establishing the $management links` ); throw err; } } - protected createRheaLink(options: RequestResponseLinkOptions): Promise { - return RequestResponseLink.create( + protected async createRheaLink( + options: RequestResponseLinkOptions + ): Promise { + const rheaLink = await RequestResponseLink.create( this._context.connection, options.senderOptions, options.receiverOptions ); + // Attach listener for the `receiver_error` events to log the errors. + + // "message" event listener is added in core-amqp. + // "rhea" doesn't allow setting only the "onError" handler in the options if it is not accompanied by an "onMessage" handler. + // Hence, not passing onError handler in the receiver options, adding a handler below. + rheaLink.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => { + const ehError = translate(context.receiver!.error!); + managementClientLogger.logError( + ehError, + `${this.logPrefix} An error occurred on the $management receiver link` + ); + }); + return rheaLink; } /**