Skip to content

Commit

Permalink
[event-hubs] add timeouts to acquire calls
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek committed Apr 13, 2021
1 parent f92a703 commit 8e1055e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 47 deletions.
31 changes: 23 additions & 8 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,14 +794,29 @@ export class EventHubReceiver extends LinkEntity {
};

const retryOptions = this.options.retryOptions || {};
const config: RetryConfig<ReceivedEventData[]> = {
connectionHost: this._context.config.host,
connectionId: this._context.connectionId,
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
};

const config: RetryConfig<ReceivedEventData[]> = Object.defineProperties(
{
operation: retrieveEvents,
operationType: RetryOperationType.receiveMessage,
abortSignal: abortSignal,
retryOptions: retryOptions
},
{
connectionId: {
enumerable: true,
get: () => {
return this._context.connectionId;
}
},
connectionHost: {
enumerable: true,
get: () => {
return this._context.config.host;
}
}
}
);
return retry<ReceivedEventData[]>(config);
}
}
22 changes: 8 additions & 14 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
RetryConfig,
RetryOperationType,
RetryOptions,
defaultLock,
defaultCancellableLock,
retry,
translate
} from "@azure/core-amqp";
Expand All @@ -32,7 +32,7 @@ import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortSignalLike } from "@azure/abort-controller";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { defaultDataTransformer } from "./dataTransformer";
import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils";

/**
* Describes the EventHubSender that will send event data to EventHub.
* @internal
Expand Down Expand Up @@ -460,19 +460,13 @@ export class EventHubSender extends LinkEntity {
const senderOptions = this._createSenderOptions(timeoutInMs);

const createLinkPromise = async (): Promise<void> => {
return waitForTimeoutOrAbortOrResolve({
actionFn: () => {
return defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
});
return defaultCancellableLock.acquire(
this.senderLock,
() => {
return this._init(senderOptions);
},
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
`[${this._context.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", cannot be created right now, due ` +
`to operation timeout.`
});
{ abortSignal: options.abortSignal, acquireTimeoutInMs: timeoutInMs }
);
};

const config: RetryConfig<void> = {
Expand Down
27 changes: 20 additions & 7 deletions sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
// Licensed under the MIT license.

import { v4 as uuid } from "uuid";
import { Constants, TokenType, defaultLock, isSasTokenProvider } from "@azure/core-amqp";
import { Constants, TokenType, defaultCancellableLock, isSasTokenProvider } from "@azure/core-amqp";
import { AccessToken } from "@azure/core-auth";
import { ConnectionContext } from "./connectionContext";
import { AwaitableSender, Receiver } from "rhea-promise";
import { logger } from "./log";
import { getRetryAttemptTimeoutInMs } from "./util/retries";

/**
* @hidden
Expand Down Expand Up @@ -126,9 +127,15 @@ export class LinkEntity {
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
return this._context.cbsSession.init();
});
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
return this._context.cbsSession.init();
},
{
acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined)
}
);
let tokenObject: AccessToken;
let tokenType: TokenType;
if (isSasTokenProvider(this._context.tokenCredential)) {
Expand Down Expand Up @@ -162,9 +169,15 @@ export class LinkEntity {
this.name,
this.address
);
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
});
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
},
{
acquireTimeoutInMs: getRetryAttemptTimeoutInMs(undefined)
}
);
logger.verbose(
"[%s] Negotiated claim for %s '%s' with with address: %s",
this._context.connectionId,
Expand Down
41 changes: 23 additions & 18 deletions sdk/eventhub/event-hubs/src/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
RetryOperationType,
RetryOptions,
SendRequestOptions,
defaultLock,
defaultCancellableLock,
isSasTokenProvider,
retry,
translate
Expand All @@ -33,7 +33,6 @@ import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "
import { SpanStatusCode } from "@azure/core-tracing";
import { OperationOptions } from "./util/operationOptions";
import { createEventHubSpan } from "./diagnostics/tracing";
import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils";

/**
* Describes the runtime information of an Event Hub.
Expand Down Expand Up @@ -407,16 +406,13 @@ export class ManagementClient extends LinkEntity {
const initOperationStartTime = Date.now();

try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => {
return defaultLock.acquire(this.managementLock, () => {
return this._init();
});
await defaultCancellableLock.acquire(
this.managementLock,
() => {
return this._init();
},
abortSignal: options?.abortSignal,
timeoutMs: retryTimeoutInMs,
timeoutMessage: `The request with message_id "${request.message_id}" timed out. Please try again later.`
});
{ abortSignal, acquireTimeoutInMs: retryTimeoutInMs }
);
} catch (err) {
const translatedError = translate(err);
logger.warning(
Expand Down Expand Up @@ -451,13 +447,22 @@ export class ManagementClient extends LinkEntity {
return this._mgmtReqResLink!.sendRequest(request, sendRequestOptions);
};

const config: RetryConfig<Message> = {
operation: sendOperationPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.management,
abortSignal: abortSignal,
retryOptions: retryOptions
};
const config: RetryConfig<Message> = Object.defineProperties(
{
operation: sendOperationPromise,
operationType: RetryOperationType.management,
abortSignal: abortSignal,
retryOptions: retryOptions
},
{
connectionId: {
enumerable: true,
get: () => {
return this._context.connectionId;
}
}
}
);
return (await retry<Message>(config)).body;
} catch (err) {
const translatedError = translate(err);
Expand Down

0 comments on commit 8e1055e

Please sign in to comment.