diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index caf40ac0c957..53d27e789899 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -5,6 +5,10 @@ - Adds abortSignal support throughout Sender and non-session Receivers. [PR 9233](https://github.com/Azure/azure-sdk-for-js/pull/9233) [PR 9284](https://github.com/Azure/azure-sdk-for-js/pull/9284) +- (Receiver|SessionReceiver).subscribe() now returns a closeable object which will stop new messages from arriving + but still leave the receiver open so they can be settled via methods like complete(). + [PR 9802](https://github.com/Azure/azure-sdk-for-js/pull/9802) + [PR 9849](https://github.com/Azure/azure-sdk-for-js/pull/9849) - Bug - Messages scheduled in parallel with the `scheduleMessage` method have the same sequence number in response. Fixed in [PR 9503](https://github.com/Azure/azure-sdk-for-js/pull/9503) - Management api updates diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 15a63c0a3f80..5bb0a1dd258a 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -248,16 +248,24 @@ export class MessageReceiver extends LinkEntity { private _isDetaching: boolean = false; private _stopReceivingMessages: boolean = false; + public get receiverHelper(): ReceiverHelper { + return this._receiverHelper; + } + private _receiverHelper: ReceiverHelper; + constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) { super(context.entityPath, context, { address: context.entityPath, audience: `${context.namespace.config.endpoint}${context.entityPath}` }); + if (!options) options = {}; this._retryOptions = options.retryOptions || {}; this.wasCloseInitiated = false; this.receiverType = receiverType; this.receiveMode = options.receiveMode || ReceiveMode.peekLock; + this._receiverHelper = new ReceiverHelper(() => this._receiver); + if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) { this.maxConcurrentCalls = options.maxConcurrentCalls; } @@ -515,7 +523,7 @@ export class MessageReceiver extends LinkEntity { } return; } finally { - this.addCredit(1); + this._receiverHelper.addCredit(1); } // If we've made it this far, then user's message handler completed fine. Let us try @@ -715,7 +723,9 @@ export class MessageReceiver extends LinkEntity { * Prevents us from receiving any further messages. */ public stopReceivingMessages(): Promise { - log.receiver(`[${this.name}] User has requested to stop receiving new messages, attempting to drain the credits.`); + log.receiver( + `[${this.name}] User has requested to stop receiving new messages, attempting to drain the credits.` + ); this._stopReceivingMessages = true; return this.drainReceiver(); @@ -1032,7 +1042,7 @@ export class MessageReceiver extends LinkEntity { await this.close(); } else { if (this._receiver && this.receiverType === ReceiverType.streaming) { - this.addCredit(this.maxConcurrentCalls); + this._receiverHelper.addCredit(this.maxConcurrentCalls); } } return; @@ -1188,3 +1198,83 @@ export class MessageReceiver extends LinkEntity { return result; } } + +/** + * Wraps the receiver with some higher level operations for managing state + * like credits, draining, etc... + * + * @internal + * @ignore + */ +export class ReceiverHelper { + private _stopReceivingMessages: boolean = false; + + constructor(private _getCurrentReceiver: () => Receiver | undefined) {} + + /** + * Adds credits to the receiver, respecting any state that + * indicates the receiver is closed or should not continue + * to receive more messages. + * + * @param credits Number of credits to add. + * @returns true if credits were added, false if there is no current receiver instance + * or `stopReceivingMessages` has been called. + */ + public addCredit(credits: number): boolean { + const receiver = this._getCurrentReceiver(); + + if (this._stopReceivingMessages || receiver == null) { + return false; + } + + receiver.addCredit(credits); + return true; + } + + /** + * Prevents us from receiving any further messages. + */ + public async stopReceivingMessages(): Promise { + const receiver = this._getCurrentReceiver(); + + if (receiver == null) { + return; + } + + log.receiver( + `[${receiver.name}] User has requested to stop receiving new messages, attempting to drain the credits.` + ); + this._stopReceivingMessages = true; + + return this.drain(); + } + + /** + * Initiates a drain for the current receiver and resolves when + * the drain has completed. + */ + public async drain(): Promise { + const receiver = this._getCurrentReceiver(); + + if (receiver == null) { + return; + } + + log.receiver(`[${receiver.name}] Receiver is starting drain.`); + + const drainPromise = new Promise((resolve) => { + receiver.once(ReceiverEvents.receiverDrained, () => { + log.receiver(`[${receiver.name}] Receiver has been drained.`); + receiver.drain = false; + resolve(); + }); + + receiver.drain = true; + // this is not actually adding another credit - it'll just + // cause the drain call to start. + receiver.addCredit(1); + }); + + return drainPromise; + } +} diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 1314098d3d4e..f67afa86fdfa 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -60,7 +60,7 @@ export class StreamingReceiver extends MessageReceiver { throwErrorIfConnectionClosed(this._context.namespace); this._onMessage = onMessage; this._onError = onError; - this.addCredit(this.maxConcurrentCalls); + this.receiverHelper.addCredit(this.maxConcurrentCalls); } /** diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 7e063b8c2bb3..f4af2778d7c2 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -37,6 +37,8 @@ export interface Receiver { * Streams messages to message handlers. * @param handlers A handler that gets called for messages and errors. * @param options Options for subscribe. + * @returns An object that can be closed, sending any remaining messages to `handlers` and + * stopping new messages from arriving. */ subscribe( handlers: MessageHandlers, @@ -403,7 +405,7 @@ export class ReceiverImpl => { - return this._context.streamingReceiver?.stopReceivingMessages(); + return this._context.streamingReceiver?.receiverHelper.stopReceivingMessages(); } }; } diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index 74c9d2a02c56..707f6e7e2941 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -453,8 +453,9 @@ export class SessionReceiverImpl => {} + close: async (): Promise => { + return this._messageSession?.receiverHelper.stopReceivingMessages(); + } }; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 505d92f83472..baa8d3506154 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -18,7 +18,13 @@ import { isAmqpError } from "rhea-promise"; import * as log from "../log"; -import { OnAmqpEventAsPromise, OnError, OnMessage, PromiseLike } from "../core/messageReceiver"; +import { + OnAmqpEventAsPromise, + OnError, + OnMessage, + PromiseLike, + ReceiverHelper +} from "../core/messageReceiver"; import { LinkEntity } from "../core/linkEntity"; import { ClientEntityContext } from "../clientEntityContext"; import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils"; @@ -250,6 +256,11 @@ export class MessageSession extends LinkEntity { private _totalAutoLockRenewDuration: number; + public get receiverHelper(): ReceiverHelper { + return this._receiverHelper; + } + private _receiverHelper: ReceiverHelper; + /** * Ensures that the session lock is renewed before it expires. The lock will not be renewed for * more than the configured totalAutoLockRenewDuration. @@ -364,15 +375,15 @@ export class MessageSession extends LinkEntity { // SB allows a sessionId with empty string value :) if (this.sessionId == null && receivedSessionId == null) { - // Ideally this code path should never be reached as `createReceiver()` should fail instead + // Ideally this code path should never be reached as `createReceiver()` should fail instead // TODO: https://github.com/Azure/azure-sdk-for-js/issues/9775 to figure out why this code path indeed gets hit. errorMessage = `No unlocked sessions were available`; } else if (this.sessionId != null && receivedSessionId !== this.sessionId) { // This code path is reached if the session is already locked by another receiver. // TODO: Check why the service would not throw an error or just timeout instead of giving a misleading successful receiver - errorMessage = `Failed to get a lock on the session ${this.sessionId};` + errorMessage = `Failed to get a lock on the session ${this.sessionId}`; } - + if (errorMessage) { const error = translate({ description: errorMessage, @@ -476,6 +487,7 @@ export class MessageSession extends LinkEntity { }); this._context.isSessionEnabled = true; this.isReceivingMessages = false; + this._receiverHelper = new ReceiverHelper(() => this._receiver); if (!options) options = { sessionId: undefined }; this.autoComplete = false; this.sessionId = options.sessionId; @@ -846,9 +858,7 @@ export class MessageSession extends LinkEntity { } return; } finally { - if (this._receiver) { - this._receiver!.addCredit(1); - } + this._receiverHelper.addCredit(1); } // If we've made it this far, then user's message handler completed fine. Let us try @@ -883,7 +893,7 @@ export class MessageSession extends LinkEntity { // setting the "message" event listener. this._receiver.on(ReceiverEvents.message, onSessionMessage); // adding credit - this._receiver!.addCredit(this.maxConcurrentCalls); + this._receiverHelper.addCredit(this.maxConcurrentCalls); } else { this.isReceivingMessages = false; const msg = @@ -1077,7 +1087,7 @@ export class MessageSession extends LinkEntity { // number of messages concurrently. We will return the user an array of messages that can // be of size upto maxMessageCount. Then the user needs to accordingly dispose // (complete,/abandon/defer/deadletter) the messages from the array. - this._receiver!.addCredit(maxMessageCount); + this._receiverHelper.addCredit(maxMessageCount); let msg: string = "[%s] Setting the wait timer for %d milliseconds for receiver '%s'."; if (reuse) msg += " Receiver link already present, hence reusing it."; log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInMs, this.name); diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index b45a03760390..5e15f322663d 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -1158,35 +1158,6 @@ describe("Streaming", () => { }); }); -function singleMessagePromise( - receiver: Receiver -): Promise<{ - subscriber: ReturnType["subscribe"]>; - messages: ReceivedMessageWithLock[]; -}> { - const messages: ReceivedMessageWithLock[] = []; - - return new Promise<{ - subscriber: ReturnType["subscribe"]>; - messages: ReceivedMessageWithLock[]; - }>((resolve, reject) => { - const subscriber = receiver.subscribe( - { - processMessage: async (message) => { - messages.push(message); - resolve({ subscriber, messages }); - }, - processError: async (err) => { - reject(err); - } - }, - { - autoComplete: false - } - ); - }); -} - describe("Streaming - onDetached", function(): void { let serviceBusClient: ServiceBusClientForTests; let sender: Sender; @@ -1476,3 +1447,32 @@ describe("Streaming - disconnects", function(): void { refreshConnectionCalled.should.be.greaterThan(0, "refreshConnection was not called."); }); }); + +export function singleMessagePromise( + receiver: Receiver +): Promise<{ + subscriber: ReturnType["subscribe"]>; + messages: ReceivedMessageWithLock[]; +}> { + const messages: ReceivedMessageWithLock[] = []; + + return new Promise<{ + subscriber: ReturnType["subscribe"]>; + messages: ReceivedMessageWithLock[]; + }>((resolve, reject) => { + const subscriber = receiver.subscribe( + { + processMessage: async (message) => { + messages.push(message); + resolve({ subscriber, messages }); + }, + processError: async (err) => { + reject(err); + } + }, + { + autoComplete: false + } + ); + }); +} diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index 613b81718c25..04f4bf784ead 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -17,6 +17,7 @@ import { testPeekMsgsLength } from "./utils/testutils2"; import { getDeliveryProperty } from "./utils/misc"; +import { singleMessagePromise } from "./streamingReceiver.spec"; const should = chai.should(); chai.use(chaiAsPromised); @@ -99,6 +100,53 @@ describe("Streaming with sessions", () => { return entityNames; } + it("Streaming - user can stop a message subscription without closing the receiver", async () => { + const entities = await serviceBusClient.test.createTestEntities( + TestClientType.UnpartitionedQueueWithSessions + ); + + const sender = await serviceBusClient.test.createSender(entities); + await sender.sendMessages({ + body: ".close() test - first message", + sessionId: TestMessage.sessionId + }); + + const actualReceiver = await serviceBusClient.test.getSessionPeekLockReceiver(entities, { + sessionId: TestMessage.sessionId + }); + const { subscriber, messages } = await singleMessagePromise(actualReceiver); + + messages.map((m) => m.body).should.deep.equal([".close() test - first message"]); + + // now we're going to shut down the closeable (ie, subscription). This leaves + // the receiver open but it does drain it (so any remaining messages are delivered + // and will still be settleable). + await subscriber.close(); + + await messages[0].complete(); + messages.pop(); + + await sender.sendMessages({ + body: ".close test - second message, after closing", + sessionId: TestMessage.sessionId + }); + + // the subscription is closed so no messages should be received here. + await delay(2000); + + messages.map((m) => m.body).should.deep.equal([]); + + await actualReceiver.close(); // release the session lock + + const receiver2 = await serviceBusClient.test.getReceiveAndDeleteReceiver(entities); + + // clean out the remaining message that never arrived. + const [finalMessage] = await receiver2.receiveMessages(1, { maxWaitTimeInMs: 5000 }); + finalMessage.body.should.equal(".close test - second message, after closing"); + + await serviceBusClient.test.afterEach(); + }); + describe("Sessions Streaming - Misc Tests", function(): void { afterEach(async () => { await afterEachTest();