Skip to content

Commit

Permalink
[service-bus] Track 2 - push down methods of MessageReceiver that are…
Browse files Browse the repository at this point in the history
… only used for StreamingReceiver (Azure#10322)

Pushing down methods from MessageReceiver that really only belong to StreamingReceiver. This is a prep-PR for a larger refactoring with StreamingReceiver but it's useful on its own.
  • Loading branch information
richardpark-msft authored Jul 29, 2020
1 parent 9947ed8 commit 3a046f6
Show file tree
Hide file tree
Showing 12 changed files with 1,704 additions and 1,723 deletions.
5 changes: 0 additions & 5 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ export class BatchingReceiver extends MessageReceiver {
this.name
);

// while creating the receiver link for batching receiver the max concurrent calls
// i.e. the credit_window on the link is set to zero. After the link is created
// successfully, we add credit which is the maxMessageCount specified by the user.
this.maxConcurrentCalls = 0;

return await this._batchingReceiverLite.receiveMessages({
maxMessageCount,
maxWaitTimeInMs,
Expand Down
790 changes: 23 additions & 767 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Receiver, ReceiverEvents } from "rhea-promise";
import * as log from "../log";

/**
* Wraps the receiver with some higher level operations for managing state
* like credits, draining, etc...
*
* @internal
* @ignore
*/
export class ReceiverHelper {
private _stopReceivingMessages: boolean = false;

constructor(private _getCurrentReceiver: () => Receiver | undefined) {}

/**
* Adds credits to the receiver, respecting any state that
* indicates the receiver is closed or should not continue
* to receive more messages.
*
* @param credits Number of credits to add.
* @returns true if credits were added, false if there is no current receiver instance
* or `stopReceivingMessages` has been called.
*/
public addCredit(credits: number): boolean {
const receiver = this._getCurrentReceiver();

if (this._stopReceivingMessages || receiver == null) {
return false;
}

receiver.addCredit(credits);
return true;
}

/**
* Prevents us from receiving any further messages.
*/
public async stopReceivingMessages(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(
`[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.`
);
this._stopReceivingMessages = true;

return this.drain();
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*/
public async drain(): Promise<void> {
const receiver = this._getCurrentReceiver();

if (receiver == null) {
return;
}

log.receiver(`[${receiver.name}] Receiver is starting drain.`);

const drainPromise = new Promise<void>((resolve) => {
receiver.once(ReceiverEvents.receiverDrained, () => {
log.receiver(`[${receiver.name}] Receiver has been drained.`);
receiver.drain = false;
resolve();
});

receiver.drain = true;
// this is not actually adding another credit - it'll just
// cause the drain call to start.
receiver.addCredit(1);
});

return drainPromise;
}
}
Loading

0 comments on commit 3a046f6

Please sign in to comment.