diff --git a/sdk/core/core-amqp/package.json b/sdk/core/core-amqp/package.json index f064ed44d116..03ffed62ac04 100644 --- a/sdk/core/core-amqp/package.json +++ b/sdk/core/core-amqp/package.json @@ -79,7 +79,7 @@ "jssha": "^3.1.0", "process": "^0.11.10", "rhea": "^1.0.24", - "rhea-promise": "^1.2.0", + "rhea-promise": "^1.2.1", "tslib": "^2.0.0", "url": "^0.11.0", "util": "^0.12.1" diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index c76e201a1a2a..10b882a7797d 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -99,7 +99,7 @@ "is-buffer": "^2.0.3", "jssha": "^3.1.0", "process": "^0.11.10", - "rhea-promise": "^1.2.0", + "rhea-promise": "^1.2.1", "tslib": "^2.0.0", "uuid": "^8.3.0" }, diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 77cfd3b978bf..296578ce06d7 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -452,7 +452,7 @@ export class EventHubReceiver extends LinkEntity { if (!this.isOpen()) { try { - await this.initialize(); + await this.initialize({ abortSignal }); if (abortSignal && abortSignal.aborted) { await this.abort(); } @@ -541,14 +541,14 @@ export class EventHubReceiver extends LinkEntity { * Creates a new AMQP receiver under a new AMQP session. * @hidden */ - async initialize(): Promise { + async initialize({ abortSignal }: { abortSignal?: AbortSignalLike }): Promise { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; // Wait for the connectionContext to be ready to open the link. await this._context.readyToOpenLink(); - await this._negotiateClaim(); + await this._negotiateClaim({ abortSignal }); const receiverOptions: CreateReceiverOptions = { onClose: (context: EventContext) => this._onAmqpClose(context), @@ -568,7 +568,7 @@ export class EventHubReceiver extends LinkEntity { this.name, options ); - this._receiver = await this._context.connection.createReceiver(options); + this._receiver = await this._context.connection.createReceiver({ ...options, abortSignal }); this.isConnecting = false; logger.verbose( "[%s] Receiver '%s' created with receiver options: %O", diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 031f56deac97..e3d6f277598f 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -463,7 +463,7 @@ export class EventHubSender extends LinkEntity { return defaultCancellableLock.acquire( this.senderLock, () => { - return this._init(senderOptions); + return this._init({ ...senderOptions, abortSignal: options.abortSignal }); }, { abortSignal: options.abortSignal, acquireTimeoutInMs: timeoutInMs } ); @@ -496,14 +496,16 @@ export class EventHubSender extends LinkEntity { * Initializes the sender session on the connection. * @hidden */ - private async _init(options: AwaitableSenderOptions): Promise { + private async _init( + options: AwaitableSenderOptions & { abortSignal?: AbortSignalLike } + ): Promise { try { if (!this.isOpen() && !this.isConnecting) { this.isConnecting = true; // Wait for the connectionContext to be ready to open the link. await this._context.readyToOpenLink(); - await this._negotiateClaim(); + await this._negotiateClaim({ abortSignal: options.abortSignal }); logger.verbose( "[%s] Trying to create sender '%s'...", diff --git a/sdk/eventhub/event-hubs/src/linkEntity.ts b/sdk/eventhub/event-hubs/src/linkEntity.ts index da15762634f7..c7b2e8d5cd41 100644 --- a/sdk/eventhub/event-hubs/src/linkEntity.ts +++ b/sdk/eventhub/event-hubs/src/linkEntity.ts @@ -8,6 +8,7 @@ import { ConnectionContext } from "./connectionContext"; import { AwaitableSender, Receiver } from "rhea-promise"; import { logger } from "./log"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; +import { AbortSignalLike } from "@azure/abort-controller"; /** * @hidden @@ -113,7 +114,10 @@ export class LinkEntity { * @param setTokenRenewal - Set the token renewal timer. Default false. * @returns Promise */ - protected async _negotiateClaim(setTokenRenewal?: boolean): Promise { + protected async _negotiateClaim({ + abortSignal, + setTokenRenewal + }: { setTokenRenewal?: boolean; abortSignal?: AbortSignalLike } = {}): Promise { // Acquire the lock and establish a cbs session if it does not exist on the connection. // Although node.js is single threaded, we need a locking mechanism to ensure that a // race condition does not happen while creating a shared resource (in this case the @@ -130,9 +134,10 @@ export class LinkEntity { await defaultCancellableLock.acquire( this._context.cbsSession.cbsLock, () => { - return this._context.cbsSession.init(); + return this._context.cbsSession.init({ abortSignal }); }, { + abortSignal, acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined) } ); @@ -172,9 +177,15 @@ export class LinkEntity { await defaultCancellableLock.acquire( this._context.negotiateClaimLock, () => { - return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType); + return this._context.cbsSession.negotiateClaim( + this.audience, + tokenObject.token, + tokenType, + { abortSignal } + ); }, { + abortSignal, acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined) } ); @@ -206,7 +217,7 @@ export class LinkEntity { } this._tokenRenewalTimer = setTimeout(async () => { try { - await this._negotiateClaim(true); + await this._negotiateClaim({ setTokenRenewal: true }); } catch (err) { logger.verbose( "[%s] %s '%s' with address %s, an error occurred while renewing the token: %O", diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 65b05c371354..3941c3872e3c 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -306,12 +306,12 @@ export class ManagementClient extends LinkEntity { } } - private async _init(): Promise { + private async _init({ abortSignal }: { abortSignal?: AbortSignalLike } = {}): Promise { try { if (!this._isMgmtRequestResponseLinkOpen()) { // Wait for the connectionContext to be ready to open the link. await this._context.readyToOpenLink(); - await this._negotiateClaim(); + await this._negotiateClaim({ abortSignal }); const rxopt: ReceiverOptions = { source: { address: this.address }, name: this.replyTo, @@ -340,7 +340,8 @@ export class ManagementClient extends LinkEntity { this._mgmtReqResLink = await RequestResponseLink.create( this._context.connection, sropt, - rxopt + rxopt, + { abortSignal } ); this._mgmtReqResLink.sender.on(SenderEvents.senderError, (context: EventContext) => { const id = context.connection.options.id; @@ -409,7 +410,7 @@ export class ManagementClient extends LinkEntity { await defaultCancellableLock.acquire( this.managementLock, () => { - return this._init(); + return this._init({ abortSignal }); }, { abortSignal, acquireTimeoutInMs: retryTimeoutInMs } ); diff --git a/sdk/eventhub/event-hubs/test/public/receiver.spec.ts b/sdk/eventhub/event-hubs/test/public/receiver.spec.ts index 6db65c47c921..9e8f7038958b 100644 --- a/sdk/eventhub/event-hubs/test/public/receiver.spec.ts +++ b/sdk/eventhub/event-hubs/test/public/receiver.spec.ts @@ -9,7 +9,6 @@ import debugModule from "debug"; const debug = debugModule("azure:event-hubs:receiver-spec"); import { EventData, - MessagingError, ReceivedEventData, latestEventPosition, earliestEventPosition, @@ -484,51 +483,4 @@ describe("EventHubConsumerClient", function(): void { await subscription!.close(); }); }); - - describe("Negative scenarios", function(): void { - it("should throw MessagingEntityNotFoundError for non existing consumer group", async function(): Promise< - void - > { - const badConsumerClient = new EventHubConsumerClient( - "boo", - service.connectionString, - service.path - ); - let subscription: Subscription | undefined; - const caughtErr = await new Promise((resolve) => { - subscription = badConsumerClient.subscribe({ - processEvents: async () => { - /* no-op */ - }, - processError: async (err) => { - resolve(err); - } - }); - }); - await subscription!.close(); - await badConsumerClient.close(); - - should.exist(caughtErr); - should.equal((caughtErr as MessagingError).code, "MessagingEntityNotFoundError"); - }); - - it(`should throw an invalid EventHub address error for invalid partition`, async function(): Promise< - void - > { - let subscription: Subscription | undefined; - const caughtErr = await new Promise((resolve) => { - subscription = consumerClient.subscribe("boo", { - processEvents: async () => { - /* no-op */ - }, - processError: async (err) => { - resolve(err); - } - }); - }); - await subscription!.close(); - should.exist(caughtErr); - should.equal((caughtErr as MessagingError).code, "ArgumentOutOfRangeError"); - }); - }); }).timeout(90000); diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index 3aad3f0ada27..34c8eed9065b 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -111,7 +111,7 @@ "long": "^4.0.0", "process": "^0.11.10", "tslib": "^2.0.0", - "rhea-promise": "^1.2.0" + "rhea-promise": "^1.2.1" }, "devDependencies": { "@azure/dev-tool": "^1.0.0",