diff --git a/sdk/servicebus/service-bus/src/core/linkEntity.ts b/sdk/servicebus/service-bus/src/core/linkEntity.ts index fb0d516498bb..1c974bc258c7 100644 --- a/sdk/servicebus/service-bus/src/core/linkEntity.ts +++ b/sdk/servicebus/service-bus/src/core/linkEntity.ts @@ -1,10 +1,16 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -import { defaultLock, TokenType, AccessToken, Constants, SharedKeyCredential } from "@azure/core-amqp"; +import { + defaultLock, + TokenType, + AccessToken, + Constants, + SharedKeyCredential +} from "@azure/core-amqp"; import { ClientEntityContext } from "../clientEntityContext"; import * as log from "../log"; -import { Sender, Receiver } from "rhea-promise"; +import { AwaitableSender, Receiver } from "rhea-promise"; import { getUniqueName } from "../util/utils"; /** @@ -131,7 +137,9 @@ export class LinkEntity { // renew sas token in every 45 minutess this._tokenTimeout = (3600 - 900) * 1000; } else { - const aadToken = await this._context.namespace.tokenCredential.getToken(Constants.aadServiceBusScope); + const aadToken = await this._context.namespace.tokenCredential.getToken( + Constants.aadServiceBusScope + ); if (!aadToken) { throw new Error(`Failed to get token from the provided "TokenCredential" object`); } @@ -161,7 +169,11 @@ export class LinkEntity { throw new Error("Token cannot be null"); } await defaultLock.acquire(this._context.namespace.negotiateClaimLock, () => { - return this._context.namespace.cbsSession.negotiateClaim(this.audience, tokenObject, tokenType); + return this._context.namespace.cbsSession.negotiateClaim( + this.audience, + tokenObject, + tokenType + ); }); log.link( "[%s] Negotiated claim for %s '%s' with with address: %s", @@ -216,7 +228,7 @@ export class LinkEntity { * @param {Sender | Receiver} [link] The Sender or Receiver link that needs to be closed and * removed. */ - protected async _closeLink(link?: Sender | Receiver): Promise { + protected async _closeLink(link?: AwaitableSender | Receiver): Promise { clearTimeout(this._tokenRenewalTimer as NodeJS.Timer); if (link) { try { diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index 1c18ee964681..66928a752a16 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -4,18 +4,16 @@ import * as log from "../log"; import { messageProperties, - Sender, + AwaitableSender, + AwaitableSenderOptions, EventContext, OnAmqpEvent, - SenderOptions, - SenderEvents, message as RheaMessageUtil, AmqpError, generate_uuid } from "rhea-promise"; import { defaultLock, - Func, retry, translate, AmqpMessage, @@ -36,13 +34,6 @@ import { LinkEntity } from "./linkEntity"; import { getUniqueName } from "../util/utils"; import { throwErrorIfConnectionClosed } from "../util/errors"; -/** - * @internal - */ -interface CreateSenderOptions { - newName?: boolean; -} - /** * @internal * Describes the MessageSender that will send messages to ServiceBus. @@ -83,7 +74,7 @@ export class MessageSender extends LinkEntity { * @property {Sender} [_sender] The AMQP sender link. * @private */ - private _sender?: Sender; + private _sender?: AwaitableSender; /** * Creates a new MessageSender instance. @@ -225,9 +216,9 @@ export class MessageSender extends LinkEntity { ); } - private _createSenderOptions(options: CreateSenderOptions): SenderOptions { - if (options.newName) this.name = getUniqueName(this._context.entityPath); - const srOptions: SenderOptions = { + private _createSenderOptions(timeoutInMs: number, newName?: boolean): AwaitableSenderOptions { + if (newName) this.name = getUniqueName(this._context.entityPath); + const srOptions: AwaitableSenderOptions = { name: this.name, target: { address: this.address @@ -235,7 +226,8 @@ export class MessageSender extends LinkEntity { onError: this._onAmqpError, onClose: this._onAmqpClose, onSessionError: this._onSessionError, - onSessionClose: this._onSessionClose + onSessionClose: this._onSessionClose, + sendTimeoutInSeconds: timeoutInMs / 1000 }; log.sender("Creating sender with options: %O", srOptions); return srOptions; @@ -255,7 +247,6 @@ export class MessageSender extends LinkEntity { private _trySend(encodedMessage: Buffer, sendBatch?: boolean): Promise { const sendEventPromise = () => new Promise(async (resolve, reject) => { - let waitTimer: any; log.sender( "[%s] Sender '%s', credit: %d available: %d", this._context.namespace.connectionId, @@ -281,83 +272,7 @@ export class MessageSender extends LinkEntity { ); } if (this._sender!.sendable()) { - let onRejected: Func; - let onReleased: Func; - let onModified: Func; - let onAccepted: Func; - const removeListeners = (): void => { - clearTimeout(waitTimer); - // When `removeListeners` is called on timeout, the sender might be closed and cleared - // So, check if it exists, before removing listeners from it. - if (this._sender) { - this._sender.removeListener(SenderEvents.rejected, onRejected); - this._sender.removeListener(SenderEvents.accepted, onAccepted); - this._sender.removeListener(SenderEvents.released, onReleased); - this._sender.removeListener(SenderEvents.modified, onModified); - } - }; - - onAccepted = (context: EventContext) => { - // Since we will be adding listener for accepted and rejected event every time - // we send a message, we need to remove listener for both the events. - // This will ensure duplicate listeners are not added for the same event. - removeListeners(); - log.sender( - "[%s] Sender '%s', got event accepted.", - this._context.namespace.connectionId, - this.name - ); - resolve(); - }; - onRejected = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event rejected.", - this._context.namespace.connectionId, - this.name - ); - const err = translate(context!.delivery!.remote_state!.error); - reject(err); - }; - onReleased = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event released.", - this._context.namespace.connectionId, - this.name - ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.namespace.connectionId}]Sender '${this.name}', ` + - `received a release disposition.Hence we are rejecting the promise.` - ); - } - reject(err); - }; - onModified = (context: EventContext) => { - removeListeners(); - log.error( - "[%s] Sender '%s', got event modified.", - this._context.namespace.connectionId, - this.name - ); - let err: Error; - if (context!.delivery!.remote_state!.error) { - err = translate(context!.delivery!.remote_state!.error); - } else { - err = new Error( - `[${this._context.namespace.connectionId}]Sender "${this.name}", ` + - `received a modified disposition.Hence we are rejecting the promise.` - ); - } - reject(err); - }; - const actionAfterTimeout = () => { - removeListeners(); const desc: string = `[${this._context.namespace.connectionId}] Sender "${this.name}" ` + `with address "${this.address}", was not able to send the message right now, due ` + @@ -370,13 +285,9 @@ export class MessageSender extends LinkEntity { return reject(translate(e)); }; - this._sender!.on(SenderEvents.accepted, onAccepted); - this._sender!.on(SenderEvents.rejected, onRejected); - this._sender!.on(SenderEvents.modified, onModified); - this._sender!.on(SenderEvents.released, onReleased); - waitTimer = setTimeout(actionAfterTimeout, Constants.defaultOperationTimeoutInMs); + const waitTimer = setTimeout(actionAfterTimeout, Constants.defaultOperationTimeoutInMs); try { - const delivery = this._sender!.send( + const delivery = await this._sender!.send( encodedMessage, undefined, sendBatch ? 0x80013700 : 0 @@ -387,9 +298,17 @@ export class MessageSender extends LinkEntity { this.name, delivery.id ); + return resolve(); } catch (error) { - removeListeners(); + error = translate(error); + log.error( + "[%s] An error occurred while sending the message", + this._context.namespace.connectionId, + error + ); return reject(error); + } finally { + clearTimeout(waitTimer); } } else { // let us retry to send the message after some time. @@ -421,7 +340,7 @@ export class MessageSender extends LinkEntity { /** * Initializes the sender session on the connection. */ - private async _init(options?: SenderOptions): Promise { + private async _init(options?: AwaitableSenderOptions): Promise { try { // isOpen isConnecting Should establish // true false No @@ -444,9 +363,9 @@ export class MessageSender extends LinkEntity { this.name ); if (!options) { - options = this._createSenderOptions({}); + options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); } - this._sender = await this._context.namespace.connection.createSender(options); + this._sender = await this._context.namespace.connection.createAwaitableSender(options); this.isConnecting = false; log.error( "[%s] Sender '%s' with address '%s' has established itself.", @@ -544,13 +463,11 @@ export class MessageSender extends LinkEntity { } if (shouldReopen) { await defaultLock.acquire(this.senderLock, () => { - const options: SenderOptions = this._createSenderOptions({ - newName: true - }); + const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); // shall retry forever at an interval of 15 seconds if the error is a retryable error // else bail out when the error is not retryable or the oepration succeeds. const config: RetryConfig = { - operation: () => this._init(options), + operation: () => this._init(senderOptions), connectionId: this._context.namespace.connectionId!, operationType: RetryOperationType.senderLink, retryOptions: {