From 3a046f6de4d90d7a2c450f753159a88fe7ba7d60 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Wed, 29 Jul 2020 15:44:34 -0700 Subject: [PATCH] [service-bus] Track 2 - push down methods of MessageReceiver that are only used for StreamingReceiver (#10322) Pushing down methods from MessageReceiver that really only belong to StreamingReceiver. This is a prep-PR for a larger refactoring with StreamingReceiver but it's useful on its own. --- .../service-bus/src/core/batchingReceiver.ts | 5 - .../service-bus/src/core/messageReceiver.ts | 790 +------ .../service-bus/src/core/receiverHelper.ts | 85 + .../service-bus/src/core/streamingReceiver.ts | 641 +++++- .../service-bus/src/receivers/receiver.ts | 2 +- .../service-bus/src/session/messageSession.ts | 48 +- .../test/internal/abortSignal.spec.ts | 8 +- .../service-bus/test/internal/atomXml.spec.ts | 1830 +++++++++-------- .../test/internal/receiverInit.spec.ts | 3 +- .../test/internal/streamingReceiver.spec.ts | 2 +- .../service-bus/test/retries.spec.ts | 11 +- .../test/streamingReceiver.spec.ts | 2 +- 12 files changed, 1704 insertions(+), 1723 deletions(-) create mode 100644 sdk/servicebus/service-bus/src/core/receiverHelper.ts diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index b791448d6e7b..112b0e96c185 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -122,11 +122,6 @@ export class BatchingReceiver extends MessageReceiver { this.name ); - // while creating the receiver link for batching receiver the max concurrent calls - // i.e. the credit_window on the link is set to zero. After the link is created - // successfully, we add credit which is the maxMessageCount specified by the user. - this.maxConcurrentCalls = 0; - return await this._batchingReceiverLite.receiveMessages({ maxMessageCount, maxWaitTimeInMs, diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 8112af7bb7f0..965cd5dc9692 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -2,30 +2,18 @@ // Licensed under the MIT license. import { - ConditionErrorNameMapper, Constants, ErrorNameConditionMapper, MessagingError, - RetryConfig, - RetryOperationType, RetryOptions, - retry, translate } from "@azure/core-amqp"; -import { - AmqpError, - EventContext, - OnAmqpEvent, - Receiver, - ReceiverOptions, - isAmqpError, - ReceiverEvents -} from "rhea-promise"; +import { AmqpError, EventContext, OnAmqpEvent, Receiver, ReceiverOptions } from "rhea-promise"; import * as log from "../log"; import { LinkEntity } from "./linkEntity"; import { ClientEntityContext } from "../clientEntityContext"; import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; -import { calculateRenewAfterDuration, getUniqueName, StandardAbortMessage } from "../util/utils"; +import { getUniqueName, StandardAbortMessage } from "../util/utils"; import { MessageHandlerOptions } from "../models"; import { DispositionStatusOptions } from "./managementClient"; import { AbortSignalLike } from "@azure/core-http"; @@ -34,14 +22,12 @@ import { onMessageSettled, DeferredPromiseAndTimer } from "./shared"; /** * @internal + * @ignore */ -interface CreateReceiverOptions { - onMessage: OnAmqpEventAsPromise; - onClose: OnAmqpEventAsPromise; - onSessionClose: OnAmqpEventAsPromise; - onError: OnAmqpEvent; - onSessionError: OnAmqpEvent; -} +export type ReceiverHandlers = Pick< + ReceiverOptions, + "onMessage" | "onError" | "onClose" | "onSessionError" | "onSessionClose" +>; /** * @internal @@ -109,13 +95,6 @@ export class MessageReceiver extends LinkEntity { * @property {string} receiverType The type of receiver: "batching" or "streaming". */ receiverType: ReceiverType; - /** - * @property {number} [maxConcurrentCalls] The maximum number of messages that should be - * processed concurrently while in streaming mode. Once this limit has been reached, more - * messages will not be received until the user's message handler has completed processing current message. - * Default: 1 - */ - maxConcurrentCalls: number = 1; /** * @property {number} [receiveMode] The mode in which messages should be received. * Default: ReceiveMode.peekLock @@ -143,16 +122,15 @@ export class MessageReceiver extends LinkEntity { * @property {Receiver} [_receiver] The AMQP receiver link. */ protected _receiver?: Receiver; - /** - *Retry policy options that determine the mode, number of retries, retry interval etc. - */ - private _retryOptions: RetryOptions; /** * @property {Map>} _deliveryDispositionMap Maintains a map of deliveries that * are being actively disposed. It acts as a store for correlating the responses received for * active dispositions. */ - protected _deliveryDispositionMap: Map = new Map(); + protected _deliveryDispositionMap: Map = new Map< + number, + DeferredPromiseAndTimer + >(); /** * @property {OnMessage} _onMessage The message handler provided by the user that will be wrapped * inside _onAmqpMessage. @@ -163,31 +141,6 @@ export class MessageReceiver extends LinkEntity { * inside _onAmqpError. */ protected _onError?: OnError; - /** - * @property {OnAmqpEventAsPromise} _onAmqpMessage The message handler that will be set as the handler on the - * underlying rhea receiver for the "message" event. - */ - protected _onAmqpMessage: OnAmqpEventAsPromise; - /** - * @property {OnAmqpEventAsPromise} _onAmqpClose The message handler that will be set as the handler on the - * underlying rhea receiver for the "receiver_close" event. - */ - protected _onAmqpClose: OnAmqpEventAsPromise; - /** - * @property {OnAmqpEvent} _onSessionError The message handler that will be set as the handler on - * the underlying rhea receiver's session for the "session_error" event. - */ - protected _onSessionError: OnAmqpEvent; - /** - * @property {OnAmqpEventAsPromise} _onSessionClose The message handler that will be set as the handler on - * the underlying rhea receiver's session for the "session_close" event. - */ - protected _onSessionClose: OnAmqpEventAsPromise; - /** - * @property {OnAmqpEvent} _onAmqpError The message handler that will be set as the handler on the - * underlying rhea receiver for the "receiver_error" event. - */ - protected _onAmqpError: OnAmqpEvent; /** * @property {boolean} wasCloseInitiated Denotes if receiver was explicitly closed by user. */ @@ -210,36 +163,22 @@ export class MessageReceiver extends LinkEntity { * the active messages. */ protected _clearAllMessageLockRenewTimers: () => void; - /** - * Indicates whether the receiver is already actively - * running `onDetached`. - * This is expected to be true while the receiver attempts - * to bring its link back up due to a retryable issue. - */ - private _isDetaching: boolean = false; - private _stopReceivingMessages: boolean = false; - - public get receiverHelper(): ReceiverHelper { - return this._receiverHelper; - } - private _receiverHelper: ReceiverHelper; - constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) { + constructor( + context: ClientEntityContext, + receiverType: ReceiverType, + options?: Omit + ) { super(context.entityPath, context, { address: context.entityPath, audience: `${context.namespace.config.endpoint}${context.entityPath}` }); if (!options) options = {}; - this._retryOptions = options.retryOptions || {}; this.wasCloseInitiated = false; this.receiverType = receiverType; this.receiveMode = options.receiveMode || ReceiveMode.peekLock; - this._receiverHelper = new ReceiverHelper(() => this._receiver); - if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) { - this.maxConcurrentCalls = options.maxConcurrentCalls; - } // If explicitly set to false then autoComplete is false else true (default). this.autoComplete = options.autoComplete === false ? options.autoComplete : true; this.maxAutoRenewDurationInMs = @@ -268,456 +207,15 @@ export class MessageReceiver extends LinkEntity { this._clearMessageLockRenewTimer(messageId); } }; - // setting all the handlers - this._onAmqpMessage = async (context: EventContext) => { - // If the receiver got closed in PeekLock mode, avoid processing the message as we - // cannot settle the message. - if ( - this.receiveMode === ReceiveMode.peekLock && - (!this._receiver || !this._receiver.isOpen()) - ) { - log.error( - "[%s] Not calling the user's message handler for the current message " + - "as the receiver '%s' is closed", - this._context.namespace.connectionId, - this.name - ); - return; - } - - const connectionId = this._context.namespace.connectionId; - const bMessage: ServiceBusMessageImpl = new ServiceBusMessageImpl( - this._context, - context.message!, - context.delivery!, - true - ); - - if (this.autoRenewLock && bMessage.lockToken) { - const lockToken = bMessage.lockToken; - // - We need to renew locks before they expire by looking at bMessage.lockedUntilUtc. - // - This autorenewal needs to happen **NO MORE** than maxAutoRenewDurationInMs - // - We should be able to clear the renewal timer when the user's message handler - // is done (whether it succeeds or fails). - // Setting the messageId with undefined value in the _messageRenewockTimers Map because we - // track state by checking the presence of messageId in the map. It is removed from the map - // when an attempt is made to settle the message (either by the user or by the sdk) OR - // when the execution of user's message handler completes. - this._messageRenewLockTimers.set(bMessage.messageId as string, undefined); - log.receiver( - "[%s] message with id '%s' is locked until %s.", - connectionId, - bMessage.messageId, - bMessage.lockedUntilUtc!.toString() - ); - const totalAutoLockRenewDuration = Date.now() + this.maxAutoRenewDurationInMs; - log.receiver( - "[%s] Total autolockrenew duration for message with id '%s' is: ", - connectionId, - bMessage.messageId, - new Date(totalAutoLockRenewDuration).toString() - ); - const autoRenewLockTask = (): void => { - if ( - new Date(totalAutoLockRenewDuration) > bMessage.lockedUntilUtc! && - Date.now() < totalAutoLockRenewDuration - ) { - if (this._messageRenewLockTimers.has(bMessage.messageId as string)) { - // TODO: We can run into problems with clock skew between the client and the server. - // It would be better to calculate the duration based on the "lockDuration" property - // of the queue. However, we do not have the management plane of the client ready for - // now. Hence we rely on the lockedUntilUtc property on the message set by ServiceBus. - const amount = calculateRenewAfterDuration(bMessage.lockedUntilUtc!); - log.receiver( - "[%s] Sleeping for %d milliseconds while renewing the lock for " + - "message with id '%s' is: ", - connectionId, - amount, - bMessage.messageId - ); - // Setting the value of the messageId to the actual timer. This will be cleared when - // an attempt is made to settle the message (either by the user or by the sdk) OR - // when the execution of user's message handler completes. - this._messageRenewLockTimers.set( - bMessage.messageId as string, - setTimeout(async () => { - try { - log.receiver( - "[%s] Attempting to renew the lock for message with id '%s'.", - connectionId, - bMessage.messageId - ); - bMessage.lockedUntilUtc = await this._context.managementClient!.renewLock( - lockToken - ); - log.receiver( - "[%s] Successfully renewed the lock for message with id '%s'.", - connectionId, - bMessage.messageId - ); - log.receiver( - "[%s] Calling the autorenewlock task again for message with " + "id '%s'.", - connectionId, - bMessage.messageId - ); - autoRenewLockTask(); - } catch (err) { - log.error( - "[%s] An error occured while auto renewing the message lock '%s' " + - "for message with id '%s': %O.", - connectionId, - bMessage.lockToken, - bMessage.messageId, - err - ); - // Let the user know that there was an error renewing the message lock. - this._onError!(err); - } - }, amount) - ); - } else { - log.receiver( - "[%s] Looks like the message lock renew timer has already been " + - "cleared for message with id '%s'.", - connectionId, - bMessage.messageId - ); - } - } else { - log.receiver( - "[%s] Current time %s exceeds the total autolockrenew duration %s for " + - "message with messageId '%s'. Hence we will stop the autoLockRenewTask.", - connectionId, - new Date(Date.now()).toString(), - new Date(totalAutoLockRenewDuration).toString(), - bMessage.messageId - ); - this._clearMessageLockRenewTimer(bMessage.messageId as string); - } - }; - // start - autoRenewLockTask(); - } - try { - await this._onMessage(bMessage); - this._clearMessageLockRenewTimer(bMessage.messageId as string); - } catch (err) { - // This ensures we call users' error handler when users' message handler throws. - if (!isAmqpError(err)) { - log.error( - "[%s] An error occurred while running user's message handler for the message " + - "with id '%s' on the receiver '%s': %O", - connectionId, - bMessage.messageId, - this.name, - err - ); - this._onError!(err); - } - - // Do not want renewLock to happen unnecessarily, while abandoning the message. Hence, - // doing this here. Otherwise, this should be done in finally. - this._clearMessageLockRenewTimer(bMessage.messageId as string); - const error = translate(err) as MessagingError; - // Nothing much to do if user's message handler throws. Let us try abandoning the message. - if ( - !bMessage.delivery.remote_settled && - error.code !== ConditionErrorNameMapper["com.microsoft:message-lock-lost"] && - this.receiveMode === ReceiveMode.peekLock && - this.isOpen() // only try to abandon the messages if the connection is still open - ) { - try { - log.error( - "[%s] Abandoning the message with id '%s' on the receiver '%s' since " + - "an error occured: %O.", - connectionId, - bMessage.messageId, - this.name, - error - ); - await bMessage.abandon(); - } catch (abandonError) { - const translatedError = translate(abandonError); - log.error( - "[%s] An error occurred while abandoning the message with id '%s' on the " + - "receiver '%s': %O.", - connectionId, - bMessage.messageId, - this.name, - translatedError - ); - this._onError!(translatedError); - } - } - return; - } finally { - this._receiverHelper.addCredit(1); - } - - // If we've made it this far, then user's message handler completed fine. Let us try - // completing the message. - if ( - this.autoComplete && - this.receiveMode === ReceiveMode.peekLock && - !bMessage.delivery.remote_settled - ) { - try { - log[this.receiverType]( - "[%s] Auto completing the message with id '%s' on " + "the receiver '%s'.", - connectionId, - bMessage.messageId, - this.name - ); - await bMessage.complete(); - } catch (completeError) { - const translatedError = translate(completeError); - log.error( - "[%s] An error occurred while completing the message with id '%s' on the " + - "receiver '%s': %O.", - connectionId, - bMessage.messageId, - this.name, - translatedError - ); - this._onError!(translatedError); - } - } - }; - - this._onAmqpError = (context: EventContext) => { - const connectionId = this._context.namespace.connectionId; - const receiver = this._receiver || context.receiver!; - const receiverError = context.receiver && context.receiver.error; - if (receiverError) { - const sbError = translate(receiverError) as MessagingError; - log.error( - "[%s] An error occurred for Receiver '%s': %O.", - connectionId, - this.name, - sbError - ); - if (!sbError.retryable) { - if (receiver && !receiver.isItselfClosed()) { - log.error( - "[%s] Since the user did not close the receiver and the error is not " + - "retryable, we let the user know about it by calling the user's error handler.", - connectionId - ); - this._onError!(sbError); - } else { - log.error( - "[%s] The received error is not retryable. However, the receiver was " + - "closed by the user. Hence not notifying the user's error handler.", - connectionId - ); - } - } else { - log.error( - "[%s] Since received error is retryable, we will NOT notify the user's " + - "error handler.", - connectionId - ); - } - } - }; - - this._onSessionError = (context: EventContext) => { - const connectionId = this._context.namespace.connectionId; - const receiver = this._receiver || context.receiver!; - const sessionError = context.session && context.session.error; - if (sessionError) { - const sbError = translate(sessionError) as MessagingError; - log.error( - "[%s] An error occurred on the session for Receiver '%s': %O.", - connectionId, - this.name, - sbError - ); - if (receiver && !receiver.isSessionItselfClosed() && !sbError.retryable) { - log.error( - "[%s] Since the user did not close the receiver and the session error is not " + - "retryable, we let the user know about it by calling the user's error handler.", - connectionId - ); - this._onError!(sbError); - } - } - }; - - this._onAmqpClose = async (context: EventContext) => { - const connectionId = this._context.namespace.connectionId; - const receiverError = context.receiver && context.receiver.error; - const receiver = this._receiver || context.receiver!; - if (receiverError) { - log.error( - "[%s] 'receiver_close' event occurred for receiver '%s' with address '%s'. " + - "The associated error is: %O", - connectionId, - this.name, - this.address, - receiverError - ); - } - this._clearAllMessageLockRenewTimers(); - if (receiver && !receiver.isItselfClosed()) { - if (!this.isConnecting) { - log.error( - "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + - "and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " + - "detached from the _onAmqpClose() handler.", - connectionId, - this.name, - this.address - ); - await this.onDetached(receiverError); - } else { - log.error( - "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + - "and the sdk did not initate this. Moreover the receiver is already re-connecting. " + - "Hence not calling detached from the _onAmqpClose() handler.", - connectionId, - this.name, - this.address - ); - } - } else { - log.error( - "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + - "because the sdk initiated it. Hence not calling detached from the _onAmqpClose" + - "() handler.", - connectionId, - this.name, - this.address - ); - } - }; - - this._onSessionClose = async (context: EventContext) => { - const connectionId = this._context.namespace.connectionId; - const receiver = this._receiver || context.receiver!; - const sessionError = context.session && context.session.error; - if (sessionError) { - log.error( - "[%s] 'session_close' event occurred for receiver '%s' with address '%s'. " + - "The associated error is: %O", - connectionId, - this.name, - this.address, - sessionError - ); - } - this._clearAllMessageLockRenewTimers(); - if (receiver && !receiver.isSessionItselfClosed()) { - if (!this.isConnecting) { - log.error( - "[%s] 'session_close' event occurred on the session of receiver '%s' with " + - "address '%s' and the sdk did not initiate this. Hence calling detached from the " + - "_onSessionClose() handler.", - connectionId, - this.name, - this.address - ); - await this.onDetached(sessionError); - } else { - log.error( - "[%s] 'session_close' event occurred on the session of receiver '%s' with " + - "address '%s' and the sdk did not initiate this. Moreover the receiver is already " + - "re-connecting. Hence not calling detached from the _onSessionClose() handler.", - connectionId, - this.name, - this.address - ); - } - } else { - log.error( - "[%s] 'session_close' event occurred on the session of receiver '%s' with address " + - "'%s' because the sdk initiated it. Hence not calling detached from the _onSessionClose" + - "() handler.", - connectionId, - this.name, - this.address - ); - } - }; - } - - /** - * Prevents us from receiving any further messages. - */ - public stopReceivingMessages(): Promise { - log.receiver( - `[${this.name}] User has requested to stop receiving new messages, attempting to drain the credits.` - ); - this._stopReceivingMessages = true; - - return this.drainReceiver(); - } - - private drainReceiver(): Promise { - log.receiver(`[${this.name}] Receiver is starting drain.`); - - const drainPromise = new Promise((resolve) => { - if (this._receiver == null) { - log.receiver(`[${this.name}] Internal receiver has been removed. Not draining.`); - resolve(); - return; - } - - this._receiver.once(ReceiverEvents.receiverDrained, () => { - log.receiver(`[${this.name}] Receiver has been drained.`); - resolve(); - }); - - this._receiver.drain = true; - // this is not actually adding another credit - it'll just - // cause the drain call to start. - this._receiver.addCredit(1); - }); - - return drainPromise; - } - - /** - * Adds credits to the receiver, respecting any state that - * indicates the receiver is closed or should not continue - * to receive more messages. - * - * @param credits Number of credits to add. - */ - protected addCredit(credits: number): boolean { - if (this._stopReceivingMessages || this._receiver == null) { - return false; - } - - this._receiver.addCredit(credits); - return true; } /** * Creates the options that need to be specified while creating an AMQP receiver link. */ protected _createReceiverOptions( - useNewName?: boolean, - options?: CreateReceiverOptions + useNewName: boolean, + handlers: ReceiverHandlers ): ReceiverOptions { - if (!options) { - options = { - onMessage: (context: EventContext) => - this._onAmqpMessage(context).catch(() => { - /* */ - }), - onClose: (context: EventContext) => - this._onAmqpClose(context).catch(() => { - /* */ - }), - onSessionClose: (context: EventContext) => - this._onSessionClose(context).catch(() => { - /* */ - }), - onError: this._onAmqpError, - onSessionError: this._onSessionError - }; - } const rcvrOptions: ReceiverOptions = { name: useNewName ? getUniqueName(this._context.entityPath) : this.name, autoaccept: this.receiveMode === ReceiveMode.receiveAndDelete ? true : false, @@ -736,7 +234,7 @@ export class MessageReceiver extends LinkEntity { this._deliveryDispositionMap ); }, - ...options + ...handlers }; return rcvrOptions; @@ -747,7 +245,7 @@ export class MessageReceiver extends LinkEntity { * * @returns {Promise} Promise. */ - protected async _init(options?: ReceiverOptions, abortSignal?: AbortSignalLike): Promise { + protected async _init(options: ReceiverOptions, abortSignal?: AbortSignalLike): Promise { const checkAborted = (): void => { if (abortSignal?.aborted) { throw new AbortError(StandardAbortMessage); @@ -769,7 +267,7 @@ export class MessageReceiver extends LinkEntity { log.error( "[%s] The receiver '%s' with address '%s' is not open and is not currently " + - "establishing itself. Hence let's try to connect.", + "establishing itself. Hence let's try to connect.", connectionId, this.name, this.address @@ -784,9 +282,6 @@ export class MessageReceiver extends LinkEntity { await this._negotiateClaim(); checkAborted(); - if (!options) { - options = this._createReceiverOptions(); - } log.error( "[%s] Trying to create receiver '%s' with options %O", connectionId, @@ -825,7 +320,7 @@ export class MessageReceiver extends LinkEntity { } else { log.error( "[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " + - "-> %s. Hence not reconnecting.", + "-> %s. Hence not reconnecting.", connectionId, this.name, this.address, @@ -860,165 +355,6 @@ export class MessageReceiver extends LinkEntity { ); } - /** - * Will reconnect the receiver link if necessary. - * @param receiverError The receiver error or connection error, if any. - * @param connectionDidDisconnect Whether this method is called as a result of a connection disconnect. - * @returns {Promise} Promise. - */ - async onDetached(receiverError?: AmqpError | Error, causedByDisconnect?: boolean): Promise { - const connectionId = this._context.namespace.connectionId; - - // User explicitly called `close` on the receiver, so link is already closed - // and we can exit early. - if (this.wasCloseInitiated) { - return; - } - - // Prevent multiple onDetached invocations from running concurrently. - if (this._isDetaching) { - // This can happen when the network connection goes down for some amount of time. - // The first connection `disconnect` will trigger `onDetached` and attempt to retry - // creating the connection/receiver link. - // While those retry attempts fail (until the network connection comes back up), - // we'll continue to see connection `disconnect` errors. - // These should be ignored until the already running `onDetached` completes - // its retry attempts or errors. - log.error( - `[${connectionId}] Call to detached on streaming receiver '${this.name}' is already in progress.` - ); - return; - } - - this._isDetaching = true; - try { - // Clears the token renewal timer. Closes the link and its session if they are open. - // Removes the link and its session if they are present in rhea's cache. - await this._closeLink(this._receiver); - - if (this.receiverType === ReceiverType.batching) { - log.error( - "[%s] Receiver '%s' with address '%s' is a Batching Receiver, so we will not be " + - "re-establishing the receiver link.", - connectionId, - this.name, - this.address - ); - return; - } - - const translatedError = receiverError ? translate(receiverError) : receiverError; - - // Track-1 - // - We should only attempt to reopen if either no error was present, - // or the error is considered retryable. - // Track-2 - // Reopen - // - If no error was present - // - If the error is a MessagingError and is considered retryable - // - Any non MessagingError because such errors do not get - // translated by `@azure/core-amqp` to a MessagingError - // - More details here - https://github.com/Azure/azure-sdk-for-js/pull/8580#discussion_r417087030 - const shouldReopen = - translatedError instanceof MessagingError ? translatedError.retryable : true; - - // Non-retryable errors that aren't caused by disconnect - // will have already been forwarded to the user's error handler. - // Swallow the error and return quickly. - if (!shouldReopen && !causedByDisconnect) { - log.error( - "[%s] Encountered a non retryable error on the receiver. Cannot recover receiver '%s' with address '%s' encountered error: %O", - connectionId, - this.name, - this.address, - translatedError - ); - return; - } - - // Non-retryable errors that are caused by disconnect - // haven't had a chance to show up in the user's error handler. - // Rethrow the error so the surrounding try/catch forwards it appropriately. - if (!shouldReopen && causedByDisconnect) { - log.error( - "[%s] Encountered a non retryable error on the connection. Cannot recover receiver '%s' with address '%s': %O", - connectionId, - this.name, - this.address, - translatedError - ); - throw translatedError; - } - - // provide a new name to the link while re-connecting it. This ensures that - // the service does not send an error stating that the link is still open. - const options: ReceiverOptions = this._createReceiverOptions(true); - - // shall retry forever at an interval of 15 seconds if the error is a retryable error - // else bail out when the error is not retryable or the operation succeeds. - const config: RetryConfig = { - operation: () => - this._init(options).then(async () => { - if (this.wasCloseInitiated) { - log.error( - "[%s] close() method of Receiver '%s' with address '%s' was called. " + - "by the time the receiver finished getting created. Hence, disallowing messages from being received. ", - connectionId, - this.name, - this.address - ); - await this.close(); - } else { - if (this._receiver && this.receiverType === ReceiverType.streaming) { - this._receiverHelper.addCredit(this.maxConcurrentCalls); - } - } - return; - }), - connectionId: connectionId, - operationType: RetryOperationType.receiverLink, - retryOptions: this._retryOptions, - connectionHost: this._context.namespace.config.host - }; - // Attempt to reconnect. If a non-retryable error is encountered, - // retry will throw and the error will surface to the user's error handler. - await retry(config); - } catch (err) { - log.error( - "[%s] An error occurred while processing detached() of Receiver '%s': %O ", - connectionId, - this.name, - this.address, - err - ); - if (typeof this._onError === "function") { - log.error( - "[%s] Unable to automatically reconnect Receiver '%s' with address '%s'.", - connectionId, - this.name, - this.address - ); - try { - this._onError(err); - } catch (err) { - log.error( - "[%s] User-code error in error handler called after disconnect: %O", - connectionId, - err - ); - } finally { - // Once the user's error handler has been called, - // close the receiver to prevent future messages/errors from being received. - // Swallow errors from the close rather than forwarding to user's error handler - // to prevent a never ending loop. - await this.close().catch(() => { }); - } - } - } finally { - this._isDetaching = false; - } - } - /** * Closes the underlying AMQP receiver. * @return {Promise} Promise. @@ -1062,7 +398,7 @@ export class MessageReceiver extends LinkEntity { log.receiver( "[%s] Disposition for delivery id: %d, did not complete in %d milliseconds. " + - "Hence rejecting the promise with timeout error.", + "Hence rejecting the promise with timeout error.", this._context.namespace.connectionId, delivery.id, Constants.defaultOperationTimeoutInMs @@ -1125,83 +461,3 @@ export class MessageReceiver extends LinkEntity { return result; } } - -/** - * Wraps the receiver with some higher level operations for managing state - * like credits, draining, etc... - * - * @internal - * @ignore - */ -export class ReceiverHelper { - private _stopReceivingMessages: boolean = false; - - constructor(private _getCurrentReceiver: () => Receiver | undefined) { } - - /** - * Adds credits to the receiver, respecting any state that - * indicates the receiver is closed or should not continue - * to receive more messages. - * - * @param credits Number of credits to add. - * @returns true if credits were added, false if there is no current receiver instance - * or `stopReceivingMessages` has been called. - */ - public addCredit(credits: number): boolean { - const receiver = this._getCurrentReceiver(); - - if (this._stopReceivingMessages || receiver == null) { - return false; - } - - receiver.addCredit(credits); - return true; - } - - /** - * Prevents us from receiving any further messages. - */ - public async stopReceivingMessages(): Promise { - const receiver = this._getCurrentReceiver(); - - if (receiver == null) { - return; - } - - log.receiver( - `[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.` - ); - this._stopReceivingMessages = true; - - return this.drain(); - } - - /** - * Initiates a drain for the current receiver and resolves when - * the drain has completed. - */ - public async drain(): Promise { - const receiver = this._getCurrentReceiver(); - - if (receiver == null) { - return; - } - - log.receiver(`[${receiver.name}] Receiver is starting drain.`); - - const drainPromise = new Promise((resolve) => { - receiver.once(ReceiverEvents.receiverDrained, () => { - log.receiver(`[${receiver.name}] Receiver has been drained.`); - receiver.drain = false; - resolve(); - }); - - receiver.drain = true; - // this is not actually adding another credit - it'll just - // cause the drain call to start. - receiver.addCredit(1); - }); - - return drainPromise; - } -} diff --git a/sdk/servicebus/service-bus/src/core/receiverHelper.ts b/sdk/servicebus/service-bus/src/core/receiverHelper.ts new file mode 100644 index 000000000000..9bcec051ab8b --- /dev/null +++ b/sdk/servicebus/service-bus/src/core/receiverHelper.ts @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { Receiver, ReceiverEvents } from "rhea-promise"; +import * as log from "../log"; + +/** + * Wraps the receiver with some higher level operations for managing state + * like credits, draining, etc... + * + * @internal + * @ignore + */ +export class ReceiverHelper { + private _stopReceivingMessages: boolean = false; + + constructor(private _getCurrentReceiver: () => Receiver | undefined) {} + + /** + * Adds credits to the receiver, respecting any state that + * indicates the receiver is closed or should not continue + * to receive more messages. + * + * @param credits Number of credits to add. + * @returns true if credits were added, false if there is no current receiver instance + * or `stopReceivingMessages` has been called. + */ + public addCredit(credits: number): boolean { + const receiver = this._getCurrentReceiver(); + + if (this._stopReceivingMessages || receiver == null) { + return false; + } + + receiver.addCredit(credits); + return true; + } + + /** + * Prevents us from receiving any further messages. + */ + public async stopReceivingMessages(): Promise { + const receiver = this._getCurrentReceiver(); + + if (receiver == null) { + return; + } + + log.receiver( + `[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.` + ); + this._stopReceivingMessages = true; + + return this.drain(); + } + + /** + * Initiates a drain for the current receiver and resolves when + * the drain has completed. + */ + public async drain(): Promise { + const receiver = this._getCurrentReceiver(); + + if (receiver == null) { + return; + } + + log.receiver(`[${receiver.name}] Receiver is starting drain.`); + + const drainPromise = new Promise((resolve) => { + receiver.once(ReceiverEvents.receiverDrained, () => { + log.receiver(`[${receiver.name}] Receiver has been drained.`); + receiver.drain = false; + resolve(); + }); + + receiver.drain = true; + // this is not actually adding another credit - it'll just + // cause the drain call to start. + receiver.addCredit(1); + }); + + return drainPromise; + } +} diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index b00c4d5410b3..9eb61fef1753 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -3,17 +3,34 @@ import { MessageReceiver, + OnAmqpEventAsPromise, OnError, OnMessage, ReceiveOptions, + ReceiverHandlers, ReceiverType } from "./messageReceiver"; +import { ReceiverHelper } from "./receiverHelper"; + import { ClientEntityContext } from "../clientEntityContext"; import { throwErrorIfConnectionClosed } from "../util/errors"; -import { RetryOperationType, RetryConfig, retry } from "@azure/core-amqp"; +import { + RetryOperationType, + RetryConfig, + retry, + MessagingError, + translate, + RetryOptions, + ConditionErrorNameMapper +} from "@azure/core-amqp"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import * as log from "../log"; +import { AmqpError, EventContext, isAmqpError, OnAmqpEvent } from "rhea-promise"; +import { ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; +import { calculateRenewAfterDuration } from "../util/utils"; +import { AbortSignalLike } from "@azure/abort-controller"; /** * @internal @@ -23,6 +40,57 @@ import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; * @extends MessageReceiver */ export class StreamingReceiver extends MessageReceiver { + /** + * @property {number} [maxConcurrentCalls] The maximum number of messages that should be + * processed concurrently while in streaming mode. Once this limit has been reached, more + * messages will not be received until the user's message handler has completed processing current message. + * Default: 1 + */ + maxConcurrentCalls: number = 1; + + /** + * Indicates whether the receiver is already actively + * running `onDetached`. + * This is expected to be true while the receiver attempts + * to bring its link back up due to a retryable issue. + */ + private _isDetaching: boolean = false; + /** + *Retry policy options that determine the mode, number of retries, retry interval etc. + */ + private _retryOptions: RetryOptions; + + private _receiverHelper: ReceiverHelper; + + /** + * @property {OnAmqpEventAsPromise} _onAmqpClose The message handler that will be set as the handler on the + * underlying rhea receiver for the "receiver_close" event. + */ + private _onAmqpClose: OnAmqpEventAsPromise; + + /** + * @property {OnAmqpEventAsPromise} _onSessionClose The message handler that will be set as the handler on + * the underlying rhea receiver's session for the "session_close" event. + */ + private _onSessionClose: OnAmqpEventAsPromise; + + /** + * @property {OnAmqpEvent} _onSessionError The message handler that will be set as the handler on + * the underlying rhea receiver's session for the "session_error" event. + */ + private _onSessionError: OnAmqpEvent; + /** + * @property {OnAmqpEvent} _onAmqpError The message handler that will be set as the handler on the + * underlying rhea receiver for the "receiver_error" event. + */ + private _onAmqpError: OnAmqpEvent; + + /** + * @property {OnAmqpEventAsPromise} _onAmqpMessage The message handler that will be set as the handler on the + * underlying rhea receiver for the "message" event. + */ + protected _onAmqpMessage: OnAmqpEventAsPromise; + /** * Instantiate a new Streaming receiver for receiving messages with handlers. * @@ -32,6 +100,412 @@ export class StreamingReceiver extends MessageReceiver { */ constructor(context: ClientEntityContext, options?: ReceiveOptions) { super(context, ReceiverType.streaming, options); + + if (typeof options?.maxConcurrentCalls === "number" && options?.maxConcurrentCalls > 0) { + this.maxConcurrentCalls = options.maxConcurrentCalls; + } + + this._retryOptions = options?.retryOptions || {}; + this._receiverHelper = new ReceiverHelper(() => this._receiver); + + this._onAmqpClose = async (context: EventContext) => { + const connectionId = this._context.namespace.connectionId; + const receiverError = context.receiver && context.receiver.error; + const receiver = this._receiver || context.receiver!; + if (receiverError) { + log.error( + "[%s] 'receiver_close' event occurred for receiver '%s' with address '%s'. " + + "The associated error is: %O", + connectionId, + this.name, + this.address, + receiverError + ); + } + this._clearAllMessageLockRenewTimers(); + if (receiver && !receiver.isItselfClosed()) { + if (!this.isConnecting) { + log.error( + "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + + "and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " + + "detached from the _onAmqpClose() handler.", + connectionId, + this.name, + this.address + ); + await this.onDetached(receiverError); + } else { + log.error( + "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + + "and the sdk did not initate this. Moreover the receiver is already re-connecting. " + + "Hence not calling detached from the _onAmqpClose() handler.", + connectionId, + this.name, + this.address + ); + } + } else { + log.error( + "[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + + "because the sdk initiated it. Hence not calling detached from the _onAmqpClose" + + "() handler.", + connectionId, + this.name, + this.address + ); + } + }; + + this._onSessionClose = async (context: EventContext) => { + const connectionId = this._context.namespace.connectionId; + const receiver = this._receiver || context.receiver!; + const sessionError = context.session && context.session.error; + if (sessionError) { + log.error( + "[%s] 'session_close' event occurred for receiver '%s' with address '%s'. " + + "The associated error is: %O", + connectionId, + this.name, + this.address, + sessionError + ); + } + this._clearAllMessageLockRenewTimers(); + if (receiver && !receiver.isSessionItselfClosed()) { + if (!this.isConnecting) { + log.error( + "[%s] 'session_close' event occurred on the session of receiver '%s' with " + + "address '%s' and the sdk did not initiate this. Hence calling detached from the " + + "_onSessionClose() handler.", + connectionId, + this.name, + this.address + ); + await this.onDetached(sessionError); + } else { + log.error( + "[%s] 'session_close' event occurred on the session of receiver '%s' with " + + "address '%s' and the sdk did not initiate this. Moreover the receiver is already " + + "re-connecting. Hence not calling detached from the _onSessionClose() handler.", + connectionId, + this.name, + this.address + ); + } + } else { + log.error( + "[%s] 'session_close' event occurred on the session of receiver '%s' with address " + + "'%s' because the sdk initiated it. Hence not calling detached from the _onSessionClose" + + "() handler.", + connectionId, + this.name, + this.address + ); + } + }; + + this._onAmqpError = (context: EventContext) => { + const connectionId = this._context.namespace.connectionId; + const receiver = this._receiver || context.receiver!; + const receiverError = context.receiver && context.receiver.error; + if (receiverError) { + const sbError = translate(receiverError) as MessagingError; + log.error( + "[%s] An error occurred for Receiver '%s': %O.", + connectionId, + this.name, + sbError + ); + if (!sbError.retryable) { + if (receiver && !receiver.isItselfClosed()) { + log.error( + "[%s] Since the user did not close the receiver and the error is not " + + "retryable, we let the user know about it by calling the user's error handler.", + connectionId + ); + this._onError!(sbError); + } else { + log.error( + "[%s] The received error is not retryable. However, the receiver was " + + "closed by the user. Hence not notifying the user's error handler.", + connectionId + ); + } + } else { + log.error( + "[%s] Since received error is retryable, we will NOT notify the user's " + + "error handler.", + connectionId + ); + } + } + }; + + this._onSessionError = (context: EventContext) => { + const connectionId = this._context.namespace.connectionId; + const receiver = this._receiver || context.receiver!; + const sessionError = context.session && context.session.error; + if (sessionError) { + const sbError = translate(sessionError) as MessagingError; + log.error( + "[%s] An error occurred on the session for Receiver '%s': %O.", + connectionId, + this.name, + sbError + ); + if (receiver && !receiver.isSessionItselfClosed() && !sbError.retryable) { + log.error( + "[%s] Since the user did not close the receiver and the session error is not " + + "retryable, we let the user know about it by calling the user's error handler.", + connectionId + ); + this._onError!(sbError); + } + } + }; + + this._onAmqpMessage = async (context: EventContext) => { + // If the receiver got closed in PeekLock mode, avoid processing the message as we + // cannot settle the message. + if ( + this.receiveMode === ReceiveMode.peekLock && + (!this._receiver || !this._receiver.isOpen()) + ) { + log.error( + "[%s] Not calling the user's message handler for the current message " + + "as the receiver '%s' is closed", + this._context.namespace.connectionId, + this.name + ); + return; + } + + const connectionId = this._context.namespace.connectionId; + const bMessage: ServiceBusMessageImpl = new ServiceBusMessageImpl( + this._context, + context.message!, + context.delivery!, + true + ); + + if (this.autoRenewLock && bMessage.lockToken) { + const lockToken = bMessage.lockToken; + // - We need to renew locks before they expire by looking at bMessage.lockedUntilUtc. + // - This autorenewal needs to happen **NO MORE** than maxAutoRenewDurationInMs + // - We should be able to clear the renewal timer when the user's message handler + // is done (whether it succeeds or fails). + // Setting the messageId with undefined value in the _messageRenewockTimers Map because we + // track state by checking the presence of messageId in the map. It is removed from the map + // when an attempt is made to settle the message (either by the user or by the sdk) OR + // when the execution of user's message handler completes. + this._messageRenewLockTimers.set(bMessage.messageId as string, undefined); + log.receiver( + "[%s] message with id '%s' is locked until %s.", + connectionId, + bMessage.messageId, + bMessage.lockedUntilUtc!.toString() + ); + const totalAutoLockRenewDuration = Date.now() + this.maxAutoRenewDurationInMs; + log.receiver( + "[%s] Total autolockrenew duration for message with id '%s' is: ", + connectionId, + bMessage.messageId, + new Date(totalAutoLockRenewDuration).toString() + ); + const autoRenewLockTask = (): void => { + if ( + new Date(totalAutoLockRenewDuration) > bMessage.lockedUntilUtc! && + Date.now() < totalAutoLockRenewDuration + ) { + if (this._messageRenewLockTimers.has(bMessage.messageId as string)) { + // TODO: We can run into problems with clock skew between the client and the server. + // It would be better to calculate the duration based on the "lockDuration" property + // of the queue. However, we do not have the management plane of the client ready for + // now. Hence we rely on the lockedUntilUtc property on the message set by ServiceBus. + const amount = calculateRenewAfterDuration(bMessage.lockedUntilUtc!); + log.receiver( + "[%s] Sleeping for %d milliseconds while renewing the lock for " + + "message with id '%s' is: ", + connectionId, + amount, + bMessage.messageId + ); + // Setting the value of the messageId to the actual timer. This will be cleared when + // an attempt is made to settle the message (either by the user or by the sdk) OR + // when the execution of user's message handler completes. + this._messageRenewLockTimers.set( + bMessage.messageId as string, + setTimeout(async () => { + try { + log.receiver( + "[%s] Attempting to renew the lock for message with id '%s'.", + connectionId, + bMessage.messageId + ); + bMessage.lockedUntilUtc = await this._context.managementClient!.renewLock( + lockToken + ); + log.receiver( + "[%s] Successfully renewed the lock for message with id '%s'.", + connectionId, + bMessage.messageId + ); + log.receiver( + "[%s] Calling the autorenewlock task again for message with " + "id '%s'.", + connectionId, + bMessage.messageId + ); + autoRenewLockTask(); + } catch (err) { + log.error( + "[%s] An error occured while auto renewing the message lock '%s' " + + "for message with id '%s': %O.", + connectionId, + bMessage.lockToken, + bMessage.messageId, + err + ); + // Let the user know that there was an error renewing the message lock. + this._onError!(err); + } + }, amount) + ); + } else { + log.receiver( + "[%s] Looks like the message lock renew timer has already been " + + "cleared for message with id '%s'.", + connectionId, + bMessage.messageId + ); + } + } else { + log.receiver( + "[%s] Current time %s exceeds the total autolockrenew duration %s for " + + "message with messageId '%s'. Hence we will stop the autoLockRenewTask.", + connectionId, + new Date(Date.now()).toString(), + new Date(totalAutoLockRenewDuration).toString(), + bMessage.messageId + ); + this._clearMessageLockRenewTimer(bMessage.messageId as string); + } + }; + // start + autoRenewLockTask(); + } + try { + await this._onMessage(bMessage); + this._clearMessageLockRenewTimer(bMessage.messageId as string); + } catch (err) { + // This ensures we call users' error handler when users' message handler throws. + if (!isAmqpError(err)) { + log.error( + "[%s] An error occurred while running user's message handler for the message " + + "with id '%s' on the receiver '%s': %O", + connectionId, + bMessage.messageId, + this.name, + err + ); + this._onError!(err); + } + + // Do not want renewLock to happen unnecessarily, while abandoning the message. Hence, + // doing this here. Otherwise, this should be done in finally. + this._clearMessageLockRenewTimer(bMessage.messageId as string); + const error = translate(err) as MessagingError; + // Nothing much to do if user's message handler throws. Let us try abandoning the message. + if ( + !bMessage.delivery.remote_settled && + error.code !== ConditionErrorNameMapper["com.microsoft:message-lock-lost"] && + this.receiveMode === ReceiveMode.peekLock && + this.isOpen() // only try to abandon the messages if the connection is still open + ) { + try { + log.error( + "[%s] Abandoning the message with id '%s' on the receiver '%s' since " + + "an error occured: %O.", + connectionId, + bMessage.messageId, + this.name, + error + ); + await bMessage.abandon(); + } catch (abandonError) { + const translatedError = translate(abandonError); + log.error( + "[%s] An error occurred while abandoning the message with id '%s' on the " + + "receiver '%s': %O.", + connectionId, + bMessage.messageId, + this.name, + translatedError + ); + this._onError!(translatedError); + } + } + return; + } finally { + this._receiverHelper.addCredit(1); + } + + // If we've made it this far, then user's message handler completed fine. Let us try + // completing the message. + if ( + this.autoComplete && + this.receiveMode === ReceiveMode.peekLock && + !bMessage.delivery.remote_settled + ) { + try { + log[this.receiverType]( + "[%s] Auto completing the message with id '%s' on " + "the receiver '%s'.", + connectionId, + bMessage.messageId, + this.name + ); + await bMessage.complete(); + } catch (completeError) { + const translatedError = translate(completeError); + log.error( + "[%s] An error occurred while completing the message with id '%s' on the " + + "receiver '%s': %O.", + connectionId, + bMessage.messageId, + this.name, + translatedError + ); + this._onError!(translatedError); + } + } + }; + } + + private _getHandlers(): ReceiverHandlers { + return { + onMessage: (context: EventContext) => + this._onAmqpMessage(context).catch(() => { + /* */ + }), + onClose: (context: EventContext) => + this._onAmqpClose(context).catch(() => { + /* */ + }), + onSessionClose: (context: EventContext) => + this._onSessionClose(context).catch(() => { + /* */ + }), + onError: this._onAmqpError, + onSessionError: this._onSessionError + }; + } + + stopReceivingMessages(): Promise { + return this._receiverHelper.stopReceivingMessages(); + } + + init(useNewName: boolean, abortSignal?: AbortSignalLike): Promise { + const options = this._createReceiverOptions(useNewName, this._getHandlers()); + return super._init(options, abortSignal); } /** @@ -42,9 +516,170 @@ export class StreamingReceiver extends MessageReceiver { */ receive(onMessage: OnMessage, onError: OnError): void { throwErrorIfConnectionClosed(this._context.namespace); + this._onMessage = onMessage; this._onError = onError; - this.receiverHelper.addCredit(this.maxConcurrentCalls); + + this._receiverHelper.addCredit(this.maxConcurrentCalls); + } + + /** + * Will reconnect the receiver link if necessary. + * @param receiverError The receiver error or connection error, if any. + * @param connectionDidDisconnect Whether this method is called as a result of a connection disconnect. + * @returns {Promise} Promise. + */ + async onDetached(receiverError?: AmqpError | Error, causedByDisconnect?: boolean): Promise { + const connectionId = this._context.namespace.connectionId; + + // User explicitly called `close` on the receiver, so link is already closed + // and we can exit early. + if (this.wasCloseInitiated) { + return; + } + + // Prevent multiple onDetached invocations from running concurrently. + if (this._isDetaching) { + // This can happen when the network connection goes down for some amount of time. + // The first connection `disconnect` will trigger `onDetached` and attempt to retry + // creating the connection/receiver link. + // While those retry attempts fail (until the network connection comes back up), + // we'll continue to see connection `disconnect` errors. + // These should be ignored until the already running `onDetached` completes + // its retry attempts or errors. + log.error( + `[${connectionId}] Call to detached on streaming receiver '${this.name}' is already in progress.` + ); + return; + } + + this._isDetaching = true; + try { + // Clears the token renewal timer. Closes the link and its session if they are open. + // Removes the link and its session if they are present in rhea's cache. + await this._closeLink(this._receiver); + + if (this.receiverType === ReceiverType.batching) { + log.error( + "[%s] Receiver '%s' with address '%s' is a Batching Receiver, so we will not be " + + "re-establishing the receiver link.", + connectionId, + this.name, + this.address + ); + return; + } + + const translatedError = receiverError ? translate(receiverError) : receiverError; + + // Track-1 + // - We should only attempt to reopen if either no error was present, + // or the error is considered retryable. + // Track-2 + // Reopen + // - If no error was present + // - If the error is a MessagingError and is considered retryable + // - Any non MessagingError because such errors do not get + // translated by `@azure/core-amqp` to a MessagingError + // - More details here - https://github.com/Azure/azure-sdk-for-js/pull/8580#discussion_r417087030 + const shouldReopen = + translatedError instanceof MessagingError ? translatedError.retryable : true; + + // Non-retryable errors that aren't caused by disconnect + // will have already been forwarded to the user's error handler. + // Swallow the error and return quickly. + if (!shouldReopen && !causedByDisconnect) { + log.error( + "[%s] Encountered a non retryable error on the receiver. Cannot recover receiver '%s' with address '%s' encountered error: %O", + connectionId, + this.name, + this.address, + translatedError + ); + return; + } + + // Non-retryable errors that are caused by disconnect + // haven't had a chance to show up in the user's error handler. + // Rethrow the error so the surrounding try/catch forwards it appropriately. + if (!shouldReopen && causedByDisconnect) { + log.error( + "[%s] Encountered a non retryable error on the connection. Cannot recover receiver '%s' with address '%s': %O", + connectionId, + this.name, + this.address, + translatedError + ); + throw translatedError; + } + + // shall retry forever at an interval of 15 seconds if the error is a retryable error + // else bail out when the error is not retryable or the operation succeeds. + const config: RetryConfig = { + operation: () => + this.init( + // provide a new name to the link while re-connecting it. This ensures that + // the service does not send an error stating that the link is still open. + true + ).then(async () => { + if (this.wasCloseInitiated) { + log.error( + "[%s] close() method of Receiver '%s' with address '%s' was called. " + + "by the time the receiver finished getting created. Hence, disallowing messages from being received. ", + connectionId, + this.name, + this.address + ); + await this.close(); + } else { + if (this._receiver && this.receiverType === ReceiverType.streaming) { + this._receiverHelper.addCredit(this.maxConcurrentCalls); + } + } + return; + }), + connectionId: connectionId, + operationType: RetryOperationType.receiverLink, + retryOptions: this._retryOptions, + connectionHost: this._context.namespace.config.host + }; + // Attempt to reconnect. If a non-retryable error is encountered, + // retry will throw and the error will surface to the user's error handler. + await retry(config); + } catch (err) { + log.error( + "[%s] An error occurred while processing detached() of Receiver '%s': %O ", + connectionId, + this.name, + this.address, + err + ); + if (typeof this._onError === "function") { + log.error( + "[%s] Unable to automatically reconnect Receiver '%s' with address '%s'.", + connectionId, + this.name, + this.address + ); + try { + this._onError(err); + } catch (err) { + log.error( + "[%s] User-code error in error handler called after disconnect: %O", + connectionId, + err + ); + } finally { + // Once the user's error handler has been called, + // close the receiver to prevent future messages/errors from being received. + // Swallow errors from the close rather than forwarding to user's error handler + // to prevent a never ending loop. + await this.close().catch(() => {}); + } + } + } finally { + this._isDetaching = false; + } } /** @@ -79,7 +714,7 @@ export class StreamingReceiver extends MessageReceiver { const config: RetryConfig = { operation: () => { - return sReceiver._init(undefined, options?.abortSignal); + return sReceiver.init(false, options?.abortSignal); }, connectionId: context.namespace.connectionId, operationType: RetryOperationType.receiveMessage, diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index a69c73919f93..2556ed8f1663 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -403,7 +403,7 @@ export class ReceiverImpl => { - return this._context.streamingReceiver?.receiverHelper.stopReceivingMessages(); + return this._context.streamingReceiver?.stopReceivingMessages(); } }; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 121df9d3e50c..4486d664d14b 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -14,7 +14,7 @@ import { import { ClientEntityContext } from "../clientEntityContext"; import { LinkEntity } from "../core/linkEntity"; import { DispositionStatusOptions } from "../core/managementClient"; -import { OnAmqpEventAsPromise, OnError, OnMessage, ReceiverHelper } from "../core/messageReceiver"; +import { OnAmqpEventAsPromise, OnError, OnMessage } from "../core/messageReceiver"; import * as log from "../log"; import { DispositionType, ReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; import { throwErrorIfConnectionClosed } from "../util/errors"; @@ -22,6 +22,7 @@ import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils"; import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver"; import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared"; import { AbortSignalLike } from "@azure/core-http"; +import { ReceiverHelper } from "../core/receiverHelper"; /** * Describes the options that need to be provided while creating a message session receiver link. @@ -156,7 +157,10 @@ export class MessageSession extends LinkEntity { * are being actively disposed. It acts as a store for correlating the responses received for * active dispositions. */ - private _deliveryDispositionMap: Map = new Map(); + private _deliveryDispositionMap: Map = new Map< + number, + DeferredPromiseAndTimer + >(); /** * @property {OnMessage} _onMessage The message handler provided by the user that will * be wrapped inside _onAmqpMessage. @@ -227,7 +231,7 @@ export class MessageSession extends LinkEntity { try { log.messageSession( "[%s] Attempting to renew the session lock for MessageSession '%s' " + - "with name '%s'.", + "with name '%s'.", connectionId, this.sessionId, this.name @@ -240,7 +244,7 @@ export class MessageSession extends LinkEntity { ); log.receiver( "[%s] Successfully renewed the session lock for MessageSession '%s' " + - "with name '%s'.", + "with name '%s'.", connectionId, this.sessionId, this.name @@ -254,7 +258,7 @@ export class MessageSession extends LinkEntity { } catch (err) { log.error( "[%s] An error occurred while renewing the session lock for MessageSession " + - "'%s' with name '%s': %O", + "'%s' with name '%s': %O", this._context.namespace.connectionId, this.sessionId, this.name, @@ -264,7 +268,7 @@ export class MessageSession extends LinkEntity { }, nextRenewalTimeout); log.messageSession( "[%s] MessageSession '%s' with name '%s', has next session lock renewal " + - "in %d milliseconds @(%s).", + "in %d milliseconds @(%s).", this._context.namespace.connectionId, this.sessionId, this.name, @@ -297,7 +301,7 @@ export class MessageSession extends LinkEntity { if (!this.isOpen() && !this.isConnecting) { log.error( "[%s] The receiver '%s' with address '%s' is not open and is not currently " + - "establishing itself. Hence let's try to connect.", + "establishing itself. Hence let's try to connect.", connectionId, this.name, this.address @@ -377,7 +381,7 @@ export class MessageSession extends LinkEntity { } else { log.error( "[%s] The receiver '%s' for sessionId '%s' is open -> %s and is connecting " + - "-> %s. Hence not reconnecting.", + "-> %s. Hence not reconnecting.", connectionId, this.name, this.sessionId, @@ -470,7 +474,7 @@ export class MessageSession extends LinkEntity { this._onError(error); log.error( "[%s] Notified the user's error handler about the error received by the " + - "Receiver '%s'.", + "Receiver '%s'.", this._context.namespace.connectionId, this.name ); @@ -518,7 +522,7 @@ export class MessageSession extends LinkEntity { const sbError = translate(receiverError) as MessagingError; log.error( "[%s] 'receiver_close' event occurred for receiver '%s' for sessionId '%s'. " + - "The associated error is: %O", + "The associated error is: %O", connectionId, this.name, this.sessionId, @@ -530,7 +534,7 @@ export class MessageSession extends LinkEntity { if (receiver && !receiver.isItselfClosed()) { log.error( "[%s] 'receiver_close' event occurred on the receiver '%s' for sessionId '%s' " + - "and the sdk did not initiate this. Hence, let's gracefully close the receiver.", + "and the sdk did not initiate this. Hence, let's gracefully close the receiver.", connectionId, this.name, this.sessionId @@ -549,7 +553,7 @@ export class MessageSession extends LinkEntity { } else { log.error( "[%s] 'receiver_close' event occurred on the receiver '%s' for sessionId '%s' " + - "because the sdk initiated it. Hence no need to gracefully close the receiver", + "because the sdk initiated it. Hence no need to gracefully close the receiver", connectionId, this.name, this.sessionId @@ -565,7 +569,7 @@ export class MessageSession extends LinkEntity { const sbError = translate(sessionError); log.error( "[%s] 'session_close' event occurred for receiver '%s' for sessionId '%s'. " + - "The associated error is: %O", + "The associated error is: %O", connectionId, this.name, this.sessionId, @@ -578,7 +582,7 @@ export class MessageSession extends LinkEntity { if (receiver && !receiver.isSessionItselfClosed()) { log.error( "[%s] 'session_close' event occurred on the receiver '%s' for sessionId '%s' " + - "and the sdk did not initiate this. Hence, let's gracefully close the receiver.", + "and the sdk did not initiate this. Hence, let's gracefully close the receiver.", connectionId, this.name, this.sessionId @@ -597,7 +601,7 @@ export class MessageSession extends LinkEntity { } else { log.error( "[%s] 'session_close' event occurred on the receiver '%s' for sessionId '%s' " + - "because the sdk initiated it. Hence no need to gracefully close the receiver", + "because the sdk initiated it. Hence no need to gracefully close the receiver", connectionId, this.name, this.sessionId @@ -622,7 +626,7 @@ export class MessageSession extends LinkEntity { if (this._sessionLockRenewalTimer) clearTimeout(this._sessionLockRenewalTimer); log.messageSession( "[%s] Cleared the timers for 'no new message received' task and " + - "'session lock renewal' task.", + "'session lock renewal' task.", this._context.namespace.connectionId ); @@ -695,7 +699,7 @@ export class MessageSession extends LinkEntity { ) { log.error( "[%s] Not calling the user's message handler for the current message " + - "as the receiver '%s' is closed", + "as the receiver '%s' is closed", connectionId, this.name ); @@ -716,7 +720,7 @@ export class MessageSession extends LinkEntity { if (!isAmqpError(err)) { log.error( "[%s] An error occurred while running user's message handler for the message " + - "with id '%s' on the receiver '%s': %O", + "with id '%s' on the receiver '%s': %O", connectionId, bMessage.messageId, this.name, @@ -735,7 +739,7 @@ export class MessageSession extends LinkEntity { try { log.error( "[%s] Abandoning the message with id '%s' on the receiver '%s' since " + - "an error occured: %O.", + "an error occured: %O.", connectionId, bMessage.messageId, this.name, @@ -746,7 +750,7 @@ export class MessageSession extends LinkEntity { const translatedError = translate(abandonError); log.error( "[%s] An error occurred while abandoning the message with id '%s' on the " + - "receiver '%s': %O.", + "receiver '%s': %O.", connectionId, bMessage.messageId, this.name, @@ -779,7 +783,7 @@ export class MessageSession extends LinkEntity { const translatedError = translate(completeError); log.error( "[%s] An error occurred while completing the message with id '%s' on the " + - "receiver '%s': %O.", + "receiver '%s': %O.", connectionId, bMessage.messageId, this.name, @@ -857,7 +861,7 @@ export class MessageSession extends LinkEntity { this._deliveryDispositionMap.delete(delivery.id); log.receiver( "[%s] Disposition for delivery id: %d, did not complete in %d milliseconds. " + - "Hence rejecting the promise with timeout error", + "Hence rejecting the promise with timeout error", this._context.namespace.connectionId, delivery.id, Constants.defaultOperationTimeoutInMs diff --git a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts index 00d34e1b9d98..dc7f93a0734d 100644 --- a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts @@ -8,7 +8,7 @@ const assert = chai.assert; import { MessageSender } from "../../src/core/messageSender"; import { OperationOptionsBase } from "../../src/modelsToBeSharedWithEventHubs"; -import { AwaitableSender, delay } from "rhea-promise"; +import { AwaitableSender, delay, ReceiverOptions } from "rhea-promise"; import { ServiceBusMessageBatchImpl } from "../../src/serviceBusMessageBatch"; import { MessageReceiver, ReceiverType } from "../../src/core/messageReceiver"; import { @@ -228,7 +228,7 @@ describe("AbortSignal", () => { const abortSignal = createCountdownAbortSignal(1); try { - await messageReceiver["_init"](undefined, abortSignal); + await messageReceiver["_init"]({} as ReceiverOptions, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { assert.equal(err.message, StandardAbortMessage); @@ -252,7 +252,7 @@ describe("AbortSignal", () => { }; try { - await messageReceiver["_init"](undefined, abortSignal); + await messageReceiver["_init"]({} as ReceiverOptions, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { assert.equal(err.message, StandardAbortMessage); @@ -276,7 +276,7 @@ describe("AbortSignal", () => { messageReceiver["_negotiateClaim"] = async () => {}; try { - await messageReceiver["_init"](undefined, abortSignal); + await messageReceiver["_init"]({} as ReceiverOptions, abortSignal); assert.fail("Should have thrown an AbortError"); } catch (err) { assert.equal(err.message, StandardAbortMessage); diff --git a/sdk/servicebus/service-bus/test/internal/atomXml.spec.ts b/sdk/servicebus/service-bus/test/internal/atomXml.spec.ts index ef600dad79df..8f10b85de1a5 100644 --- a/sdk/servicebus/service-bus/test/internal/atomXml.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/atomXml.spec.ts @@ -72,1065 +72,1067 @@ const mockServiceBusAtomManagementClient: ServiceBusManagementClient = new Servi "Endpoint=sb://test/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test" ); -describe("atomSerializationPolicy", function() { - it("should throw an error if receiving a non-XML response body", async function() { - const request = new WebResource(); - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}), - bodyAsText: `{ hello: "this is a JSON response body" }` +describe("ATOM Serializers", () => { + describe("atomSerializationPolicy", function() { + it("should throw an error if receiving a non-XML response body", async function() { + const request = new WebResource(); + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}), + bodyAsText: `{ hello: "this is a JSON response body" }` + }; }; - }; - try { + try { + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new MockSerializer(), + {} + ); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message.startsWith( + "Error occurred while parsing the response body - expected the service to return valid xml content." + ), + true, + `"${err.message}" was expected to begin with "Error occurred while parsing the response body - expected the service to return valid xml content." ` + ); + assert.equal(err.code, "PARSE_ERROR"); + } + }); + + it("should properly serialize when using valid inputs and serializer", async function() { + const request = new WebResource(); + request.body = { lockDuration: "PT3M", maxSizeInMegabytes: "2048" }; + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; + }; await executeAtomXmlOperation( mockServiceBusAtomManagementClient, request, new MockSerializer(), {} ); - assert.fail("Error must be thrown"); - } catch (err) { + + const expectedRequestBody = `2019-10-15T19:55:26.821ZPT3M2048`; + + const requestBody: string = request.body.toString(); + const indexOfOpenUpdateTag = requestBody.indexOf(""); + const indexOfCloseUpdateTag = requestBody.indexOf(""); assert.equal( - err.message.startsWith( - "Error occurred while parsing the response body - expected the service to return valid xml content." - ), - true, - `"${err.message}" was expected to begin with "Error occurred while parsing the response body - expected the service to return valid xml content." ` + requestBody.substring(0, indexOfOpenUpdateTag), + expectedRequestBody.substring(0, indexOfOpenUpdateTag), + "Atom XML serialization failure" ); - assert.equal(err.code, "PARSE_ERROR"); - } + assert.equal( + requestBody.substring(indexOfCloseUpdateTag), + expectedRequestBody.substring(indexOfCloseUpdateTag), + "Atom XML serialization failure" + ); + }); }); - it("should properly serialize when using valid inputs and serializer", async function() { - const request = new WebResource(); - request.body = { lockDuration: "PT3M", maxSizeInMegabytes: "2048" }; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, + describe("deserializeAtomXmlResponse", function() { + it("should throw an error if receiving a valid XML but invalid Atom XML", async function() { + const request: WebResource = new WebResource(); + const _response = { + request, status: 200, - headers: new HttpHeaders({}) + headers: new HttpHeaders({}), + bodyAsText: null, + parsedBody: { notAnEntry: "" } }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new MockSerializer(), - {} - ); - - const expectedRequestBody = `2019-10-15T19:55:26.821ZPT3M2048`; - - const requestBody: string = request.body.toString(); - const indexOfOpenUpdateTag = requestBody.indexOf(""); - const indexOfCloseUpdateTag = requestBody.indexOf(""); - assert.equal( - requestBody.substring(0, indexOfOpenUpdateTag), - expectedRequestBody.substring(0, indexOfOpenUpdateTag), - "Atom XML serialization failure" - ); - assert.equal( - requestBody.substring(indexOfCloseUpdateTag), - expectedRequestBody.substring(indexOfCloseUpdateTag), - "Atom XML serialization failure" - ); - }); -}); - -describe("deserializeAtomXmlResponse", function() { - it("should throw an error if receiving a valid XML but invalid Atom XML", async function() { - const request: WebResource = new WebResource(); - const _response = { - request, - status: 200, - headers: new HttpHeaders({}), - bodyAsText: null, - parsedBody: { notAnEntry: "" } - }; - try { - await deserializeAtomXmlResponse(["QueueName"], _response); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal( - err.message, - "Error occurred while parsing the response body - expected the service to return atom xml content with either feed or entry elements.", - `Unexpected error message found.` - ); - assert.equal(err.code, "PARSE_ERROR"); - } - }); + try { + await deserializeAtomXmlResponse(["QueueName"], _response); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message, + "Error occurred while parsing the response body - expected the service to return atom xml content with either feed or entry elements.", + `Unexpected error message found.` + ); + assert.equal(err.code, "PARSE_ERROR"); + } + }); - it("should throw appropriate error if unrecognized HTTP code is returned by service", async function() { - const request: WebResource = new WebResource(); - const _response = { - request, - status: 666, - headers: new HttpHeaders({}), - bodyAsText: null, - parsedBody: undefined - }; - try { - await deserializeAtomXmlResponse(["QueueName"], _response); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal( - err.message, - "Service returned an error response with an unrecognized HTTP status code - 666", - `Unexpected error message found.` - ); - assert.equal(err.code, "ServiceError", `Unexpected error code found.`); - } + it("should throw appropriate error if unrecognized HTTP code is returned by service", async function() { + const request: WebResource = new WebResource(); + const _response = { + request, + status: 666, + headers: new HttpHeaders({}), + bodyAsText: null, + parsedBody: undefined + }; + try { + await deserializeAtomXmlResponse(["QueueName"], _response); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message, + "Service returned an error response with an unrecognized HTTP status code - 666", + `Unexpected error message found.` + ); + assert.equal(err.code, "ServiceError", `Unexpected error code found.`); + } + }); }); -}); -describe("Serializer construct requests with properties in specific order", function() { - it("Queue serializer generates XML in expected order", async function() { - const queueOptions = { - messageCount: 5, - sizeInBytes: 250, - requiresDuplicateDetection: true, - requiresSession: true, - defaultMessageTimeToLive: "P2D", - deadLetteringOnMessageExpiration: true, - duplicateDetectionHistoryTimeWindow: "PT1M", - maxDeliveryCount: 8, - lockDuration: "PT45S", - enableBatchedOperations: false, - autoDeleteOnIdle: "PT1H", - authorizationRules: [ - { - claimType: "SharedAccessKey", - claimValue: "None", - rights: { - accessRights: ["Manage", "Send", "Listen"] - }, - keyName: "allClaims_v2", - primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", - secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" - }, - { - claimType: "SharedAccessKey", - claimValue: "None", - rights: { - accessRights: ["Manage", "Send", "Listen"] + describe("Serializer construct requests with properties in specific order", function() { + it("Queue serializer generates XML in expected order", async function() { + const queueOptions = { + messageCount: 5, + sizeInBytes: 250, + requiresDuplicateDetection: true, + requiresSession: true, + defaultMessageTimeToLive: "P2D", + deadLetteringOnMessageExpiration: true, + duplicateDetectionHistoryTimeWindow: "PT1M", + maxDeliveryCount: 8, + lockDuration: "PT45S", + enableBatchedOperations: false, + autoDeleteOnIdle: "PT1H", + authorizationRules: [ + { + claimType: "SharedAccessKey", + claimValue: "None", + rights: { + accessRights: ["Manage", "Send", "Listen"] + }, + keyName: "allClaims_v2", + primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", + secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" }, - keyName: "allClaims_v3", - primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", - secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" - } - ], - enablePartitioning: true - }; + { + claimType: "SharedAccessKey", + claimValue: "None", + rights: { + accessRights: ["Manage", "Send", "Listen"] + }, + keyName: "allClaims_v3", + primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", + secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" + } + ], + enablePartitioning: true + }; - const request: WebResource = new WebResource(); - request.body = queueOptions; + const request: WebResource = new WebResource(); + request.body = queueOptions; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}) + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new QueueResourceSerializer(), - {} - ); + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new QueueResourceSerializer(), + {} + ); - checkXmlHasPropertiesInExpectedOrder(request.body.toString(), queueProperties); - }); + checkXmlHasPropertiesInExpectedOrder(request.body.toString(), queueProperties); + }); - it("Topic serializer generates XML in expected order", async function() { - const topicOptions = { - sizeInBytes: 100, - messageCount: 7, - subscriptionCount: 6, - maxDeliveryCount: 20, - requiresDuplicateDetection: true, - defaultMessageTimeToLive: "P2D", - duplicateDetectionHistoryTimeWindow: "PT1M", - enableBatchedOperations: false, - autoDeleteOnIdle: "PT1H", - enablePartitioning: true, - supportOrdering: false, - authorizationRules: [ - { - claimType: "SharedAccessKey", - claimValue: "None", - rights: { - accessRights: ["Manage", "Send", "Listen"] + it("Topic serializer generates XML in expected order", async function() { + const topicOptions = { + sizeInBytes: 100, + messageCount: 7, + subscriptionCount: 6, + maxDeliveryCount: 20, + requiresDuplicateDetection: true, + defaultMessageTimeToLive: "P2D", + duplicateDetectionHistoryTimeWindow: "PT1M", + enableBatchedOperations: false, + autoDeleteOnIdle: "PT1H", + enablePartitioning: true, + supportOrdering: false, + authorizationRules: [ + { + claimType: "SharedAccessKey", + claimValue: "None", + rights: { + accessRights: ["Manage", "Send", "Listen"] + }, + keyName: "allClaims_v2", + primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", + secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" }, - keyName: "allClaims_v2", - primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", - secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" - }, - { - claimType: "SharedAccessKey", - claimValue: "None", - rights: { - accessRights: ["Manage", "Send", "Listen"] - }, - keyName: "allClaims_v3", - primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", - secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" - } - ] - }; + { + claimType: "SharedAccessKey", + claimValue: "None", + rights: { + accessRights: ["Manage", "Send", "Listen"] + }, + keyName: "allClaims_v3", + primaryKey: "pNSRzKKm2vfdbCuTXMa9gOMHD66NwCTxJi4KWJX/TDc=", + secondaryKey: "UreXLPWiP6Murmsq2HYiIXs23qAvWa36ZOL3gb9rXLs=" + } + ] + }; - const request: WebResource = new WebResource(); - request.body = topicOptions; + const request: WebResource = new WebResource(); + request.body = topicOptions; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}) + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new TopicResourceSerializer(), - {} - ); + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new TopicResourceSerializer(), + {} + ); - checkXmlHasPropertiesInExpectedOrder(request.body.toString(), topicProperties); - }); + checkXmlHasPropertiesInExpectedOrder(request.body.toString(), topicProperties); + }); - it("Subscription serializer generates XML in expected order", async function() { - const subscriptionOptions = { - defaultMessageTimeToLive: "P2D", - autoDeleteOnIdle: "PT1H", - deadLetteringOnFilterEvaluationExceptions: false, - deadLetteringOnMessageExpiration: true, - enableBatchedOperations: false, - requiresSession: true, - lockDuration: "PT5M", - maxDeliveryCount: 20 - }; + it("Subscription serializer generates XML in expected order", async function() { + const subscriptionOptions = { + defaultMessageTimeToLive: "P2D", + autoDeleteOnIdle: "PT1H", + deadLetteringOnFilterEvaluationExceptions: false, + deadLetteringOnMessageExpiration: true, + enableBatchedOperations: false, + requiresSession: true, + lockDuration: "PT5M", + maxDeliveryCount: 20 + }; - const request: WebResource = new WebResource(); - request.body = subscriptionOptions; + const request: WebResource = new WebResource(); + request.body = subscriptionOptions; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}) + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new SubscriptionResourceSerializer(), - {} - ); + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new SubscriptionResourceSerializer(), + {} + ); - checkXmlHasPropertiesInExpectedOrder(request.body.toString(), subscriptionProperties); - }); + checkXmlHasPropertiesInExpectedOrder(request.body.toString(), subscriptionProperties); + }); - it("Rule serializer generates XML in expected order", async function() { - const ruleOptions = { - filter: { - sqlExpression: "stringValue = @stringParam AND intValue = @intParam", - sqlParameters: [ - { key: "@intParam", value: 1, type: "int" }, - { key: "@stringParam", value: "b", type: "string" } - ] - }, - action: { sqlExpression: "SET a='b'" } - }; + it("Rule serializer generates XML in expected order", async function() { + const ruleOptions = { + filter: { + sqlExpression: "stringValue = @stringParam AND intValue = @intParam", + sqlParameters: [ + { key: "@intParam", value: 1, type: "int" }, + { key: "@stringParam", value: "b", type: "string" } + ] + }, + action: { sqlExpression: "SET a='b'" } + }; - const request: WebResource = new WebResource(); - request.body = ruleOptions; + const request: WebResource = new WebResource(); + request.body = ruleOptions; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}) + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new RuleResourceSerializer(), - {} - ); + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new RuleResourceSerializer(), + {} + ); - checkXmlHasPropertiesInExpectedOrder(request.body.toString(), ruleProperties); + checkXmlHasPropertiesInExpectedOrder(request.body.toString(), ruleProperties); + }); }); -}); -function checkXmlHasPropertiesInExpectedOrder( - xml: string, - expectedOrderedProperties: Array -) { - const orderedPropertyIndices: Array = []; - for (let i = 0; i < expectedOrderedProperties.length; i++) { - const index = xml.indexOf(`<${expectedOrderedProperties[i]}>`); - if (index < 0) { - continue; - } else { - orderedPropertyIndices.push(index); + function checkXmlHasPropertiesInExpectedOrder( + xml: string, + expectedOrderedProperties: Array + ) { + const orderedPropertyIndices: Array = []; + for (let i = 0; i < expectedOrderedProperties.length; i++) { + const index = xml.indexOf(`<${expectedOrderedProperties[i]}>`); + if (index < 0) { + continue; + } else { + orderedPropertyIndices.push(index); + } } - } - for (let i = 0; i < orderedPropertyIndices.length - 1; i++) { - const curr = orderedPropertyIndices[i]; - const next = orderedPropertyIndices[i + 1]; - assert.equal( - curr < next, - true, - "The properties in constructed request are not in expected order" - ); + for (let i = 0; i < orderedPropertyIndices.length - 1; i++) { + const curr = orderedPropertyIndices[i]; + const next = orderedPropertyIndices[i + 1]; + assert.equal( + curr < next, + true, + "The properties in constructed request are not in expected order" + ); + } } -} -class MockSerializer implements AtomXmlSerializer { - serialize(resource: any): object { - const property1 = "LockDuration"; - const property2 = "MaxSizeInMegabytes"; + class MockSerializer implements AtomXmlSerializer { + serialize(resource: any): object { + const property1 = "LockDuration"; + const property2 = "MaxSizeInMegabytes"; - const serializedContent = { - $: { - type: "application/xml" - }, - QueueDescription: { + const serializedContent = { $: { - xmlns: "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect", - "xmlns:i": "http://www.w3.org/2001/XMLSchema-instance" + type: "application/xml" }, - LockDuration: "PT1M", - MaxSizeInMegabytes: "1024" - } - }; - serializedContent.QueueDescription[property1] = resource["lockDuration"]; - serializedContent.QueueDescription[property2] = resource["maxSizeInMegabytes"]; + QueueDescription: { + $: { + xmlns: "http://schemas.microsoft.com/netservices/2010/10/servicebus/connect", + "xmlns:i": "http://www.w3.org/2001/XMLSchema-instance" + }, + LockDuration: "PT1M", + MaxSizeInMegabytes: "1024" + } + }; + serializedContent.QueueDescription[property1] = resource["lockDuration"]; + serializedContent.QueueDescription[property2] = resource["maxSizeInMegabytes"]; - return { - $: { - xmlns: "http://www.w3.org/2005/Atom" - }, - updated: new Date().toISOString(), - content: serializedContent - }; - } + return { + $: { + xmlns: "http://www.w3.org/2005/Atom" + }, + updated: new Date().toISOString(), + content: serializedContent + }; + } - async deserialize(response: HttpOperationResponse): Promise { - return response; + async deserialize(response: HttpOperationResponse): Promise { + return response; + } } -} -[ - { - testCaseTitle: - "Rule serializer throws Error if rule filter input has SQL parameters that uses an unsupported type", - input: { - filter: { - sqlExpression: "stringValue = @stringParam AND intValue = @intParam", - sqlParameters: [ - { key: "@intParam", value: 1, type: "int" }, - { key: "@stringParam", value: "b", type: "notAKnownType" } - ] + [ + { + testCaseTitle: + "Rule serializer throws Error if rule filter input has SQL parameters that uses an unsupported type", + input: { + filter: { + sqlExpression: "stringValue = @stringParam AND intValue = @intParam", + sqlParameters: [ + { key: "@intParam", value: 1, type: "int" }, + { key: "@stringParam", value: "b", type: "notAKnownType" } + ] + }, + action: { sqlExpression: "SET a='b'" } }, - action: { sqlExpression: "SET a='b'" } + output: { + testErrorMessage: `Invalid type "notAKnownType" supplied for the SQL Parameter. Must be either of "int", "string", "long" or "date".`, + testErrorType: Error + } }, - output: { - testErrorMessage: `Invalid type "notAKnownType" supplied for the SQL Parameter. Must be either of "int", "string", "long" or "date".`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule filter input has SQL parameters as not an array", - input: { - filter: { - sqlExpression: "stringValue = @stringParam AND intValue = @intParam", - sqlParameters: "notAnArray" + { + testCaseTitle: + "Rule serializer throws Error if rule filter input has SQL parameters as not an array", + input: { + filter: { + sqlExpression: "stringValue = @stringParam AND intValue = @intParam", + sqlParameters: "notAnArray" + }, + action: { sqlExpression: "SET a='b'" } }, - action: { sqlExpression: "SET a='b'" } + output: { + testErrorMessage: `parameters must be an array of SqlParameter objects or undefined, but received "notAnArray"`, + testErrorType: TypeError + } }, - output: { - testErrorMessage: `parameters must be an array of SqlParameter objects or undefined, but received "notAnArray"`, - testErrorType: TypeError - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule filter input has SQL parameter information in not a JS object like representation", - input: { - filter: { - sqlExpression: "stringValue = @stringParam AND intValue = @intParam", - sqlParameters: ["notAJsObjectLikeValue"] + { + testCaseTitle: + "Rule serializer throws Error if rule filter input has SQL parameter information in not a JS object like representation", + input: { + filter: { + sqlExpression: "stringValue = @stringParam AND intValue = @intParam", + sqlParameters: ["notAJsObjectLikeValue"] + }, + action: { sqlExpression: "SET a='b'" } }, - action: { sqlExpression: "SET a='b'" } + output: { + testErrorMessage: `Expected SQL parameter input to be a JS object value, but received "notAJsObjectLikeValue"`, + testErrorType: TypeError + } }, - output: { - testErrorMessage: `Expected SQL parameter input to be a JS object value, but received "notAJsObjectLikeValue"`, - testErrorType: TypeError - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule action input has SQL parameters that uses an unsupported type", - input: { - filter: { - sqlExpression: "stringValue = 'abc'" + { + testCaseTitle: + "Rule serializer throws Error if rule action input has SQL parameters that uses an unsupported type", + input: { + filter: { + sqlExpression: "stringValue = 'abc'" + }, + action: { + sqlExpression: "stringValue = @stringParam AND intValue = @intParam", + sqlParameters: [ + { notKey: "@intParam", value: 1, type: "int" }, + { key: "@stringParam", value: "b", type: "notAKnownType" } + ] + } }, - action: { - sqlExpression: "stringValue = @stringParam AND intValue = @intParam", - sqlParameters: [ - { notKey: "@intParam", value: 1, type: "int" }, - { key: "@stringParam", value: "b", type: "notAKnownType" } - ] + output: { + testErrorMessage: `Invalid type "notAKnownType" supplied for the SQL Parameter. Must be either of "int", "string", "long" or "date".`, + testErrorType: Error } }, - output: { - testErrorMessage: `Invalid type "notAKnownType" supplied for the SQL Parameter. Must be either of "int", "string", "long" or "date".`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule action input has SQL parameters as not an array", - input: { - filter: { - sqlExpression: "stringValue = 'abc'" + { + testCaseTitle: + "Rule serializer throws Error if rule action input has SQL parameters as not an array", + input: { + filter: { + sqlExpression: "stringValue = 'abc'" + }, + action: { sqlExpression: "SET a='b'", sqlParameters: "notAnArray" } }, - action: { sqlExpression: "SET a='b'", sqlParameters: "notAnArray" } + output: { + testErrorMessage: `parameters must be an array of SqlParameter objects or undefined, but received "notAnArray"`, + testErrorType: TypeError + } }, - output: { - testErrorMessage: `parameters must be an array of SqlParameter objects or undefined, but received "notAnArray"`, - testErrorType: TypeError - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule action input has SQL parameter information in not a JS object like representation", - input: { - filter: { - sqlExpression: "stringValue = 'abc'" + { + testCaseTitle: + "Rule serializer throws Error if rule action input has SQL parameter information in not a JS object like representation", + input: { + filter: { + sqlExpression: "stringValue = 'abc'" + }, + action: { sqlExpression: "SET a='b'", sqlParameters: ["notAJsObjectLikeValue"] } }, - action: { sqlExpression: "SET a='b'", sqlParameters: ["notAJsObjectLikeValue"] } - }, - output: { - testErrorMessage: `Expected SQL parameter input to be a JS object value, but received "notAJsObjectLikeValue"`, - testErrorType: TypeError + output: { + testErrorMessage: `Expected SQL parameter input to be a JS object value, but received "notAJsObjectLikeValue"`, + testErrorType: TypeError + } } - } -].forEach((testCase) => { - describe(`Type validation errors on SQL parameter inputs`, function(): void { - it(`${testCase.testCaseTitle}`, async () => { - try { - const request = new WebResource(); - request.body = testCase.input; + ].forEach((testCase) => { + describe(`Type validation errors on SQL parameter inputs`, function(): void { + it(`${testCase.testCaseTitle}`, async () => { + try { + const request = new WebResource(); + request.body = testCase.input; - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: request, - status: 200, - headers: new HttpHeaders({}) + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new RuleResourceSerializer(), - {} - ); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal( - err.message, - testCase.output.testErrorMessage, - `Unexpected error message found.` - ); + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new RuleResourceSerializer(), + {} + ); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message, + testCase.output.testErrorMessage, + `Unexpected error message found.` + ); - assert.equal( - err instanceof testCase.output.testErrorType, - true, - `Expected error type to be "${testCase.output.testErrorType}"` - ); - } + assert.equal( + err instanceof testCase.output.testErrorType, + true, + `Expected error type to be "${testCase.output.testErrorType}"` + ); + } + }); }); }); -}); -[ - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties has an array as value", - input: { - filter: { - correlationId: "abcd", - properties: { - message: ["hello"] + [ + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties has an array as value", + input: { + filter: { + correlationId: "abcd", + properties: { + message: ["hello"] + } } + }, + output: { + testErrorMessage: `Unsupported type for the value in the user property {message:["hello"]}`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported type for the value in the user property {message:["hello"]}`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties has an empty object as value", - input: { - filter: { - correlationId: "abcd", - properties: { - message: {} + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties has an empty object as value", + input: { + filter: { + correlationId: "abcd", + properties: { + message: {} + } } + }, + output: { + testErrorMessage: `Unsupported type for the value in the user property {message:{}}`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported type for the value in the user property {message:{}}`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties that uses an unsupported type", - input: { - filter: { - correlationId: "abcd", - properties: { - message: undefined + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties that uses an unsupported type", + input: { + filter: { + correlationId: "abcd", + properties: { + message: undefined + } } + }, + output: { + testErrorMessage: `Unsupported type for the value in the user property {message:undefined}`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported type for the value in the user property {message:undefined}`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties an integer", - input: { - filter: { - correlationId: "abcd", - properties: 123 + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties an integer", + input: { + filter: { + correlationId: "abcd", + properties: 123 + } + }, + output: { + testErrorMessage: `Unsupported value for the properties 123, expected a JSON object with key-value pairs.`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported value for the properties 123, expected a JSON object with key-value pairs.`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties a string", - input: { - filter: { - correlationId: "abcd", - properties: "abcd" + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties a string", + input: { + filter: { + correlationId: "abcd", + properties: "abcd" + } + }, + output: { + testErrorMessage: `Unsupported value for the properties "abcd", expected a JSON object with key-value pairs.`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported value for the properties "abcd", expected a JSON object with key-value pairs.`, - testErrorType: Error - } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties an array", - input: { - filter: { - correlationId: "abcd", - properties: ["abcd"] + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties an array", + input: { + filter: { + correlationId: "abcd", + properties: ["abcd"] + } + }, + output: { + testErrorMessage: `Unsupported value for the properties ["abcd"], expected a JSON object with key-value pairs.`, + testErrorType: Error } }, - output: { - testErrorMessage: `Unsupported value for the properties ["abcd"], expected a JSON object with key-value pairs.`, - testErrorType: Error + { + testCaseTitle: + "Rule serializer throws Error if rule correlation filter input has user properties an empty object", + input: { + filter: { + correlationId: "abcd", + properties: {} + } + }, + output: { + testErrorMessage: `Unsupported value for the properties {}, expected a JSON object with key-value pairs.`, + testErrorType: Error + } } - }, - { - testCaseTitle: - "Rule serializer throws Error if rule correlation filter input has user properties an empty object", - input: { - filter: { - correlationId: "abcd", - properties: {} + ].forEach((testCase) => { + describe(`Type validation errors on Correlation user property inputs`, function(): void { + it(`${testCase.testCaseTitle}`, async () => { + try { + const request = new WebResource(); + request.body = testCase.input; + + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: request, + status: 200, + headers: new HttpHeaders({}) + }; + }; + await executeAtomXmlOperation( + mockServiceBusAtomManagementClient, + request, + new RuleResourceSerializer(), + {} + ); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message, + testCase.output.testErrorMessage, + `Unexpected error message found.` + ); + + assert.equal( + err instanceof testCase.output.testErrorType, + true, + `Expected error type to be "${testCase.output.testErrorType}"` + ); + } + }); + }); + }); + + [ + { + testCaseTitle: "Create queue throws Error if authorization rules input is not an array", + input: { + authorizationRules: "notAnArray" + }, + output: { + testErrorMessage: `authorizationRules must be an array of AuthorizationRule objects or undefined, but received "notAnArray"`, + testErrorType: TypeError } }, - output: { - testErrorMessage: `Unsupported value for the properties {}, expected a JSON object with key-value pairs.`, - testErrorType: Error + { + testCaseTitle: + "Create queue throws Error if authorization rule information is not a JS object like representation", + input: { + authorizationRules: ["notAJsObjectLikeValue"] + }, + output: { + testErrorMessage: `Expected authorizationRule input to be a JS object value, but received "notAJsObjectLikeValue"`, + testErrorType: TypeError + } } - } -].forEach((testCase) => { - describe(`Type validation errors on Correlation user property inputs`, function(): void { - it(`${testCase.testCaseTitle}`, async () => { - try { - const request = new WebResource(); - request.body = testCase.input; - + ].forEach((testCase) => { + describe(`Type validation errors on authorization rule inputs`, function(): void { + it(`${testCase.testCaseTitle}`, async () => { mockServiceBusAtomManagementClient.sendRequest = async () => { return { - request: request, + request: new WebResource(), status: 200, headers: new HttpHeaders({}) }; }; - await executeAtomXmlOperation( - mockServiceBusAtomManagementClient, - request, - new RuleResourceSerializer(), - {} - ); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal( - err.message, - testCase.output.testErrorMessage, - `Unexpected error message found.` - ); + try { + await mockServiceBusAtomManagementClient.createQueue({ + queueName: "test", + ...(testCase.input as any) + }); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal( + err.message, + testCase.output.testErrorMessage, + `Unexpected error message found.` + ); - assert.equal( - err instanceof testCase.output.testErrorType, - true, - `Expected error type to be "${testCase.output.testErrorType}"` - ); - } + assert.equal( + err instanceof testCase.output.testErrorType, + true, + `Expected error type to be "${testCase.output.testErrorType}"` + ); + } + }); }); }); -}); -[ - { - testCaseTitle: "Create queue throws Error if authorization rules input is not an array", - input: { - authorizationRules: "notAnArray" + [ + { + testCaseTitle: `Receives error code "UnauthorizedRequestError" when response status is "401"`, + input: { + responseStatus: 401, + body: "" + }, + output: { + errorCode: "UnauthorizedRequestError" + } }, - output: { - testErrorMessage: `authorizationRules must be an array of AuthorizationRule objects or undefined, but received "notAnArray"`, - testErrorType: TypeError - } - }, - { - testCaseTitle: - "Create queue throws Error if authorization rule information is not a JS object like representation", - input: { - authorizationRules: ["notAJsObjectLikeValue"] + { + testCaseTitle: `Receives error code "MessageEntityNotFoundError" when response status is "404"`, + input: { + responseStatus: 404, + body: "" + }, + output: { + errorCode: "MessageEntityNotFoundError" + } + }, + { + testCaseTitle: `Receives error code "ServiceError" when response status is "409" and method is "DELETE`, + input: { + responseStatus: 409, + body: "", + requestMethod: "DELETE" + }, + output: { + errorCode: "ServiceError" + } + }, + { + testCaseTitle: `Receives error code "ServiceError" when response status is "409" and method is "PUT" with "If-Match" headers set`, + input: { + responseStatus: 409, + body: "", + requestMethod: "PUT", + requestHeaders: { "If-Match": "*" } + }, + output: { + errorCode: "ServiceError" + } + }, + { + testCaseTitle: `Receives error code "ServiceError" when response status is "409" and error message has subcode 40901 in it`, + input: { + responseStatus: 409, + body: { Error: { Detail: " ... SubCode=40901 ..." } } + }, + output: { + errorCode: "ServiceError" + } + }, + { + testCaseTitle: `Receives error code "MessageEntityAlreadyExistsError" when response status is "409" and no other special conditions are required`, + input: { + responseStatus: 409, + body: "", + requestMethod: "GET" + }, + output: { + errorCode: "MessageEntityAlreadyExistsError" + } + }, + { + input: { + testCaseTitle: `Receives error code "InvalidOperationError" when response status is "403" and error message has subcode 40301 in it`, + responseStatus: 403, + body: { Error: { Detail: " ... SubCode=40301 ..." } } + }, + output: { + errorCode: "InvalidOperationError" + } + }, + { + testCaseTitle: `Receives error code "InvalidOperationError" when response status is "403" and error message does NOT have subcode 40301 in it`, + input: { + responseStatus: 403, + body: "" + }, + output: { + errorCode: "QuotaExceededError" + } + }, + { + testCaseTitle: `Receives error code "ServiceError" when response status is "400"`, + input: { + responseStatus: 400, + body: "" + }, + output: { + errorCode: "ServiceError" + } + }, + { + testCaseTitle: `Receives error code "ServerBusyError" when response status is "503"`, + input: { + responseStatus: 503, + body: "" + }, + output: { + errorCode: "ServerBusyError" + } }, - output: { - testErrorMessage: `Expected authorizationRule input to be a JS object value, but received "notAJsObjectLikeValue"`, - testErrorType: TypeError + { + testCaseTitle: `Receives useful error message when service returned information doesn't have the 'Detail' property defined`, + input: { + responseStatus: 400, + body: { Error: { NoDetails: "no Detail property available" } } + }, + output: { + errorCode: "ServiceError", + errorMessage: + "Detailed error message information not available. Look at the 'code' property on error for more information." + } } - } -].forEach((testCase) => { - describe(`Type validation errors on authorization rule inputs`, function(): void { - it(`${testCase.testCaseTitle}`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - status: 200, - headers: new HttpHeaders({}) + ].forEach((testCase) => { + describe(`Verify error codes and messages get constructed correctly for different scenarios`, function(): void { + it(`${testCase.testCaseTitle}`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + const response = { + request: new WebResource("", testCase.input.requestMethod as "DELETE" | "GET" | "PUT"), + status: testCase.input.responseStatus, + headers: new HttpHeaders(), + parsedBody: testCase.input.body + }; + + if (testCase.input.requestHeaders) { + Object.keys(testCase.input.requestHeaders).forEach((key) => { + const value = (testCase.input.requestHeaders as any)[key]; + response.request.headers.set(key, value); + }); + } + return response; }; - }; - try { - await mockServiceBusAtomManagementClient.createQueue({ - queueName: "test", - ...(testCase.input as any) - }); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal( - err.message, - testCase.output.testErrorMessage, - `Unexpected error message found.` - ); + try { + await mockServiceBusAtomManagementClient.createQueue({ + queueName: "test", + ...(testCase.input as any) + }); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal(err.code, testCase.output.errorCode, `Unexpected error code found.`); - assert.equal( - err instanceof testCase.output.testErrorType, - true, - `Expected error type to be "${testCase.output.testErrorType}"` - ); - } + if (testCase.output.errorMessage) { + assert.equal( + err.message, + testCase.output.errorMessage, + `Unexpected error message found.` + ); + } + } + }); }); }); -}); -[ - { - testCaseTitle: `Receives error code "UnauthorizedRequestError" when response status is "401"`, - input: { - responseStatus: 401, - body: "" + [ + { + responseStatus: 100, + errorCode: "Continue" }, - output: { - errorCode: "UnauthorizedRequestError" - } - }, - { - testCaseTitle: `Receives error code "MessageEntityNotFoundError" when response status is "404"`, - input: { - responseStatus: 404, - body: "" + { + responseStatus: 101, + errorCode: "SwitchingProtocols" }, - output: { - errorCode: "MessageEntityNotFoundError" - } - }, - { - testCaseTitle: `Receives error code "ServiceError" when response status is "409" and method is "DELETE`, - input: { - responseStatus: 409, - body: "", - requestMethod: "DELETE" + { + responseStatus: 300, + errorCode: "MultipleChoices" }, - output: { - errorCode: "ServiceError" - } - }, - { - testCaseTitle: `Receives error code "ServiceError" when response status is "409" and method is "PUT" with "If-Match" headers set`, - input: { - responseStatus: 409, - body: "", - requestMethod: "PUT", - requestHeaders: { "If-Match": "*" } + { + responseStatus: 301, + errorCode: "Moved" }, - output: { - errorCode: "ServiceError" - } - }, - { - testCaseTitle: `Receives error code "ServiceError" when response status is "409" and error message has subcode 40901 in it`, - input: { - responseStatus: 409, - body: { Error: { Detail: " ... SubCode=40901 ..." } } + { + responseStatus: 302, + errorCode: "Redirect" }, - output: { - errorCode: "ServiceError" - } - }, - { - testCaseTitle: `Receives error code "MessageEntityAlreadyExistsError" when response status is "409" and no other special conditions are required`, - input: { - responseStatus: 409, - body: "", - requestMethod: "GET" + { + responseStatus: 303, + errorCode: "RedirectMethod" }, - output: { - errorCode: "MessageEntityAlreadyExistsError" - } - }, - { - input: { - testCaseTitle: `Receives error code "InvalidOperationError" when response status is "403" and error message has subcode 40301 in it`, - responseStatus: 403, - body: { Error: { Detail: " ... SubCode=40301 ..." } } + { + responseStatus: 304, + errorCode: "NotModified" }, - output: { - errorCode: "InvalidOperationError" - } - }, - { - testCaseTitle: `Receives error code "InvalidOperationError" when response status is "403" and error message does NOT have subcode 40301 in it`, - input: { - responseStatus: 403, - body: "" + { + responseStatus: 305, + errorCode: "UseProxy" }, - output: { - errorCode: "QuotaExceededError" - } - }, - { - testCaseTitle: `Receives error code "ServiceError" when response status is "400"`, - input: { - responseStatus: 400, - body: "" + { + responseStatus: 306, + errorCode: "Unused" }, - output: { - errorCode: "ServiceError" - } - }, - { - testCaseTitle: `Receives error code "ServerBusyError" when response status is "503"`, - input: { - responseStatus: 503, - body: "" + { + responseStatus: 402, + errorCode: "PaymentRequired" }, - output: { - errorCode: "ServerBusyError" - } - }, - { - testCaseTitle: `Receives useful error message when service returned information doesn't have the 'Detail' property defined`, - input: { - responseStatus: 400, - body: { Error: { NoDetails: "no Detail property available" } } + { + responseStatus: 405, + errorCode: "MethodNotAllowed" + }, + { + responseStatus: 406, + errorCode: "NotAcceptable" + }, + { + responseStatus: 407, + errorCode: "ProxyAuthenticationRequired" + }, + { + responseStatus: 410, + errorCode: "Gone" + }, + { + responseStatus: 411, + errorCode: "LengthRequired" }, - output: { - errorCode: "ServiceError", - errorMessage: - "Detailed error message information not available. Look at the 'code' property on error for more information." + { + responseStatus: 412, + errorCode: "PreconditionFailed" + }, + { + responseStatus: 413, + errorCode: "RequestEntityTooLarge" + }, + { + responseStatus: 414, + errorCode: "RequestUriTooLong" + }, + { + responseStatus: 415, + errorCode: "UnsupportedMediaType" + }, + { + responseStatus: 416, + errorCode: "RequestRangeNotSatisfiable" + }, + { + responseStatus: 417, + errorCode: "ExpectationFailed" + }, + { + responseStatus: 426, + errorCode: "UpgradeRequired" + }, + { + responseStatus: 500, + errorCode: "InternalServerError" + }, + { + responseStatus: 501, + errorCode: "NotImplemented" + }, + { + responseStatus: 502, + errorCode: "BadGateway" + }, + { + responseStatus: 504, + errorCode: "GatewayTimeout" + }, + { + responseStatus: 505, + errorCode: "HttpVersionNotSupported" } - } -].forEach((testCase) => { - describe(`Verify error codes and messages get constructed correctly for different scenarios`, function(): void { - it(`${testCase.testCaseTitle}`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - const response = { - request: new WebResource("", testCase.input.requestMethod as "DELETE" | "GET" | "PUT"), - status: testCase.input.responseStatus, - headers: new HttpHeaders(), - parsedBody: testCase.input.body + ].forEach((testCase) => { + describe(`Verify error code mapping for non-specialized failed HTTP status codes`, function(): void { + it(`Verify mapping for response status code "${testCase.responseStatus}" to result in "${testCase.errorCode}" error code.`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + status: testCase.responseStatus, + headers: new HttpHeaders() + }; }; - if (testCase.input.requestHeaders) { - Object.keys(testCase.input.requestHeaders).forEach((key) => { - const value = (testCase.input.requestHeaders as any)[key]; - response.request.headers.set(key, value); + try { + await mockServiceBusAtomManagementClient.createQueue({ + queueName: "test", + ...(testCase as any) }); + assert.fail("Error must be thrown"); + } catch (err) { + assert.equal(err.code, testCase.errorCode, `Unexpected error code found.`); } - return response; - }; - try { - await mockServiceBusAtomManagementClient.createQueue({ - queueName: "test", - ...(testCase.input as any) - }); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal(err.code, testCase.output.errorCode, `Unexpected error code found.`); - - if (testCase.output.errorMessage) { - assert.equal( - err.message, - testCase.output.errorMessage, - `Unexpected error message found.` - ); - } - } + }); }); }); -}); -[ - { - responseStatus: 100, - errorCode: "Continue" - }, - { - responseStatus: 101, - errorCode: "SwitchingProtocols" - }, - { - responseStatus: 300, - errorCode: "MultipleChoices" - }, - { - responseStatus: 301, - errorCode: "Moved" - }, - { - responseStatus: 302, - errorCode: "Redirect" - }, - { - responseStatus: 303, - errorCode: "RedirectMethod" - }, - { - responseStatus: 304, - errorCode: "NotModified" - }, - { - responseStatus: 305, - errorCode: "UseProxy" - }, - { - responseStatus: 306, - errorCode: "Unused" - }, - { - responseStatus: 402, - errorCode: "PaymentRequired" - }, - { - responseStatus: 405, - errorCode: "MethodNotAllowed" - }, - { - responseStatus: 406, - errorCode: "NotAcceptable" - }, - { - responseStatus: 407, - errorCode: "ProxyAuthenticationRequired" - }, - { - responseStatus: 410, - errorCode: "Gone" - }, - { - responseStatus: 411, - errorCode: "LengthRequired" - }, - { - responseStatus: 412, - errorCode: "PreconditionFailed" - }, - { - responseStatus: 413, - errorCode: "RequestEntityTooLarge" - }, - { - responseStatus: 414, - errorCode: "RequestUriTooLong" - }, - { - responseStatus: 415, - errorCode: "UnsupportedMediaType" - }, - { - responseStatus: 416, - errorCode: "RequestRangeNotSatisfiable" - }, - { - responseStatus: 417, - errorCode: "ExpectationFailed" - }, - { - responseStatus: 426, - errorCode: "UpgradeRequired" - }, - { - responseStatus: 500, - errorCode: "InternalServerError" - }, - { - responseStatus: 501, - errorCode: "NotImplemented" - }, - { - responseStatus: 502, - errorCode: "BadGateway" - }, - { - responseStatus: 504, - errorCode: "GatewayTimeout" - }, - { - responseStatus: 505, - errorCode: "HttpVersionNotSupported" - } -].forEach((testCase) => { - describe(`Verify error code mapping for non-specialized failed HTTP status codes`, function(): void { - it(`Verify mapping for response status code "${testCase.responseStatus}" to result in "${testCase.errorCode}" error code.`, async () => { + describe(`Parse empty response for list() requests to return as empty array`, function(): void { + function assertEmptyArray(result: any) { mockServiceBusAtomManagementClient.sendRequest = async () => { return { request: new WebResource(), - status: testCase.responseStatus, - headers: new HttpHeaders() + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) }; }; + assert.equal(Array.isArray(result), true, "Result must be an array"); + assert.equal(result.length, 0, "Result must be an empty array"); + } - try { - await mockServiceBusAtomManagementClient.createQueue({ - queueName: "test", - ...(testCase as any) - }); - assert.fail("Error must be thrown"); - } catch (err) { - assert.equal(err.code, testCase.errorCode, `Unexpected error code found.`); - } - }); - }); -}); - -describe(`Parse empty response for list() requests to return as empty array`, function(): void { - function assertEmptyArray(result: any) { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) - }; - }; - assert.equal(Array.isArray(result), true, "Result must be an array"); - assert.equal(result.length, 0, "Result must be an empty array"); - } - - beforeEach(async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) + beforeEach(async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - }); + }); - it(`List on empty list of queues gives an empty array`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) + it(`List on empty list of queues gives an empty array`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - const result = await mockServiceBusAtomManagementClient["getQueues"](); - assertEmptyArray(result); - }); + const result = await mockServiceBusAtomManagementClient["getQueues"](); + assertEmptyArray(result); + }); - it(`List on empty list of topics gives an empty array`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) + it(`List on empty list of topics gives an empty array`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - const result = await mockServiceBusAtomManagementClient["getTopics"](); - assertEmptyArray(result); - }); + const result = await mockServiceBusAtomManagementClient["getTopics"](); + assertEmptyArray(result); + }); - it(`List on empty list of subscriptions gives an empty array`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) + it(`List on empty list of subscriptions gives an empty array`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - const result = await mockServiceBusAtomManagementClient["getSubscriptions"]("testTopic"); - assertEmptyArray(result); - }); + const result = await mockServiceBusAtomManagementClient["getSubscriptions"]("testTopic"); + assertEmptyArray(result); + }); - it(`List on empty list of rules gives an empty array`, async () => { - mockServiceBusAtomManagementClient.sendRequest = async () => { - return { - request: new WebResource(), - bodyAsText: '', - status: 200, - headers: new HttpHeaders({}) + it(`List on empty list of rules gives an empty array`, async () => { + mockServiceBusAtomManagementClient.sendRequest = async () => { + return { + request: new WebResource(), + bodyAsText: '', + status: 200, + headers: new HttpHeaders({}) + }; }; - }; - const result = await mockServiceBusAtomManagementClient["getRules"]( - "testTopic", - "testSubscription" - ); - assertEmptyArray(result); + const result = await mockServiceBusAtomManagementClient["getRules"]( + "testTopic", + "testSubscription" + ); + assertEmptyArray(result); + }); }); }); diff --git a/sdk/servicebus/service-bus/test/internal/receiverInit.spec.ts b/sdk/servicebus/service-bus/test/internal/receiverInit.spec.ts index 3dd32f387105..08a3aa2e4eaf 100644 --- a/sdk/servicebus/service-bus/test/internal/receiverInit.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/receiverInit.spec.ts @@ -3,6 +3,7 @@ import chai from "chai"; import chaiAsPromised from "chai-as-promised"; +import { ReceiverOptions } from "rhea-promise"; chai.use(chaiAsPromised); const assert = chai.assert; @@ -56,7 +57,7 @@ describe("init() and close() interactions", () => { ); }; - await messageReceiver2["_init"](); + await messageReceiver2["_init"]({} as ReceiverOptions); assert.isFalse(negotiateClaimWasCalled); }); diff --git a/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts index 41d46aaabfbc..f68069534782 100644 --- a/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts @@ -64,7 +64,7 @@ describe("StreamingReceiver unit tests", () => { _createStreamingReceiver: (_context, _options) => { wasCalled = true; return ({ - _init: (_ignoredOptions: any, abortSignal?: AbortSignalLike) => { + init: (_useNewName: boolean, abortSignal?: AbortSignalLike) => { wasCalled = true; assert.equal( abortSignal, diff --git a/sdk/servicebus/service-bus/test/retries.spec.ts b/sdk/servicebus/service-bus/test/retries.spec.ts index e38de773777d..5d5341d85472 100644 --- a/sdk/servicebus/service-bus/test/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/retries.spec.ts @@ -14,6 +14,7 @@ import Long from "long"; import { BatchingReceiver } from "../src/core/batchingReceiver"; import { delay } from "rhea-promise"; import { SessionReceiverImpl } from "../src/receivers/sessionReceiver"; +import { ReceiverImpl } from "../src/receivers/receiver"; describe("Retries - ManagementClient", () => { let sender: Sender; @@ -460,10 +461,12 @@ describe("Retries - onDetached", () => { async processError() {} }); await delay(2000); - (receiver as any)._context.streamingReceiver._init = fakeFunction; - await (receiver as any)._context.streamingReceiver.onDetached( - new MessagingError("Hello there, I'm an error") - ); + + const streamingReceiver = (receiver as ReceiverImpl)["_context"].streamingReceiver!; + should.exist(streamingReceiver); + + streamingReceiver["init"] = fakeFunction; + await streamingReceiver.onDetached(new MessagingError("Hello there, I'm an error")); }); }); diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index 65a561b058c6..335d33c6d15e 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -178,7 +178,7 @@ describe("Streaming Receiver Tests", () => { // overwrite _init to throw a non-retryable error. // this will be called by onDetached - (streamingReceiver as any)._init = async () => { + (streamingReceiver as any).init = async () => { const error = new Error("Expected test error!"); // prevent retry from translating error. (error as any).translated = true;