Skip to content

Commit

Permalink
merge master and resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhaoling committed Oct 3, 2020
2 parents 191be47 + 26f7137 commit 0d2dbeb
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 75 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

* Passing any type other than `ReceiveMode` as parameter `receive_mode` now throws a `TypeError` instead of `AttributeError`.
* Administration Client calls now take only entity names, not `<Entity>Descriptions` as well to reduce ambiguity in which entity was being acted on. TypeError will now be thrown on improper parameter types (non-string).
* `AMQPMessage` (`Message.amqp_message`) properties are now read-only, changes of these properties would not be reflected in the underlying message. This may be subject to change before GA.

**BugFixes**

Expand Down
120 changes: 77 additions & 43 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import uuid
import functools
import logging
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any

import uamqp.message
Expand Down Expand Up @@ -127,7 +128,7 @@ def __init__(self, body, **kwargs):
self.partition_key = kwargs.pop("partition_key", None)
self.via_partition_key = kwargs.pop("via_partition_key", None)
# If message is the full message, amqp_message is the "public facing interface" for what we expose.
self.amqp_message = AMQPMessage(self.message)
self.amqp_message = AMQPMessage(self.message) # type: AMQPMessage

def __str__(self):
return str(self.message)
Expand Down Expand Up @@ -1088,73 +1089,106 @@ def renew_lock(self, **kwargs):

class AMQPMessage(object):
"""
The internal AMQP message that this ServiceBusMessage represents.
:param properties: Properties to add to the message.
:type properties: ~uamqp.message.MessageProperties
:param application_properties: Service specific application properties.
:type application_properties: dict
:param annotations: Service specific message annotations. Keys in the dictionary
must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
:type annotations: dict
:param delivery_annotations: Delivery-specific non-standard properties at the head of the message.
Delivery annotations convey information from the sending peer to the receiving peer.
Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
:type delivery_annotations: dict
:param header: The message header.
:type header: ~uamqp.message.MessageHeader
:param footer: The message footer.
:type footer: dict
The internal AMQP message that this ServiceBusMessage represents. Is read-only.
"""
def __init__(self, message):
# type: (uamqp.Message) -> None
self._message = message

@property
def properties(self):
return self._message.properties
# type: () -> uamqp.message.MessageProperties
"""
Properties to add to the message.
@properties.setter
def properties(self, value):
self._message.properties = value
:rtype: ~uamqp.message.MessageProperties
"""
return uamqp.message.MessageProperties(message_id=self._message.properties.message_id,
user_id=self._message.properties.user_id,
to=self._message.properties.to,
subject=self._message.properties.subject,
reply_to=self._message.properties.reply_to,
correlation_id=self._message.properties.correlation_id,
content_type=self._message.properties.content_type,
content_encoding=self._message.properties.content_encoding
)

# NOTE: These are disabled pending arch. design and cross-sdk consensus on
# how we will expose sendability of amqp focused messages. To undo, uncomment and remove deepcopies/workarounds.
#
#@properties.setter
#def properties(self, value):
# self._message.properties = value

@property
def application_properties(self):
return self._message.application_properties
# type: () -> dict
"""
Service specific application properties.
@application_properties.setter
def application_properties(self, value):
self._message.application_properties = value
:rtype: dict
"""
return copy.deepcopy(self._message.application_properties)

#@application_properties.setter
#def application_properties(self, value):
# self._message.application_properties = value

@property
def annotations(self):
return self._message.annotations
# type: () -> dict
"""
Service specific message annotations. Keys in the dictionary
must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
:rtype: dict
"""
return copy.deepcopy(self._message.annotations)

@annotations.setter
def annotations(self, value):
self._message.annotations = value
#@annotations.setter
#def annotations(self, value):
# self._message.annotations = value

@property
def delivery_annotations(self):
return self._message.delivery_annotations
# type: () -> dict
"""
Delivery-specific non-standard properties at the head of the message.
Delivery annotations convey information from the sending peer to the receiving peer.
Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`.
@delivery_annotations.setter
def delivery_annotations(self, value):
self._message.delivery_annotations = value
:rtype: dict
"""
return copy.deepcopy(self._message.delivery_annotations)

