From 993c7bcb690f32765822fc8406c3c7585269ecdb Mon Sep 17 00:00:00 2001 From: Ramya Rao Date: Mon, 16 Nov 2020 17:48:16 -0800 Subject: [PATCH] [Service Bus] autoComplete -> autoCompleteMessages (#12558) --- .../service-bus/review/service-bus.api.md | 2 +- .../samples/javascript/advanced/deferral.js | 4 ++-- .../javascript/receiveMessagesStreaming.js | 15 ++++++--------- .../samples/typescript/src/advanced/deferral.ts | 4 ++-- .../typescript/src/receiveMessagesStreaming.ts | 15 ++++++--------- .../service-bus/src/core/messageReceiver.ts | 3 ++- sdk/servicebus/service-bus/src/models.ts | 12 ++++++++---- .../service-bus/src/receivers/receiver.ts | 2 +- .../service-bus/src/session/messageSession.ts | 3 ++- .../service-bus/test/receiveAndDeleteMode.spec.ts | 2 +- sdk/servicebus/service-bus/test/renewLock.spec.ts | 4 ++-- .../service-bus/test/renewLockSessions.spec.ts | 4 ++-- .../service-bus/test/sessionsTests.spec.ts | 2 +- .../service-bus/test/streamingReceiver.spec.ts | 14 +++++++------- .../test/streamingReceiverSessions.spec.ts | 12 ++++++------ 15 files changed, 49 insertions(+), 49 deletions(-) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 02178b64a2e2..cb7aef3d0984 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -488,7 +488,7 @@ export interface SqlRuleFilter { // @public export interface SubscribeOptions extends OperationOptionsBase { - autoComplete?: boolean; + autoCompleteMessages?: boolean; maxConcurrentCalls?: number; } diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js index 3996aec295e8..d21cff84282a 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js @@ -111,9 +111,9 @@ async function receiveMessage() { receiver.subscribe( { processMessage, processError }, { - autoComplete: false + autoCompleteMessages: false } - ); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered + ); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered await delay(10000); await receiver.close(); console.log("Total number of deferred messages:", deferredSteps.size); diff --git a/sdk/servicebus/service-bus/samples/javascript/receiveMessagesStreaming.js b/sdk/servicebus/service-bus/samples/javascript/receiveMessagesStreaming.js index 9bad46d844b7..6ad8cafdb942 100644 --- a/sdk/servicebus/service-bus/samples/javascript/receiveMessagesStreaming.js +++ b/sdk/servicebus/service-bus/samples/javascript/receiveMessagesStreaming.js @@ -30,18 +30,15 @@ async function main() { try { const subscription = receiver.subscribe({ + // After executing this callback you provide, the receiver will remove the message from the queue if you + // have not already settled the message in your callback. + // You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method. + // If your callback _does_ throw an error before the message is settled, then it will be abandoned. processMessage: async (brokeredMessage) => { console.log(`Received message: ${brokeredMessage.body}`); - - // autoComplete, which is enabled by default, will automatically call - // receiver.completeMessage() on your message after awaiting on your processMessage - // handler so long as your handler does not throw an error. - // - // If your handler _does_ throw an error then the message will automatically - // be abandoned using receiver.abandonMessage() - // - // autoComplete can be disabled in the options for subscribe(). }, + // This callback will be called for any error that occurs when either in the receiver when receiving the message + // or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message. processError: async (args) => { console.log(`Error from source ${args.errorSource} occurred: `, args.error); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts index 01fe0da2e1da..b0bb394a3e56 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts @@ -117,9 +117,9 @@ async function receiveMessage() { receiver.subscribe( { processMessage, processError }, { - autoComplete: false + autoCompleteMessages: false } - ); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered + ); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered await delay(10000); await receiver.close(); console.log("Total number of deferred messages:", deferredSteps.size); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesStreaming.ts b/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesStreaming.ts index 8349a00ec8c7..250352e1d3d9 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesStreaming.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/receiveMessagesStreaming.ts @@ -38,18 +38,15 @@ export async function main() { try { const subscription = receiver.subscribe({ + // After executing this callback you provide, the receiver will remove the message from the queue if you + // have not already settled the message in your callback. + // You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method. + // If your callback _does_ throw an error before the message is settled, then it will be abandoned. processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => { console.log(`Received message: ${brokeredMessage.body}`); - - // autoComplete, which is enabled by default, will automatically call - // receiver.completeMessage() on your message after awaiting on your processMessage - // handler so long as your handler does not throw an error. - // - // If your handler _does_ throw an error then the message will automatically - // be abandoned using receiver.abandonMessage() - // - // autoComplete can be disabled in the options for subscribe(). }, + // This callback will be called for any error that occurs when either in the receiver when receiving the message + // or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message. processError: async (args: ProcessErrorArgs) => { console.log(`Error from source ${args.errorSource} occurred: `, args.error); diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 2bf8ee815a8d..97b02ed71553 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -162,7 +162,8 @@ export abstract class MessageReceiver extends LinkEntity { this.receiveMode = options.receiveMode || "peekLock"; // If explicitly set to false then autoComplete is false else true (default). - this.autoComplete = options.autoComplete === false ? options.autoComplete : true; + this.autoComplete = + options.autoCompleteMessages === false ? options.autoCompleteMessages : true; this._lockRenewer = options.lockRenewer; } diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index b837bcd9de41..2e52093ab535 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -154,12 +154,16 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {} */ export interface SubscribeOptions extends OperationOptionsBase { /** - * @property Indicates whether the `complete()` method on the message should automatically be - * called by the sdk after the user provided onMessage handler has been executed. - * Calling `complete()` on a message removes it from the Queue/Subscription. + * @property Indicates whether the message should be settled using the `completeMessage()` + * method on the receiver automatically after it executes the user provided message callback. + * Doing so removes the message from the queue/subscription. + * + * This option is ignored if messages are received in the `receiveAndDelete` receive mode or if + * the message is already settled in the user provided message callback. + * * - **Default**: `true`. */ - autoComplete?: boolean; + autoCompleteMessages?: boolean; /** * @property The maximum number of concurrent calls that the library * can make to the user's message handler. Once this limit has been reached, more messages will diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index ddfa35815c18..38713ee1da17 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -425,7 +425,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { options: StreamingReceiverInitArgs ): Promise { throwErrorIfConnectionClosed(this._context); - if (options.autoComplete == null) options.autoComplete = true; + if (options.autoCompleteMessages == null) options.autoCompleteMessages = true; // When the user "stops" a streaming receiver (via the returned instance from 'subscribe' we just suspend // it, leaving the link open). This allows users to stop the flow of messages but still be able to settle messages diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 0294f7cea44a..27593ab73927 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -603,7 +603,8 @@ export class MessageSession extends LinkEntity { } // If explicitly set to false then autoComplete is false else true (default). - this.autoComplete = options.autoComplete === false ? options.autoComplete : true; + this.autoComplete = + options.autoCompleteMessages === false ? options.autoCompleteMessages : true; this._onMessage = onMessage; this._onError = onError; diff --git a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts index b56c9457cab0..5ca0e11d6b56 100644 --- a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts +++ b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts @@ -145,7 +145,7 @@ describe("receive and delete", () => { errors.push(args.error.message); } }, - { autoComplete: autoCompleteFlag } + { autoCompleteMessages: autoCompleteFlag } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); diff --git a/sdk/servicebus/service-bus/test/renewLock.spec.ts b/sdk/servicebus/service-bus/test/renewLock.spec.ts index 53eea62722b8..ae928b2b1a93 100644 --- a/sdk/servicebus/service-bus/test/renewLock.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLock.spec.ts @@ -301,7 +301,7 @@ describe("Message Lock Renewal", () => { receiver.subscribe( { processMessage, processError }, { - autoComplete: false + autoCompleteMessages: false } ); await delay(10000); @@ -387,7 +387,7 @@ describe("Message Lock Renewal", () => { processError: async (err) => reject(err) }, { - autoComplete: false + autoCompleteMessages: false } ); }); diff --git a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts index f81375f47c78..87c1a27331fc 100644 --- a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts @@ -275,7 +275,7 @@ describe("Session Lock Renewal", () => { receiver.subscribe( { processMessage, processError }, { - autoComplete: false + autoCompleteMessages: false } ); await delay(10000); @@ -341,7 +341,7 @@ describe("Session Lock Renewal", () => { } }, { - autoComplete: false + autoCompleteMessages: false } ); diff --git a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts index f21d9f6df87e..06aecddce15e 100644 --- a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts @@ -192,7 +192,7 @@ describe("session tests", () => { }, processError }, - { autoComplete: false } + { autoCompleteMessages: false } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index 508e3ce5b874..ead69ab22f55 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -136,7 +136,7 @@ describe("Streaming Receiver Tests", () => { }, processError }, - { autoComplete: false } + { autoCompleteMessages: false } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); @@ -231,7 +231,7 @@ describe("Streaming Receiver Tests", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); @@ -278,7 +278,7 @@ describe("Streaming Receiver Tests", () => { }, processError }, - { autoComplete: false } + { autoCompleteMessages: false } ); const deliveryCountFlag = await checkWithTimeout( @@ -332,7 +332,7 @@ describe("Streaming Receiver Tests", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0); should.equal( @@ -397,7 +397,7 @@ describe("Streaming Receiver Tests", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); should.equal(msgsCheck, true, `Expected 1, received ${receivedMsgs.length} messages`); @@ -782,7 +782,7 @@ describe("Streaming Receiver Tests", () => { processError }, { - autoComplete: false + autoCompleteMessages: false } ); await receiver.close(); @@ -1040,7 +1040,7 @@ export function singleMessagePromise( } }, { - autoComplete: false + autoCompleteMessages: false } ); }); diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index d1c54605e4de..3f2d5af3acc2 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -209,7 +209,7 @@ describe("Streaming with sessions", () => { }, processError }, - { autoComplete: false } + { autoCompleteMessages: false } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); @@ -253,7 +253,7 @@ describe("Streaming with sessions", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1); @@ -304,7 +304,7 @@ describe("Streaming with sessions", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1); @@ -364,7 +364,7 @@ describe("Streaming with sessions", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0); @@ -425,7 +425,7 @@ describe("Streaming with sessions", () => { }, processError }, - { autoComplete } + { autoCompleteMessages: autoComplete } ); const msgsCheck = await checkWithTimeout(() => msgCount === 1); @@ -764,7 +764,7 @@ describe("Streaming with sessions", () => { processError }, { - autoComplete: false + autoCompleteMessages: false } ); await receiver.close();