Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Fix message loss issues with peekLock and receiveAndDelete #15989

Merged
merged 25 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8ca0a16
Fixing an issue where we could lose messages or provoke an alarming m…
richardpark-msft Jun 25, 2021
7562013
Remove comments
richardpark-msft Jun 25, 2021
4338119
Implementing the minimal amount needed to allow for the drain/credit …
richardpark-msft Jun 25, 2021
127ac43
Formatting
richardpark-msft Jun 25, 2021
fc0c51a
Updating changelog
richardpark-msft Jun 25, 2021
86a5613
Use logger.warning
richardpark-msft Jun 25, 2021
8d48098
Fixing a silly unit test bug (I was decremeting the credit count when…
richardpark-msft Jun 26, 2021
57c6dd1
Cleaning up the tests. The 'fake' receiver had a bunch of duplicate c…
richardpark-msft Jun 26, 2021
a2c2595
Updating core-amqp changelog and package.json for new rhea version th…
richardpark-msft Jun 29, 2021
085aeec
Updating core-amqp dependency to 3.1.0 to have access to the new vers…
richardpark-msft Jun 29, 2021
a13ee56
Update mockhubs and eventhubs to use the newest core-amqp and rhea.
richardpark-msft Jun 29, 2021
1ea8206
Update package version to what it is now.
richardpark-msft Jun 29, 2021
bd46bfc
Package version update
richardpark-msft Jun 29, 2021
218ab80
Update package version
richardpark-msft Jun 29, 2021
c0eefee
package version and release date
richardpark-msft Jun 29, 2021
81e9d27
- Updating code to use the properly exposed rheaPromise.drainCredit =…
richardpark-msft Jun 30, 2021
22d3a8a
clean checkout from main, then rush update
richardpark-msft Jun 30, 2021
752ac46
Merge remote-tracking branch 'upstream/main' into sb-enoeden-clean-fix
richardpark-msft Jun 30, 2021
06e2d21
Updated rhea-promise and rhea (as minimal as a rush update as we're g…
richardpark-msft Jun 30, 2021
861a7ca
Updated rhea-promise and rhea (as minimal as a rush update as we're g…
richardpark-msft Jun 30, 2021
99d7750
There's nothing in the latest core-amqp that's required for _this_ ch…
richardpark-msft Jun 30, 2021
cac8131
revert pnpm-lock.yaml before merge
richardpark-msft Jun 30, 2021
88489b2
Merge remote-tracking branch 'upstream/main' into sb-enoeden-clean-fix
richardpark-msft Jun 30, 2021
e0a442c
revert to `main`, and rush update again.
richardpark-msft Jun 30, 2021
5859334
Revert the date on the changelog for core-amqp since we're not releas…
richardpark-msft Jun 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

### Key Bugs Fixed

- Fixed a bug that could lead to message loss in certain conditions when using `receiver.receiveMessages()`.
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)

### Fixed


Expand Down
59 changes: 53 additions & 6 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver,
Receiver as RheaPromiseReceiver,
Session
} from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
Expand Down Expand Up @@ -187,16 +187,22 @@ export function getRemainingWaitTimeInMsFn(
*
* @internal
*/
type EventEmitterLike<T extends Receiver | Session> = Pick<T, "once" | "removeListener" | "on">;
type EventEmitterLike<T extends RheaPromiseReceiver | Session> = Pick<
T,
"once" | "removeListener" | "on"
>;

/**
* The bare minimum needed to receive messages for batched
* message receiving.
*
* @internal
*/
export type MinimalReceiver = Pick<Receiver, "name" | "isOpen" | "credit" | "addCredit" | "drain"> &
EventEmitterLike<Receiver> & {
export type MinimalReceiver = Pick<
RheaPromiseReceiver,
"name" | "isOpen" | "credit" | "addCredit" | "drain"
> &
EventEmitterLike<RheaPromiseReceiver> & {
session: EventEmitterLike<Session>;
} & {
connection: {
Expand Down Expand Up @@ -386,6 +392,12 @@ export class BatchingReceiverLite {
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
if (receiver.drain) {
// If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
// arrive before the drain completes.
return;
}

// Drain any pending credits.
if (receiver.isOpen() && receiver.credit > 0) {
logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`);
Expand All @@ -395,6 +407,14 @@ export class BatchingReceiverLite {
// at which point we'll wrap everything up and resolve the promise.
receiver.drain = true;
receiver.addCredit(1);

// WORKAROUND: currently addCredit(<positive number>) is the only way the next tick will actually
// send a flow frame. However, we don't want the extra credit we just added - it can result in an extra
// message being sent that we did not ask for (ie, it causes us to request remaining_credits + 1)
//
// This workaround goes in and "removes" the credit, leaving all the other necessary flags intact so a
// flow frame will still get sent. This is all just in-memory manipulation (nothing has been sent yet).
(receiver as RheaReceiverWithPrivateProperties)._link.credit--;
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.verbose(
`${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`
Expand Down Expand Up @@ -432,8 +452,17 @@ export class BatchingReceiverLite {

try {
const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
if (brokeredMessages.length < args.maxMessageCount) {
brokeredMessages.push(data);
brokeredMessages.push(data);

// NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
// extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
// silently dropped on the floor.
if (brokeredMessages.length > args.maxMessageCount) {
logger.error(
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
`More messages arrived than were expected: ${
args.maxMessageCount
} vs ${brokeredMessages.length + 1}`
);
}
} catch (err) {
const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
Expand Down Expand Up @@ -523,3 +552,21 @@ export class BatchingReceiverLite {
receiver.session.on(SessionEvents.sessionClose, onClose);
}
}

/**
* @internal
*/
export type RheaReceiverWithPrivateProperties = RheaPromiseReceiver & {
/**
* rhea-promise does an internal cast to get to the credit property as it's not actually exposed
* by rhea.
*
* export class RheaPromiseReceiver extends Link {}
* export abstract class Link extends Entity {
* protected _link: RheaLink;
* }
*/
_link: RheaPromiseReceiver["_link"] & {
credit: number;
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { EventEmitter } from "events";
import {
BatchingReceiver,
getRemainingWaitTimeInMsFn,
BatchingReceiverLite
BatchingReceiverLite,
RheaReceiverWithPrivateProperties
} from "../../../src/core/batchingReceiver";
import { defer, createConnectionContextForTests } from "./unittestUtils";
import { createAbortSignalForTest } from "../../public/utils/abortSignalTestUtils";
Expand Down Expand Up @@ -463,16 +464,18 @@ describe("BatchingReceiver unit tests", () => {
} {
const emitter = new EventEmitter();
const { promise: receiveIsReady, resolve: resolvePromiseIsReady } = defer<void>();
let credits = 0;

const remainingRegisteredListeners = new Set<string>();

const _link = {
credit: 0
};

const fakeRheaReceiver = {
on(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) {
emitter.on(evt, handler);

if (evt === ReceiverEvents.message) {
--credits;
--_link.credit;
}

assert.isFalse(remainingRegisteredListeners.has(evt.toString()));
Expand Down Expand Up @@ -501,22 +504,23 @@ describe("BatchingReceiver unit tests", () => {
}
},
isOpen: () => true,
addCredit: (_credits: number) => {
if (_credits === 1 && fakeRheaReceiver.drain === true) {
addCredit: (credit: number) => {
if (credit === 1 && fakeRheaReceiver.drain === true) {
// special case - if we're draining we should initiate a drain
emitter.emit(ReceiverEvents.receiverDrained, undefined);
clock?.runAll();
} else {
credits += _credits;
_link.credit += credit;
}
},
get credit() {
return credits;
return _link.credit;
},
connection: {
id: "connection-id"
}
} as RheaReceiver;
},
_link
} as RheaReceiverWithPrivateProperties;

return {
receiveIsReady,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@ import {
} from "./unittestUtils";
import sinon, { SinonSpy } from "sinon";
import { EventEmitter } from "events";
import {
ReceiverEvents,
Receiver as RheaReceiver,
EventContext,
Message as RheaMessage,
SessionEvents
} from "rhea-promise";
import { ReceiverEvents, EventContext, Message as RheaMessage, SessionEvents } from "rhea-promise";
import { OnAmqpEventAsPromise } from "../../../src/core/messageReceiver";
import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage";
import { ProcessErrorArgs, ServiceBusError } from "../../../src";
import { ReceiveMode } from "../../../src/models";
import { Constants } from "@azure/core-amqp";
import { AbortError } from "@azure/abort-controller";
import { RheaReceiverWithPrivateProperties } from "../../../src/core/batchingReceiver";

chai.use(chaiAsPromised);
const assert = chai.assert;
Expand Down Expand Up @@ -280,7 +275,9 @@ describe("Message session unit tests", () => {
} {
const emitter = new EventEmitter();
const { promise: receiveIsReady, resolve: resolvePromiseIsReady } = defer<void>();
let credits = 0;
const _link = {
credit: 0
};

const remainingRegisteredListeners = new Set<string>();

Expand All @@ -289,7 +286,7 @@ describe("Message session unit tests", () => {
emitter.on(evt, handler);

if (evt === ReceiverEvents.message) {
--credits;
--_link.credit;
}

assert.isFalse(remainingRegisteredListeners.has(evt.toString()));
Expand Down Expand Up @@ -318,22 +315,23 @@ describe("Message session unit tests", () => {
}
},
isOpen: () => true,
addCredit: (_credits: number) => {
if (_credits === 1 && fakeRheaReceiver.drain === true) {
addCredit: (credit: number) => {
if (credit === 1 && fakeRheaReceiver.drain === true) {
// special case - if we're draining we should initiate a drain
emitter.emit(ReceiverEvents.receiverDrained, undefined);
clock?.runAll();
} else {
credits += _credits;
_link.credit += credit;
}
},
get credit() {
return credits;
return _link.credit;
},
connection: {
id: "connection-id"
}
} as RheaReceiver;
},
_link
} as RheaReceiverWithPrivateProperties;

batchingReceiver["_link"] = fakeRheaReceiver;

Expand Down
21 changes: 9 additions & 12 deletions sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import {
captureConsoleOutputToAppInsights,
createServiceBusClient,
loopForever as loopInfinitely,
ServiceBusStressTester
} from "./serviceBusStressTester";
import { AbortController, AbortSignalLike } from "@azure/abort-controller";
Expand All @@ -8,14 +13,6 @@ import { v4 as uuidv4 } from "uuid";

captureConsoleOutputToAppInsights();

async function looper(fn: () => Promise<void>, delay: number, abortSignal: AbortSignalLike) {
const timeout = () => new Promise((resolve) => setTimeout(() => resolve(true), delay));

while (!abortSignal.aborted && (await timeout())) {
await fn();
}
}

async function sendMessagesForever(
stressTest: ServiceBusStressTester,
clientForSender: ServiceBusClient,
Expand All @@ -25,7 +22,7 @@ async function sendMessagesForever(

let sender: ServiceBusSender | undefined;

return looper(
return loopInfinitely(
async () => {
if (abortSignal.aborted) {
console.log(`Aborting sending because of abortSignal`);
Expand All @@ -48,7 +45,7 @@ async function sendMessagesForever(
await sender.sendMessages(messagesToSend);
} catch (err) {
console.log(`Sending message failed: `, err);
stressTest.trackError("send", err);
stressTest.trackError("send", err as Error);
sender = undefined;
}
},
Expand All @@ -67,7 +64,7 @@ async function main() {
});

const operation = async () => {
const clientForReceiver = stressTest.createServiceBusClient();
const clientForReceiver = createServiceBusClient();

const receiver = clientForReceiver.createReceiver(stressTest.queueName, {
receiveMode: "peekLock"
Expand All @@ -92,7 +89,7 @@ async function main() {
}
);

const clientForSender = stressTest.createServiceBusClient();
const clientForSender = createServiceBusClient();

await sendMessagesForever(stressTest, clientForSender, abortSignal);
};
Expand Down
Loading