Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Add option to preserve Date type when receiving messages #24539

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Release History

## 7.7.4 (Unreleased)
## 7.8.0 (Unreleased)

### Features Added

- Add an option to skip converting Date type into UNIX epoch number for properties in message annotations or application properties. (PR #24539)[https://github.com/Azure/azure-sdk-for-js/pull/24539]

### Breaking Changes

### Bugs Fixed
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.7.4",
"version": "7.8.0",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/servicebus/service-bus/",
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ export interface ServiceBusReceiverOptions {
identifier?: string;
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipConvertingDate?: boolean;
skipParsingBodyAsJson?: boolean;
subQueueType?: "deadLetter" | "transferDeadLetter";
}
Expand Down Expand Up @@ -549,6 +550,7 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
identifier?: string;
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipConvertingDate?: boolean;
skipParsingBodyAsJson?: boolean;
}

Expand Down
9 changes: 6 additions & 3 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ export class BatchingReceiver extends MessageReceiver {
return this.link;
},
this.receiveMode,
options.skipParsingBodyAsJson ?? false
options.skipParsingBodyAsJson ?? false,
options.skipConvertingDate ?? false
);
}

Expand Down Expand Up @@ -247,15 +248,17 @@ export class BatchingReceiverLite {
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
_skipParsingBodyAsJson: boolean,
_skipConvertingDate: boolean
) {
this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
context.message!,
context.delivery!,
true,
this._receiveMode,
_skipParsingBodyAsJson
_skipParsingBodyAsJson,
_skipConvertingDate
);
};

Expand Down
18 changes: 13 additions & 5 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ export interface SendManagementRequestOptions extends SendRequestOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* Whether to skip converting Date type on properties of message annotations
* or application properties into numbers when receiving the message. By
* default, properties of Date type is converted into UNIX epoch number for
* compatibility.
*/
skipConvertingDate?: boolean;
}

/**
Expand Down Expand Up @@ -546,10 +553,10 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const messages = result.body.messages as { message: Buffer }[];
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = fromRheaMessage(
decodedMessage as any,
options?.skipParsingBodyAsJson ?? false
);
const message = fromRheaMessage(decodedMessage as any, {
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false,
skipConvertingDate: options?.skipConvertingDate ?? false,
});
messageList.push(message);
this._lastPeekedSequenceNumber = message.sequenceNumber!;
}
Expand Down Expand Up @@ -866,7 +873,8 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
{ tag: msg["lock-token"] } as any,
false,
receiveMode,
options?.skipParsingBodyAsJson ?? false
options?.skipParsingBodyAsJson ?? false,
false
);
messageList.push(message);
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ export interface ReceiveOptions extends SubscribeOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson: boolean;

/**
* Whether to skip converting Date type on properties of message annotations
* or application properties into numbers when receiving the message. By
* default, properties of Date type is converted into UNIX epoch number for
* compatibility.
*/
skipConvertingDate: boolean;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ export class StreamingReceiver extends MessageReceiver {
context.delivery!,
true,
this.receiveMode,
options.skipParsingBodyAsJson ?? false
options.skipParsingBodyAsJson ?? false,
options.skipConvertingDate ?? false
);

