Skip to content

Commit

Permalink
[Service Bus] autoComplete -> autoCompleteMessages (Azure#12558)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya-rao-a authored Nov 17, 2020
1 parent e8b73af commit 993c7bc
Show file tree
Hide file tree
Showing 15 changed files with 49 additions and 49 deletions.
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ export interface SqlRuleFilter {

// @public
export interface SubscribeOptions extends OperationOptionsBase {
autoComplete?: boolean;
autoCompleteMessages?: boolean;
maxConcurrentCalls?: number;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ async function receiveMessage() {
receiver.subscribe(
{ processMessage, processError },
{
autoComplete: false
autoCompleteMessages: false
}
); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
await delay(10000);
await receiver.close();
console.log("Total number of deferred messages:", deferredSteps.size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,15 @@ async function main() {

try {
const subscription = receiver.subscribe({
// After executing this callback you provide, the receiver will remove the message from the queue if you
// have not already settled the message in your callback.
// You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method.
// If your callback _does_ throw an error before the message is settled, then it will be abandoned.
processMessage: async (brokeredMessage) => {
console.log(`Received message: ${brokeredMessage.body}`);

// autoComplete, which is enabled by default, will automatically call
// receiver.completeMessage() on your message after awaiting on your processMessage
// handler so long as your handler does not throw an error.
//
// If your handler _does_ throw an error then the message will automatically
// be abandoned using receiver.abandonMessage()
//
// autoComplete can be disabled in the options for subscribe().
},
// This callback will be called for any error that occurs when either in the receiver when receiving the message
// or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message.
processError: async (args) => {
console.log(`Error from source ${args.errorSource} occurred: `, args.error);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ async function receiveMessage() {
receiver.subscribe(
{ processMessage, processError },
{
autoComplete: false
autoCompleteMessages: false
}
); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
await delay(10000);
await receiver.close();
console.log("Total number of deferred messages:", deferredSteps.size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,15 @@ export async function main() {

try {
const subscription = receiver.subscribe({
// After executing this callback you provide, the receiver will remove the message from the queue if you
// have not already settled the message in your callback.
// You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method.
// If your callback _does_ throw an error before the message is settled, then it will be abandoned.
processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => {
console.log(`Received message: ${brokeredMessage.body}`);

// autoComplete, which is enabled by default, will automatically call
// receiver.completeMessage() on your message after awaiting on your processMessage
// handler so long as your handler does not throw an error.
//
// If your handler _does_ throw an error then the message will automatically
// be abandoned using receiver.abandonMessage()
//
// autoComplete can be disabled in the options for subscribe().
},
// This callback will be called for any error that occurs when either in the receiver when receiving the message
// or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message.
processError: async (args: ProcessErrorArgs) => {
console.log(`Error from source ${args.errorSource} occurred: `, args.error);

Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
this.receiveMode = options.receiveMode || "peekLock";

// If explicitly set to false then autoComplete is false else true (default).
this.autoComplete = options.autoComplete === false ? options.autoComplete : true;
this.autoComplete =
options.autoCompleteMessages === false ? options.autoCompleteMessages : true;
this._lockRenewer = options.lockRenewer;
}

Expand Down
12 changes: 8 additions & 4 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {}
*/
export interface SubscribeOptions extends OperationOptionsBase {
/**
* @property Indicates whether the `complete()` method on the message should automatically be
* called by the sdk after the user provided onMessage handler has been executed.
* Calling `complete()` on a message removes it from the Queue/Subscription.
* @property Indicates whether the message should be settled using the `completeMessage()`
* method on the receiver automatically after it executes the user provided message callback.
* Doing so removes the message from the queue/subscription.
*
* This option is ignored if messages are received in the `receiveAndDelete` receive mode or if
* the message is already settled in the user provided message callback.
*
* - **Default**: `true`.
*/
autoComplete?: boolean;
autoCompleteMessages?: boolean;
/**
* @property The maximum number of concurrent calls that the library
* can make to the user's message handler. Once this limit has been reached, more messages will
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
options: StreamingReceiverInitArgs
): Promise<StreamingReceiver> {
throwErrorIfConnectionClosed(this._context);
if (options.autoComplete == null) options.autoComplete = true;
if (options.autoCompleteMessages == null) options.autoCompleteMessages = true;

// When the user "stops" a streaming receiver (via the returned instance from 'subscribe' we just suspend
// it, leaving the link open). This allows users to stop the flow of messages but still be able to settle messages
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ export class MessageSession extends LinkEntity<Receiver> {
}

// If explicitly set to false then autoComplete is false else true (default).
this.autoComplete = options.autoComplete === false ? options.autoComplete : true;
this.autoComplete =
options.autoCompleteMessages === false ? options.autoCompleteMessages : true;
this._onMessage = onMessage;
this._onError = onError;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe("receive and delete", () => {
errors.push(args.error.message);
}
},
{ autoComplete: autoCompleteFlag }
{ autoCompleteMessages: autoCompleteFlag }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/test/renewLock.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ describe("Message Lock Renewal", () => {
receiver.subscribe(
{ processMessage, processError },
{
autoComplete: false
autoCompleteMessages: false
}
);
await delay(10000);
Expand Down Expand Up @@ -387,7 +387,7 @@ describe("Message Lock Renewal", () => {
processError: async (err) => reject(err)
},
{
autoComplete: false
autoCompleteMessages: false
}
);
});
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/test/renewLockSessions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ describe("Session Lock Renewal", () => {
receiver.subscribe(
{ processMessage, processError },
{
autoComplete: false
autoCompleteMessages: false
}
);
await delay(10000);
Expand Down Expand Up @@ -341,7 +341,7 @@ describe("Session Lock Renewal", () => {
}
},
{
autoComplete: false
autoCompleteMessages: false
}
);

Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/test/sessionsTests.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe("session tests", () => {
},
processError
},
{ autoComplete: false }
{ autoCompleteMessages: false }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down
14 changes: 7 additions & 7 deletions sdk/servicebus/service-bus/test/streamingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ describe("Streaming Receiver Tests", () => {
},
processError
},
{ autoComplete: false }
{ autoCompleteMessages: false }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down Expand Up @@ -231,7 +231,7 @@ describe("Streaming Receiver Tests", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down Expand Up @@ -278,7 +278,7 @@ describe("Streaming Receiver Tests", () => {
},
processError
},
{ autoComplete: false }
{ autoCompleteMessages: false }
);

const deliveryCountFlag = await checkWithTimeout(
Expand Down Expand Up @@ -332,7 +332,7 @@ describe("Streaming Receiver Tests", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);
const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
should.equal(
Expand Down Expand Up @@ -397,7 +397,7 @@ describe("Streaming Receiver Tests", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
should.equal(msgsCheck, true, `Expected 1, received ${receivedMsgs.length} messages`);
Expand Down Expand Up @@ -782,7 +782,7 @@ describe("Streaming Receiver Tests", () => {
processError
},
{
autoComplete: false
autoCompleteMessages: false
}
);
await receiver.close();
Expand Down Expand Up @@ -1040,7 +1040,7 @@ export function singleMessagePromise(
}
},
{
autoComplete: false
autoCompleteMessages: false
}
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ describe("Streaming with sessions", () => {
},
processError
},
{ autoComplete: false }
{ autoCompleteMessages: false }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down Expand Up @@ -253,7 +253,7 @@ describe("Streaming with sessions", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);

const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
Expand Down Expand Up @@ -304,7 +304,7 @@ describe("Streaming with sessions", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);

const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1);
Expand Down Expand Up @@ -364,7 +364,7 @@ describe("Streaming with sessions", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);

const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
Expand Down Expand Up @@ -425,7 +425,7 @@ describe("Streaming with sessions", () => {
},
processError
},
{ autoComplete }
{ autoCompleteMessages: autoComplete }
);

const msgsCheck = await checkWithTimeout(() => msgCount === 1);
Expand Down Expand Up @@ -764,7 +764,7 @@ describe("Streaming with sessions", () => {
processError
},
{
autoComplete: false
autoCompleteMessages: false
}
);
await receiver.close();
Expand Down

0 comments on commit 993c7bc

Please sign in to comment.