Skip to content

Commit

Permalink
[service-bus] Fix issue with live tests failing on session-based tests (
Browse files Browse the repository at this point in the history
Azure#10325)

Two breakages:
* A service side change has made it so specifying different values for sessionId and partitionKey is now considered a failure.
* Fix a broken test that got through when I accidentally skipped the live test run on my BatchingReceiverLite PR

Also, moved the receiverInit.spec.ts into 'internal' since it is purely a unit test.
  • Loading branch information
richardpark-msft authored Jul 29, 2020
1 parent d46a2f3 commit 9947ed8
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
14 changes: 11 additions & 3 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class BatchingReceiver extends MessageReceiver {

this._batchingReceiverLite = new BatchingReceiverLite(
context,
async (abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
async (abortSignal?: AbortSignalLike): Promise<MinimalReceiver | undefined> => {
let lastError: Error | AmqpError | undefined;

const rcvrOptions = this._createReceiverOptions(false, {
Expand All @@ -67,7 +67,7 @@ export class BatchingReceiver extends MessageReceiver {
throw lastError;
}

return this._receiver!;
return this._receiver;
},
this.receiveMode
);
Expand Down Expand Up @@ -241,7 +241,9 @@ interface ReceiveMessageArgs {
export class BatchingReceiverLite {
constructor(
clientEntityContext: ClientEntityContext,
private _getCurrentReceiver: (abortSignal?: AbortSignalLike) => Promise<MinimalReceiver>,
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
) {
this._createServiceBusMessage = (context: MessageAndDelivery) => {
Expand Down Expand Up @@ -280,6 +282,12 @@ export class BatchingReceiverLite {
try {
this.isReceivingMessages = true;
const receiver = await this._getCurrentReceiver(args.userAbortSignal);

if (receiver == null) {
// (was somehow closed in between the init() and the return)
return [];
}

return await this._receiveMessagesImpl(receiver, args);
} finally {
this._closeHandler = undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
const assert = chai.assert;

import { ClientEntityContext } from "../src/clientEntityContext";
import { BatchingReceiver } from "../src/core/batchingReceiver";
import { MessageReceiver, ReceiverType } from "../src/core/messageReceiver";
import { ClientEntityContext } from "../../src/clientEntityContext";
import { BatchingReceiver } from "../../src/core/batchingReceiver";
import { MessageReceiver, ReceiverType } from "../../src/core/messageReceiver";

describe("init() and close() interactions", () => {
function fakeContext(): ClientEntityContext {
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/test/renewLockSessions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ describe("Session Lock Renewal", () => {
function getTestMessage(): ServiceBusMessage {
const baseMessage = TestMessage.getSessionSample();
baseMessage.sessionId = sessionId;
baseMessage.partitionKey = sessionId;
return baseMessage;
}
});
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ describe("Streaming with sessions", () => {
body: "test",
label: `${num}`,
sessionId: TestMessage.sessionId,
partitionKey: "dummy" // Ensures all messages go to same partition to make peek work reliably
partitionKey: TestMessage.sessionId // Ensures all messages go to same partition to make peek work reliably
};
num++;
messages.push(message);
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/test/utils/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class TestMessage {
return {
body: `message body ${randomNumber}`,
messageId: `message id ${randomNumber}`,
partitionKey: `partition key ${randomNumber}`,
partitionKey: TestMessage.sessionId,
contentType: `content type ${randomNumber}`,
correlationId: `correlation id ${randomNumber}`,
timeToLive: 60 * 60 * 24,
Expand Down

0 comments on commit 9947ed8

Please sign in to comment.