Skip to content

Commit

Permalink
pass abortSignal to init/negotiateClaim methods
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek committed Apr 16, 2021
1 parent a46c318 commit 17a2b84
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 66 deletions.
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea": "^1.0.24",
"rhea-promise": "^1.2.0",
"rhea-promise": "^1.2.1",
"tslib": "^2.0.0",
"url": "^0.11.0",
"util": "^0.12.1"
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
"is-buffer": "^2.0.3",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea-promise": "^1.2.0",
"rhea-promise": "^1.2.1",
"tslib": "^2.0.0",
"uuid": "^8.3.0"
},
Expand Down
8 changes: 4 additions & 4 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ export class EventHubReceiver extends LinkEntity {

if (!this.isOpen()) {
try {
await this.initialize();
await this.initialize({ abortSignal });
if (abortSignal && abortSignal.aborted) {
await this.abort();
}
Expand Down Expand Up @@ -541,14 +541,14 @@ export class EventHubReceiver extends LinkEntity {
* Creates a new AMQP receiver under a new AMQP session.
* @hidden
*/
async initialize(): Promise<void> {
async initialize({ abortSignal }: { abortSignal?: AbortSignalLike }): Promise<void> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({ abortSignal });

const receiverOptions: CreateReceiverOptions = {
onClose: (context: EventContext) => this._onAmqpClose(context),
Expand All @@ -568,7 +568,7 @@ export class EventHubReceiver extends LinkEntity {
this.name,
options
);
this._receiver = await this._context.connection.createReceiver(options);
this._receiver = await this._context.connection.createReceiver({ ...options, abortSignal });
this.isConnecting = false;
logger.verbose(
"[%s] Receiver '%s' created with receiver options: %O",
Expand Down
8 changes: 5 additions & 3 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ export class EventHubSender extends LinkEntity {
return defaultCancellableLock.acquire(
this.senderLock,
() => {
return this._init(senderOptions);
return this._init({ ...senderOptions, abortSignal: options.abortSignal });
},
{ abortSignal: options.abortSignal, acquireTimeoutInMs: timeoutInMs }
);
Expand Down Expand Up @@ -496,14 +496,16 @@ export class EventHubSender extends LinkEntity {
* Initializes the sender session on the connection.
* @hidden
*/
private async _init(options: AwaitableSenderOptions): Promise<void> {
private async _init(
options: AwaitableSenderOptions & { abortSignal?: AbortSignalLike }
): Promise<void> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({ abortSignal: options.abortSignal });

logger.verbose(
"[%s] Trying to create sender '%s'...",
Expand Down
19 changes: 15 additions & 4 deletions sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ConnectionContext } from "./connectionContext";
import { AwaitableSender, Receiver } from "rhea-promise";
import { logger } from "./log";
import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortSignalLike } from "@azure/abort-controller";

/**
* @hidden
Expand Down Expand Up @@ -113,7 +114,10 @@ export class LinkEntity {
* @param setTokenRenewal - Set the token renewal timer. Default false.
* @returns Promise<void>
*/
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
protected async _negotiateClaim({
abortSignal,
setTokenRenewal
}: { setTokenRenewal?: boolean; abortSignal?: AbortSignalLike } = {}): Promise<void> {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
Expand All @@ -130,9 +134,10 @@ export class LinkEntity {
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
return this._context.cbsSession.init();
return this._context.cbsSession.init({ abortSignal });
},
{
abortSignal,
acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined)
}
);
Expand Down Expand Up @@ -172,9 +177,15 @@ export class LinkEntity {
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
return this._context.cbsSession.negotiateClaim(
this.audience,
tokenObject.token,
tokenType,
{ abortSignal }
);
},
{
abortSignal,
acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined)
}
);
Expand Down Expand Up @@ -206,7 +217,7 @@ export class LinkEntity {
}
this._tokenRenewalTimer = setTimeout(async () => {
try {
await this._negotiateClaim(true);
await this._negotiateClaim({ setTokenRenewal: true });
} catch (err) {
logger.verbose(
"[%s] %s '%s' with address %s, an error occurred while renewing the token: %O",
Expand Down
9 changes: 5 additions & 4 deletions sdk/eventhub/event-hubs/src/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,12 @@ export class ManagementClient extends LinkEntity {
}
}

private async _init(): Promise<void> {
private async _init({ abortSignal }: { abortSignal?: AbortSignalLike } = {}): Promise<void> {
try {
if (!this._isMgmtRequestResponseLinkOpen()) {
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim();
await this._negotiateClaim({ abortSignal });
const rxopt: ReceiverOptions = {
source: { address: this.address },
name: this.replyTo,
Expand Down Expand Up @@ -340,7 +340,8 @@ export class ManagementClient extends LinkEntity {
this._mgmtReqResLink = await RequestResponseLink.create(
this._context.connection,
sropt,
rxopt
rxopt,
{ abortSignal }
);
this._mgmtReqResLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
Expand Down Expand Up @@ -409,7 +410,7 @@ export class ManagementClient extends LinkEntity {
await defaultCancellableLock.acquire(
this.managementLock,
() => {
return this._init();
return this._init({ abortSignal });
},
{ abortSignal, acquireTimeoutInMs: retryTimeoutInMs }
);
Expand Down
48 changes: 0 additions & 48 deletions sdk/eventhub/event-hubs/test/public/receiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import debugModule from "debug";
const debug = debugModule("azure:event-hubs:receiver-spec");
import {
EventData,
MessagingError,
ReceivedEventData,
latestEventPosition,
earliestEventPosition,
Expand Down Expand Up @@ -484,51 +483,4 @@ describe("EventHubConsumerClient", function(): void {
await subscription!.close();
});
});

describe("Negative scenarios", function(): void {
it("should throw MessagingEntityNotFoundError for non existing consumer group", async function(): Promise<
void
> {
const badConsumerClient = new EventHubConsumerClient(
"boo",
service.connectionString,
service.path
);
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = badConsumerClient.subscribe({
processEvents: async () => {
/* no-op */
},
processError: async (err) => {
resolve(err);
}
});
});
await subscription!.close();
await badConsumerClient.close();

should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "MessagingEntityNotFoundError");
});

it(`should throw an invalid EventHub address error for invalid partition`, async function(): Promise<
void
> {
let subscription: Subscription | undefined;
const caughtErr = await new Promise<Error | MessagingError>((resolve) => {
subscription = consumerClient.subscribe("boo", {
processEvents: async () => {
/* no-op */
},
processError: async (err) => {
resolve(err);
}
});
});
await subscription!.close();
should.exist(caughtErr);
should.equal((caughtErr as MessagingError).code, "ArgumentOutOfRangeError");
});
});
}).timeout(90000);
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
"long": "^4.0.0",
"process": "^0.11.10",
"tslib": "^2.0.0",
"rhea-promise": "^1.2.0"
"rhea-promise": "^1.2.1"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
Expand Down

0 comments on commit 17a2b84

Please sign in to comment.