Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Fix MaxListenersExceeded for management client #11738

5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

- `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)

- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized. This has been improved such that the listeners are reused.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth mentioning the error message that would have popped up (for people searching).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned

[PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738)

### Breaking changes

- The `createBatch` method on the sender is renamed to `createMesageBatch`
- The `createBatch` method on the sender is renamed to `createMessageBatch`
- The interface `CreateBatchOptions` followed by the options that are passed to the `createBatch` method is renamed to `CreateMessageBatchOptions`
- The `tryAdd` method on the message batch object is renamed to `tryAddMessage`
- `ServiceBusMessage` interface updates:
Expand Down
58 changes: 34 additions & 24 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import Long from "long";
import {
EventContext,
ReceiverEvents,
ReceiverOptions,
message as RheaMessageUtil,
SenderEvents,
SenderOptions,
generate_uuid,
string_to_uuid,
types,
Typed
Typed,
ReceiverEvents
} from "rhea-promise";
import {
AmqpMessage,
Expand Down Expand Up @@ -226,53 +225,64 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
name: this.replyTo,
target: { address: this.replyTo },
onSessionError: (context: EventContext) => {
const ehError = translate(context.session!.error!);
const sbError = translate(context.session!.error!);
managementClientLogger.logError(
ehError,
sbError,
`${this.logPrefix} An error occurred on the session for request/response links for $management`
);
}
};
const sropt: SenderOptions = { target: { address: this.address } };
const sropt: SenderOptions = {
target: { address: this.address },
onError: (context: EventContext) => {
const ehError = translate(context.sender!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management sender link`
);
}
};

// Even if multiple parallel requests reach here, the initLink secures a lock
// to ensure there won't be multiple initializations
await this.initLink(
{
senderOptions: sropt,
receiverOptions: rxopt
},
abortSignal
);

this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => {
const ehError = translate(context.sender!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management sender link`
);
});
this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const ehError = translate(context.receiver!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management receiver link`
);
});
} catch (err) {
err = translate(err);
managementClientLogger.logError(
err,
`${this.logPrefix} An error occured while establishing the $management links`
`${this.logPrefix} An error occurred while establishing the $management links`
);
throw err;
}
}

protected createRheaLink(options: RequestResponseLinkOptions): Promise<RequestResponseLink> {
return RequestResponseLink.create(
protected async createRheaLink(
options: RequestResponseLinkOptions
): Promise<RequestResponseLink> {
const rheaLink = await RequestResponseLink.create(
this._context.connection,
options.senderOptions,
options.receiverOptions
);
// Attach listener for the `receiver_error` events to log the errors.

// "message" event listener is added in core-amqp.
// "rhea" doesn't allow setting only the "onError" handler in the options if it is not accompanied by an "onMessage" handler.
// Hence, not passing onError handler in the receiver options, adding a handler below.
rheaLink.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const ehError = translate(context.receiver!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management receiver link`
);
});
return rheaLink;
}

/**
Expand Down