Skip to content

Commit

Permalink
Rename getAmqpAnnotatedMessage to getRawAmqpMessage (#17712)
Browse files Browse the repository at this point in the history
Rename getAmqpAnnotatedMessage to getRawAmqpMessage
  • Loading branch information
hemanttanwar authored Nov 20, 2020
1 parent 21dea30 commit 2555177
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ public ServiceBusMessage(BinaryData body) {
public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
Objects.requireNonNull(receivedMessage, "'receivedMessage' cannot be null.");

final AmqpMessageBodyType bodyType = receivedMessage.getAmqpAnnotatedMessage().getBody().getBodyType();
final AmqpMessageBodyType bodyType = receivedMessage.getRawAmqpMessage().getBody().getBodyType();
AmqpMessageBody amqpMessageBody;
switch (bodyType) {
case DATA:
amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getAmqpAnnotatedMessage().getBody()
amqpMessageBody = AmqpMessageBody.fromData(receivedMessage.getRawAmqpMessage().getBody()
.getFirstData());
break;
case SEQUENCE:
Expand All @@ -136,7 +136,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
this.amqpAnnotatedMessage = new AmqpAnnotatedMessage(amqpMessageBody);

// set properties
final AmqpMessageProperties receivedProperties = receivedMessage.getAmqpAnnotatedMessage().getProperties();
final AmqpMessageProperties receivedProperties = receivedMessage.getRawAmqpMessage().getProperties();
final AmqpMessageProperties newProperties = amqpAnnotatedMessage.getProperties();
newProperties.setMessageId(receivedProperties.getMessageId());
newProperties.setUserId(receivedProperties.getUserId());
Expand All @@ -153,15 +153,15 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
newProperties.setReplyToGroupId(receivedProperties.getReplyToGroupId());

// copy header except for delivery count which should be set to null
final AmqpMessageHeader receivedHeader = receivedMessage.getAmqpAnnotatedMessage().getHeader();
final AmqpMessageHeader receivedHeader = receivedMessage.getRawAmqpMessage().getHeader();
final AmqpMessageHeader newHeader = this.amqpAnnotatedMessage.getHeader();
newHeader.setPriority(receivedHeader.getPriority());
newHeader.setTimeToLive(receivedHeader.getTimeToLive());
newHeader.setDurable(receivedHeader.isDurable());
newHeader.setFirstAcquirer(receivedHeader.isFirstAcquirer());

// copy message annotations except for broker set ones
final Map<String, Object> receivedAnnotations = receivedMessage.getAmqpAnnotatedMessage()
final Map<String, Object> receivedAnnotations = receivedMessage.getRawAmqpMessage()
.getMessageAnnotations();
final Map<String, Object> newAnnotations = this.amqpAnnotatedMessage.getMessageAnnotations();

Expand All @@ -178,23 +178,23 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
}

// copy delivery annotations
final Map<String, Object> receivedDelivery = receivedMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations();
final Map<String, Object> receivedDelivery = receivedMessage.getRawAmqpMessage().getDeliveryAnnotations();
final Map<String, Object> newDelivery = this.amqpAnnotatedMessage.getDeliveryAnnotations();

for (Map.Entry<String, Object> entry: receivedDelivery.entrySet()) {
newDelivery.put(entry.getKey(), entry.getValue());
}

// copy Footer
final Map<String, Object> receivedFooter = receivedMessage.getAmqpAnnotatedMessage().getFooter();
final Map<String, Object> receivedFooter = receivedMessage.getRawAmqpMessage().getFooter();
final Map<String, Object> newFooter = this.amqpAnnotatedMessage.getFooter();

for (Map.Entry<String, Object> entry: receivedFooter.entrySet()) {
newFooter.put(entry.getKey(), entry.getValue());
}

// copy application properties except for broker set ones
final Map<String, Object> receivedApplicationProperties = receivedMessage.getAmqpAnnotatedMessage()
final Map<String, Object> receivedApplicationProperties = receivedMessage.getRawAmqpMessage()
.getApplicationProperties();
final Map<String, Object> newApplicationProperties = this.amqpAnnotatedMessage.getApplicationProperties();

Expand All @@ -215,7 +215,7 @@ public ServiceBusMessage(ServiceBusReceivedMessage receivedMessage) {
*
* @return the amqp message.
*/
public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
public AmqpAnnotatedMessage getRawAmqpMessage() {
return amqpAnnotatedMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public <T> Message serialize(T object) {
amqpMessage.setReplyToGroupId(brokeredMessage.getReplyToSessionId());
amqpMessage.setGroupId(brokeredMessage.getSessionId());

final AmqpMessageProperties brokeredProperties = brokeredMessage.getAmqpAnnotatedMessage().getProperties();
final AmqpMessageProperties brokeredProperties = brokeredMessage.getRawAmqpMessage().getProperties();

amqpMessage.setContentEncoding(brokeredProperties.getContentEncoding());
if (brokeredProperties.getGroupSequence() != null) {
Expand All @@ -166,10 +166,10 @@ public <T> Message serialize(T object) {
}

//set footer
amqpMessage.setFooter(new Footer(brokeredMessage.getAmqpAnnotatedMessage().getFooter()));
amqpMessage.setFooter(new Footer(brokeredMessage.getRawAmqpMessage().getFooter()));

//set header
AmqpMessageHeader header = brokeredMessage.getAmqpAnnotatedMessage().getHeader();
AmqpMessageHeader header = brokeredMessage.getRawAmqpMessage().getHeader();
if (header.getDeliveryCount() != null) {
amqpMessage.setDeliveryCount(header.getDeliveryCount());
}
Expand Down Expand Up @@ -203,7 +203,7 @@ public <T> Message serialize(T object) {
// Set Delivery Annotations.
final Map<Symbol, Object> deliveryAnnotationsMap = new HashMap<>();

final Map<String, Object> deliveryAnnotations = brokeredMessage.getAmqpAnnotatedMessage()
final Map<String, Object> deliveryAnnotations = brokeredMessage.getRawAmqpMessage()
.getDeliveryAnnotations();
for (Map.Entry<String, Object> deliveryEntry : deliveryAnnotations.entrySet()) {
deliveryAnnotationsMap.put(Symbol.valueOf(deliveryEntry.getKey()), deliveryEntry.getValue());
Expand Down Expand Up @@ -355,7 +355,7 @@ private ServiceBusReceivedMessage deserializeMessage(Message amqpMessage) {
bytes = EMPTY_BYTE_ARRAY;
}
final ServiceBusReceivedMessage brokeredMessage = new ServiceBusReceivedMessage(BinaryData.fromBytes(bytes));
AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getAmqpAnnotatedMessage();
AmqpAnnotatedMessage brokeredAmqpAnnotatedMessage = brokeredMessage.getRawAmqpMessage();

// Application properties
ApplicationProperties applicationProperties = amqpMessage.getApplicationProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class ServiceBusReceivedMessage {
*
* @return the {@link AmqpAnnotatedMessage} representing amqp message.
*/
public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
public AmqpAnnotatedMessage getRawAmqpMessage() {
return amqpAnnotatedMessage;
}

Expand Down Expand Up @@ -407,7 +407,7 @@ public long getSequenceNumber() {
* @return Session Id of the {@link ServiceBusReceivedMessage}.
*/
public String getSessionId() {
return getAmqpAnnotatedMessage().getProperties().getGroupId();
return getRawAmqpMessage().getProperties().getGroupId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ void deserializeMessage() {
assertEquals(message.getContentType(), actualMessage.getContentType());
assertEquals(message.getCorrelationId(), actualMessage.getCorrelationId());

assertValues(expectedMessageAnnotations, actualMessage.getAmqpAnnotatedMessage().getMessageAnnotations());
assertValues(expectedDeliveryAnnotations, actualMessage.getAmqpAnnotatedMessage().getDeliveryAnnotations());
assertValues(expectedFooterValues, actualMessage.getAmqpAnnotatedMessage().getFooter());
assertValues(expectedMessageAnnotations, actualMessage.getRawAmqpMessage().getMessageAnnotations());
assertValues(expectedDeliveryAnnotations, actualMessage.getRawAmqpMessage().getDeliveryAnnotations());
assertValues(expectedFooterValues, actualMessage.getRawAmqpMessage().getFooter());

// Verifying our application properties are the same.
assertEquals(APPLICATION_PROPERTIES.size(), actualMessage.getApplicationProperties().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public void copyConstructorTest() {


final ServiceBusReceivedMessage expected = new ServiceBusReceivedMessage(PAYLOAD_BINARY);
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "11");
expected.getAmqpAnnotatedMessage().getMessageAnnotations().put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), "11");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "abc");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getMessageAnnotations().put(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "10");
expected.getRawAmqpMessage().getMessageAnnotations().put(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(), "11");
expected.getRawAmqpMessage().getMessageAnnotations().put(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue(), "11");
expected.getRawAmqpMessage().getApplicationProperties().put(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue(), "abc");
expected.getRawAmqpMessage().getApplicationProperties().put(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue(), "abc");
expected.setSubject(expectedSubject);
expected.setTo(expectedTo);
expected.setReplyTo(expectedReplyTo);
Expand All @@ -74,12 +74,12 @@ public void copyConstructorTest() {
expected.setTimeToLive(expectedTimeToLive);
expected.setPartitionKey(expectedPartitionKey);

expected.getAmqpAnnotatedMessage().getHeader().setPriority(expectedPriority);
expected.getRawAmqpMessage().getHeader().setPriority(expectedPriority);

final Map<String, Object> expectedFooter = expected.getAmqpAnnotatedMessage().getFooter();
final Map<String, Object> expectedFooter = expected.getRawAmqpMessage().getFooter();
expectedFooter.put("foo-1", expectedFooterValue);

final Map<String, Object> expectedDeliveryAnnotations = expected.getAmqpAnnotatedMessage().getDeliveryAnnotations();
final Map<String, Object> expectedDeliveryAnnotations = expected.getRawAmqpMessage().getDeliveryAnnotations();
expectedDeliveryAnnotations.put("da-1", expectedDeliveryAnnotationsValue);

final Map<String, Object> expectedApplicationProperties = expected.getApplicationProperties();
Expand All @@ -98,14 +98,14 @@ public void copyConstructorTest() {
expected.setPartitionKey("new-p-key");

// Change original values
expected.getAmqpAnnotatedMessage().getHeader().setPriority((short) (expectedPriority + 1));
expected.getRawAmqpMessage().getHeader().setPriority((short) (expectedPriority + 1));
expectedFooter.put("foo-1", expectedFooterValue + "-changed");
expected.getAmqpAnnotatedMessage().getDeliveryAnnotations().put("da-1", expectedDeliveryAnnotationsValue + "-changed");
expected.getAmqpAnnotatedMessage().getApplicationProperties().put("ap-1", expectedApplicationValue + "-changed");
expected.getRawAmqpMessage().getDeliveryAnnotations().put("da-1", expectedDeliveryAnnotationsValue + "-changed");
expected.getRawAmqpMessage().getApplicationProperties().put("ap-1", expectedApplicationValue + "-changed");


// Assert
assertNotSame(expected.getAmqpAnnotatedMessage(), actual.getAmqpAnnotatedMessage());
assertNotSame(expected.getRawAmqpMessage(), actual.getRawAmqpMessage());

// Validate updated values
assertEquals(expectedSubject, actual.getSubject());
Expand All @@ -117,21 +117,21 @@ public void copyConstructorTest() {
assertEquals(expectedPartitionKey, actual.getPartitionKey());

// Following values should be reset.
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getMessageAnnotations().get(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(LOCKED_UNTIL_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getMessageAnnotations().get(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue()));

assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue()));
assertNull(actual.getAmqpAnnotatedMessage().getHeader().getDeliveryCount());
assertNull(actual.getRawAmqpMessage().getApplicationProperties().get(DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getApplicationProperties().get(DEAD_LETTER_REASON_ANNOTATION_NAME.getValue()));
assertNull(actual.getRawAmqpMessage().getHeader().getDeliveryCount());

// Testing , updating original message did not change copied message values..
assertEquals(expectedPriority, actual.getAmqpAnnotatedMessage().getHeader().getPriority());
assertEquals(expectedFooterValue, actual.getAmqpAnnotatedMessage().getFooter().get("foo-1").toString());
assertEquals(expectedDeliveryAnnotationsValue, actual.getAmqpAnnotatedMessage().getDeliveryAnnotations().get("da-1").toString());
assertEquals(expectedApplicationValue, actual.getAmqpAnnotatedMessage().getApplicationProperties().get("ap-1").toString());
assertEquals(expectedPriority, actual.getRawAmqpMessage().getHeader().getPriority());
assertEquals(expectedFooterValue, actual.getRawAmqpMessage().getFooter().get("foo-1").toString());
assertEquals(expectedDeliveryAnnotationsValue, actual.getRawAmqpMessage().getDeliveryAnnotations().get("da-1").toString());
assertEquals(expectedApplicationValue, actual.getRawAmqpMessage().getApplicationProperties().get("ap-1").toString());

}

Expand Down Expand Up @@ -176,15 +176,15 @@ public void copyConstructorModifyAfterCopyTest() {

// Assert
// Validate updated values
assertEquals(expectedSubject, originalMessage.getAmqpAnnotatedMessage().getProperties().getSubject());
assertEquals(expectedTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getTo().toString());
assertEquals(expectedReplyTo, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyTo().toString());
assertEquals(expectedReplyToSessionId, originalMessage.getAmqpAnnotatedMessage().getProperties().getReplyToGroupId());
assertEquals(expectedCorrelationId, originalMessage.getAmqpAnnotatedMessage().getProperties().getCorrelationId().toString());
assertEquals(expectedSubject, originalMessage.getRawAmqpMessage().getProperties().getSubject());
assertEquals(expectedTo, originalMessage.getRawAmqpMessage().getProperties().getTo().toString());
assertEquals(expectedReplyTo, originalMessage.getRawAmqpMessage().getProperties().getReplyTo().toString());
assertEquals(expectedReplyToSessionId, originalMessage.getRawAmqpMessage().getProperties().getReplyToGroupId());
assertEquals(expectedCorrelationId, originalMessage.getRawAmqpMessage().getProperties().getCorrelationId().toString());

assertEquals(expectedTimeToLive, originalMessage.getAmqpAnnotatedMessage().getHeader().getTimeToLive());
assertEquals(expectedTimeToLive, originalMessage.getRawAmqpMessage().getHeader().getTimeToLive());

assertEquals(expectedPartitionKey, originalMessage.getAmqpAnnotatedMessage().getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue()));
assertEquals(expectedPartitionKey, originalMessage.getRawAmqpMessage().getMessageAnnotations().get(PARTITION_KEY_ANNOTATION_NAME.getValue()));
}

/**
Expand Down
Loading

0 comments on commit 2555177

Please sign in to comment.