this._lockRenewer?.start(this, bMessage, (err) => {
Expand Down
14 changes: 14 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ export interface ServiceBusReceiverOptions {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* Whether to skip converting Date type on properties of message annotations
* or application properties into numbers when receiving the message. By
* default, properties of Date type is converted into UNIX epoch number for
* compatibility.
*/
skipConvertingDate?: boolean;
/**
* Sets the name to identify the receiver. This can be used to correlate logs and exceptions.
* If not specified or empty, a random unique one will be used.
Expand Down Expand Up @@ -291,6 +298,13 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
/**
* Whether to skip converting Date type on properties of message annotations
* or application properties into numbers when receiving the message. By
* default, properties of Date type is converted into UNIX epoch number for
* compatibility.
*/
skipConvertingDate?: boolean;
/**
* Sets the name to identify the session receiver. This can be used to correlate logs and exceptions.
* If not specified or empty, a random unique one will be used.
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
public receiveMode: "peekLock" | "receiveAndDelete",
maxAutoRenewLockDurationInMs: number,
private skipParsingBodyAsJson: boolean,
private skipConvertingDate: boolean = false,
retryOptions: RetryOptions = {},
identifier?: string
) {
Expand Down Expand Up @@ -372,6 +373,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
receiveMode: this.receiveMode,
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson,
skipConvertingDate: this.skipConvertingDate,
};
this._batchingReceiver = this._createBatchingReceiver(
this._context,
Expand Down Expand Up @@ -526,6 +528,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
retryOptions: this._retryOptions,
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson,
skipConvertingDate: this.skipConvertingDate,
});

// this ensures that if the outer service bus client is closed that this receiver is cleaned up.
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ export class ServiceBusClient {
receiveMode,
maxLockAutoRenewDurationInMs,
options?.skipParsingBodyAsJson ?? false,
options?.skipConvertingDate ?? false,
this._clientOptions.retryOptions,
options?.identifier
);
Expand Down Expand Up @@ -362,6 +363,7 @@ export class ServiceBusClient {
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false,
skipConvertingDate: options?.skipConvertingDate ?? false,
}
);

Expand Down Expand Up @@ -449,6 +451,7 @@ export class ServiceBusClient {
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false,
skipConvertingDate: options?.skipConvertingDate ?? false,
}
);

Expand Down
46 changes: 33 additions & 13 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,24 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
*/
export function fromRheaMessage(
rheaMessage: RheaMessage,
skipParsingBodyAsJson: boolean,
delivery?: Delivery,
shouldReorderLockToken?: boolean
options: {
skipParsingBodyAsJson: boolean;
skipConvertingDate?: boolean;
delivery?: Delivery;
shouldReorderLockToken?: boolean;
}
): ServiceBusReceivedMessage {
if (!rheaMessage) {
rheaMessage = {
body: undefined,
};
}

const {
skipParsingBodyAsJson,
delivery,
shouldReorderLockToken,
skipConvertingDate = false,
} = options;
const { body, bodyType } = defaultDataTransformer.decodeWithType(
rheaMessage.body,
skipParsingBodyAsJson
Expand All @@ -536,7 +544,9 @@ export function fromRheaMessage(
};

if (rheaMessage.application_properties != null) {
sbmsg.applicationProperties = convertDatesToNumbers(rheaMessage.application_properties);
sbmsg.applicationProperties = skipConvertingDate
? rheaMessage.application_properties
: convertDatesToNumbers(rheaMessage.application_properties);
}
if (rheaMessage.content_type != null) {
sbmsg.contentType = rheaMessage.content_type;
Expand Down Expand Up @@ -635,13 +645,19 @@ export function fromRheaMessage(
rawMessage.bodyType = bodyType;

if (rawMessage.applicationProperties) {
rawMessage.applicationProperties = convertDatesToNumbers(rawMessage.applicationProperties);
rawMessage.applicationProperties = skipConvertingDate
? rawMessage.applicationProperties
: convertDatesToNumbers(rawMessage.applicationProperties);
}
if (rawMessage.deliveryAnnotations) {
rawMessage.deliveryAnnotations = convertDatesToNumbers(rawMessage.deliveryAnnotations);
rawMessage.deliveryAnnotations = skipConvertingDate
? rawMessage.deliveryAnnotations
: convertDatesToNumbers(rawMessage.deliveryAnnotations);
}
if (rawMessage.messageAnnotations) {
rawMessage.messageAnnotations = convertDatesToNumbers(rawMessage.messageAnnotations);
rawMessage.messageAnnotations = skipConvertingDate
? rawMessage.messageAnnotations
: convertDatesToNumbers(rawMessage.messageAnnotations);
}

const rcvdsbmsg: ServiceBusReceivedMessage = {
Expand Down Expand Up @@ -899,13 +915,13 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
delivery: Delivery,
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode,
skipParsingBodyAsJson: boolean
skipParsingBodyAsJson: boolean,
skipConvertingDate: boolean
) {
const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage(
msg,
skipParsingBodyAsJson,
delivery,
shouldReorderLockToken

{ skipParsingBodyAsJson, delivery, shouldReorderLockToken, skipConvertingDate }
);
this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy
Object.assign(this, restOfMessageProps);
Expand Down Expand Up @@ -970,7 +986,11 @@ function convertDatesToNumbers<T = unknown>(thing: T): T {
[0, 'foo', new Date(), { nested: new Date()}]
*/
if (Array.isArray(thing)) {
return thing.map(convertDatesToNumbers) as unknown as T;
const result = [];
for (const element of thing) {
result.push(convertDatesToNumbers(element));
}
return result as unknown as T;
}

/*
Expand Down
16 changes: 14 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export type MessageSessionOptions = Pick<
receiveMode?: ReceiveMode;
retryOptions: RetryOptions | undefined;
skipParsingBodyAsJson: boolean;
skipConvertingDate: boolean;
};

/**
Expand Down Expand Up @@ -186,6 +187,14 @@ export class MessageSession extends LinkEntity<Receiver> {
*/
private skipParsingBodyAsJson: boolean;

/**
* Whether to skip converting Date type on properties of message annotations
* or application properties into numbers when receiving the message. By
* default, properties of Date type is converted into UNIX epoch number for
* compatibility.
*/
private skipConvertingDate: boolean;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
Expand Down Expand Up @@ -391,6 +400,7 @@ export class MessageSession extends LinkEntity<Receiver> {
if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId;
this.receiveMode = options.receiveMode || "peekLock";
this.skipParsingBodyAsJson = options.skipParsingBodyAsJson;
this.skipConvertingDate = options.skipConvertingDate;
this.maxAutoRenewDurationInMs =
options.maxAutoLockRenewalDurationInMs != null
? options.maxAutoLockRenewalDurationInMs
Expand All @@ -406,7 +416,8 @@ export class MessageSession extends LinkEntity<Receiver> {
return this.link!;
},
this.receiveMode,
this.skipParsingBodyAsJson
this.skipParsingBodyAsJson,
this.skipConvertingDate
);

// setting all the handlers
Expand Down Expand Up @@ -648,7 +659,8 @@ export class MessageSession extends LinkEntity<Receiver> {
context.delivery!,
true,
this.receiveMode,
this.skipParsingBodyAsJson
this.skipParsingBodyAsJson,
this.skipConvertingDate
);

try {
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
export const packageJsonInfo = {
name: "@azure/service-bus",
version: "7.7.4",
version: "7.8.0",
};

/**
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/test/internal/retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ describe("Retries - Receive methods", () => {
lockRenewer: undefined,
receiveMode: "peekLock",
skipParsingBodyAsJson: false,
skipConvertingDate: false,
}
);
batchingReceiver.isOpen = () => true;
Expand Down
Loading