Skip to content

Commit

Permalink
Sender -> AwaitableSender
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru committed Feb 11, 2020
1 parent 8431d03 commit 2be69f9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 112 deletions.
22 changes: 17 additions & 5 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { defaultLock, TokenType, AccessToken, Constants, SharedKeyCredential } from "@azure/core-amqp";
import {
defaultLock,
TokenType,
AccessToken,
Constants,
SharedKeyCredential
} from "@azure/core-amqp";
import { ClientEntityContext } from "../clientEntityContext";
import * as log from "../log";
import { Sender, Receiver } from "rhea-promise";
import { AwaitableSender, Receiver } from "rhea-promise";
import { getUniqueName } from "../util/utils";

/**
Expand Down Expand Up @@ -131,7 +137,9 @@ export class LinkEntity {
// renew sas token in every 45 minutess
this._tokenTimeout = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.namespace.tokenCredential.getToken(Constants.aadServiceBusScope);
const aadToken = await this._context.namespace.tokenCredential.getToken(
Constants.aadServiceBusScope
);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
Expand Down Expand Up @@ -161,7 +169,11 @@ export class LinkEntity {
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.namespace.negotiateClaimLock, () => {
return this._context.namespace.cbsSession.negotiateClaim(this.audience, tokenObject, tokenType);
return this._context.namespace.cbsSession.negotiateClaim(
this.audience,
tokenObject,
tokenType
);
});
log.link(
"[%s] Negotiated claim for %s '%s' with with address: %s",
Expand Down Expand Up @@ -216,7 +228,7 @@ export class LinkEntity {
* @param {Sender | Receiver} [link] The Sender or Receiver link that needs to be closed and
* removed.
*/
protected async _closeLink(link?: Sender | Receiver): Promise<void> {
protected async _closeLink(link?: AwaitableSender | Receiver): Promise<void> {
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
if (link) {
try {
Expand Down
131 changes: 24 additions & 107 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
import * as log from "../log";
import {
messageProperties,
Sender,
AwaitableSender,
AwaitableSenderOptions,
EventContext,
OnAmqpEvent,
SenderOptions,
SenderEvents,
message as RheaMessageUtil,
AmqpError,
generate_uuid
} from "rhea-promise";
import {
defaultLock,
Func,
retry,
translate,
AmqpMessage,
Expand All @@ -36,13 +34,6 @@ import { LinkEntity } from "./linkEntity";
import { getUniqueName } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";

/**
* @internal
*/
interface CreateSenderOptions {
newName?: boolean;
}

/**
* @internal
* Describes the MessageSender that will send messages to ServiceBus.
Expand Down Expand Up @@ -83,7 +74,7 @@ export class MessageSender extends LinkEntity {
* @property {Sender} [_sender] The AMQP sender link.
* @private
*/
private _sender?: Sender;
private _sender?: AwaitableSender;

/**
* Creates a new MessageSender instance.
Expand Down Expand Up @@ -225,17 +216,18 @@ export class MessageSender extends LinkEntity {
);
}

private _createSenderOptions(options: CreateSenderOptions): SenderOptions {
if (options.newName) this.name = getUniqueName(this._context.entityPath);
const srOptions: SenderOptions = {
private _createSenderOptions(timeoutInMs: number, newName?: boolean): AwaitableSenderOptions {
if (newName) this.name = getUniqueName(this._context.entityPath);
const srOptions: AwaitableSenderOptions = {
name: this.name,
target: {
address: this.address
},
onError: this._onAmqpError,
onClose: this._onAmqpClose,
onSessionError: this._onSessionError,
onSessionClose: this._onSessionClose
onSessionClose: this._onSessionClose,
sendTimeoutInSeconds: timeoutInMs / 1000
};
log.sender("Creating sender with options: %O", srOptions);
return srOptions;
Expand All @@ -255,7 +247,6 @@ export class MessageSender extends LinkEntity {
private _trySend(encodedMessage: Buffer, sendBatch?: boolean): Promise<void> {
const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.namespace.connectionId,
Expand All @@ -281,83 +272,7 @@ export class MessageSender extends LinkEntity {
);
}
if (this._sender!.sendable()) {
let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.namespace.connectionId,
this.name
);
resolve();
};
onRejected = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event rejected.",
this._context.namespace.connectionId,
this.name
);
const err = translate(context!.delivery!.remote_state!.error);
reject(err);
};
onReleased = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event released.",
this._context.namespace.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.namespace.connectionId}]Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
}
reject(err);
};
onModified = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event modified.",
this._context.namespace.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.namespace.connectionId}]Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
reject(err);
};

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.namespace.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
Expand All @@ -370,13 +285,9 @@ export class MessageSender extends LinkEntity {
return reject(translate(e));
};

this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(actionAfterTimeout, Constants.defaultOperationTimeoutInMs);
const waitTimer = setTimeout(actionAfterTimeout, Constants.defaultOperationTimeoutInMs);
try {
const delivery = this._sender!.send(
const delivery = await this._sender!.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
Expand All @@ -387,9 +298,17 @@ export class MessageSender extends LinkEntity {
this.name,
delivery.id
);
return resolve();
} catch (error) {
removeListeners();
error = translate(error);
log.error(
"[%s] An error occurred while sending the message",
this._context.namespace.connectionId,
error
);
return reject(error);
} finally {
clearTimeout(waitTimer);
}
} else {
// let us retry to send the message after some time.
Expand Down Expand Up @@ -421,7 +340,7 @@ export class MessageSender extends LinkEntity {
/**
* Initializes the sender session on the connection.
*/
private async _init(options?: SenderOptions): Promise<void> {
private async _init(options?: AwaitableSenderOptions): Promise<void> {
try {
// isOpen isConnecting Should establish
// true false No
Expand All @@ -444,9 +363,9 @@ export class MessageSender extends LinkEntity {
this.name
);
if (!options) {
options = this._createSenderOptions({});
options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
}
this._sender = await this._context.namespace.connection.createSender(options);
this._sender = await this._context.namespace.connection.createAwaitableSender(options);
this.isConnecting = false;
log.error(
"[%s] Sender '%s' with address '%s' has established itself.",
Expand Down Expand Up @@ -544,13 +463,11 @@ export class MessageSender extends LinkEntity {
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
const options: SenderOptions = this._createSenderOptions({
newName: true
});
const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig<void> = {
operation: () => this._init(options),
operation: () => this._init(senderOptions),
connectionId: this._context.namespace.connectionId!,
operationType: RetryOperationType.senderLink,
retryOptions: {
Expand Down

0 comments on commit 2be69f9

Please sign in to comment.