#@delivery_annotations.setter
#def delivery_annotations(self, value):
# self._message.delivery_annotations = value

@property
def header(self):
return self._message.header
# type: () -> uamqp.message.MessageHeader
"""
The message header.
@header.setter
def header(self, value):
self._message.header = value
:rtype: ~uamqp.message.MessageHeader
"""
return uamqp.message.MessageHeader(header=self._message.header)

#@header.setter
#def header(self, value):
# self._message.header = value

@property
def footer(self):
return self._message.footer
# type: () -> dict
"""
The message footer.
:rtype: dict
"""
return copy.deepcopy(self._message.footer)

@footer.setter
def footer(self, value):
self._message.footer = value
#@footer.setter
#def footer(self, value):
# self._message.footer = value
71 changes: 39 additions & 32 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_receive_and_delete_reconnect_interaction(self, servicebus_namespace_con

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
for i in range(5):
sender.send_messages(Message("Message {}".format(i)), timeout=5)
sender.send_messages(Message("Message {}".format(i)))

with sb_client.get_queue_receiver(servicebus_queue.name,
receive_mode=ReceiveMode.ReceiveAndDelete,
Expand Down Expand Up @@ -123,7 +123,6 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu
message.to = 'to'
message.reply_to = 'reply_to'
sender.send_messages(message)
assert sender._handler._msg_timeout == 0

receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
count = 0
Expand Down Expand Up @@ -427,7 +426,7 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_client(
message.defer()

assert count == 10
deferred = receiver.receive_deferred_messages(deferred_messages, timeout=5)
deferred = receiver.receive_deferred_messages(deferred_messages)
assert len(deferred) == 10
for message in deferred:
assert isinstance(message, ReceivedMessage)
Expand Down Expand Up @@ -468,8 +467,6 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_receive
with sb_client.get_queue_receiver(servicebus_queue.name,
max_wait_time=5,
receive_mode=ReceiveMode.PeekLock) as receiver:
with pytest.raises(ValueError):
receiver.receive_deferred_messages(deferred_messages, timeout=0)
deferred = receiver.receive_deferred_messages(deferred_messages)
assert len(deferred) == 10
for message in deferred:
Expand All @@ -489,7 +486,7 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_receive
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:


with sb_client.get_queue_sender(servicebus_queue.name) as sender:
deferred_messages = []
for i in range(10):
Expand All @@ -510,7 +507,7 @@ def test_queue_by_servicebus_client_iter_messages_with_retrieve_deferred_receive

with sb_client.get_queue_receiver(servicebus_queue.name,
max_wait_time=5) as receiver:
deferred = receiver.receive_deferred_messages(deferred_messages, timeout=None)
deferred = receiver.receive_deferred_messages(deferred_messages)
assert len(deferred) == 10
for message in deferred:
assert isinstance(message, ReceivedMessage)
Expand Down Expand Up @@ -739,7 +736,7 @@ def test_queue_by_servicebus_client_browse_messages_client(self, servicebus_name
sender.send_messages(message)

with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
messages = receiver.peek_messages(5, timeout=5)
messages = receiver.peek_messages(5)
assert len(messages) == 5
assert all(isinstance(m, PeekedMessage) for m in messages)
for message in messages:
Expand Down Expand Up @@ -831,7 +828,7 @@ def test_queue_by_servicebus_client_browse_empty_messages(self, servicebus_names
max_wait_time=5,
receive_mode=ReceiveMode.PeekLock,
prefetch_count=10) as receiver:
messages = receiver.peek_messages(10, timeout=None)
messages = receiver.peek_messages(10)
assert len(messages) == 0


Expand Down Expand Up @@ -887,7 +884,7 @@ def test_queue_by_servicebus_client_renew_message_locks(self, servicebus_namespa
assert not m._lock_expired
time.sleep(5)
initial_expiry = m.locked_until_utc
m.renew_lock(timeout=5)
m.renew_lock()
assert (m.locked_until_utc - initial_expiry) >= timedelta(seconds=5)
finally:
messages[0].complete()
Expand Down Expand Up @@ -1277,7 +1274,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
received_messages.append(message)
message.complete()

tokens = sender.schedule_messages(received_messages, scheduled_enqueue_time, timeout=5)
tokens = sender.schedule_messages(received_messages, scheduled_enqueue_time)
assert len(tokens) == 2

messages = receiver.receive_messages(max_wait_time=120)
Expand Down Expand Up @@ -1328,7 +1325,7 @@ def test_queue_cancel_scheduled_messages(self, servicebus_namespace_connection_s
tokens = sender.schedule_messages([message_a, message_b], enqueue_time)
assert len(tokens) == 2

sender.cancel_scheduled_messages(tokens, timeout=None)
sender.cancel_scheduled_messages(tokens)

messages = receiver.receive_messages(max_wait_time=120)
try:
Expand Down Expand Up @@ -1880,20 +1877,21 @@ def test_message_inner_amqp_properties(self, servicebus_namespace_connection_str

message = Message("body")

with pytest.raises(TypeError):
with pytest.raises(AttributeError): # Note: If this is made read-writeable, this would be TypeError
message.amqp_message.properties = {"properties":1}
message.amqp_message.properties.subject = "subject"

message.amqp_message.application_properties = {b"application_properties":1}

message.amqp_message.annotations = {b"annotations":2}
message.amqp_message.delivery_annotations = {b"delivery_annotations":3}

with pytest.raises(TypeError):
message.amqp_message.header = {"header":4}
message.amqp_message.header.priority = 5

message.amqp_message.footer = {b"footer":6}
# NOTE: These are disabled pending cross-language-sdk consensus on sendability/writeability.
# message.amqp_message.properties.subject = "subject"
#
# message.amqp_message.application_properties = {b"application_properties":1}
#
# message.amqp_message.annotations = {b"annotations":2}
# message.amqp_message.delivery_annotations = {b"delivery_annotations":3}
#
# with pytest.raises(TypeError):
# message.amqp_message.header = {"header":4}
# message.amqp_message.header.priority = 5
#
# message.amqp_message.footer = {b"footer":6}

with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
Expand All @@ -1902,12 +1900,21 @@ def test_message_inner_amqp_properties(self, servicebus_namespace_connection_str
sender.send_messages(message)
with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:
message = receiver.receive_messages()[0]
assert message.amqp_message.properties.subject == b"subject"
assert message.amqp_message.application_properties[b"application_properties"] == 1
assert message.amqp_message.annotations[b"annotations"] == 2
assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3
assert message.amqp_message.header.priority == 5
assert message.amqp_message.footer[b"footer"] == 6
assert message.amqp_message.application_properties == None \
and message.amqp_message.annotations != None \
and message.amqp_message.delivery_annotations != None \
and message.amqp_message.footer == None \
and message.amqp_message.properties != None \
and message.amqp_message.header != None
# NOTE: These are disabled pending cross-language-sdk consensus on sendability/writeability.
#
# assert message.amqp_message.properties.subject == b"subject"
# assert message.amqp_message.application_properties[b"application_properties"] == 1
# assert message.amqp_message.annotations[b"annotations"] == 2
# # delivery_annotations and footer disabled pending uamqp bug https://github.com/Azure/azure-uamqp-python/issues/169
# #assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3
# assert message.amqp_message.header.priority == 5
# #assert message.amqp_message.footer[b"footer"] == 6

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down Expand Up @@ -1963,4 +1970,4 @@ def hack_mgmt_execute(self, operation, op_type, message, timeout=0):
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
with pytest.raises(OperationTimeoutError):
scheduled_time_utc = utc_now() + timedelta(seconds=30)
sender.schedule_messages(Message("Message to be scheduled"), scheduled_time_utc, timeout=5)
sender.schedule_messages(Message("Message to be scheduled"), scheduled_time_utc, timeout=5)

0 comments on commit 0d2dbeb

Please sign in to comment.