-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ServiceBus] AMQPAnnotatedMessage API Review Feedback #18432
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,6 @@ | |
import datetime | ||
import uuid | ||
import logging | ||
import copy | ||
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast | ||
|
||
import six | ||
|
@@ -34,10 +33,8 @@ | |
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME, | ||
ANNOTATION_SYMBOL_KEY_MAP, | ||
MESSAGE_PROPERTY_MAX_LENGTH, | ||
AMQPMessageBodyType, | ||
AMQP_MESSAGE_BODY_TYPE_MAP, | ||
) | ||
|
||
from ..amqp import AMQPAnnotatedMessage | ||
from ..exceptions import MessageSizeExceededError | ||
from .utils import ( | ||
utc_from_timestamp, | ||
|
@@ -80,6 +77,7 @@ class ServiceBusMessage( | |
|
||
:ivar AMQPAnnotatedMessage raw_amqp_message: Advanced use only. | ||
The internal AMQP message payload that is sent or received. | ||
:vartype raw_amqp_message: ~azure.servicebus.amqp.AMQPAnnotatedMessage | ||
|
||
.. admonition:: Example: | ||
|
||
|
@@ -317,30 +315,18 @@ def scheduled_enqueue_time_utc(self, value): | |
|
||
@property | ||
def body(self): | ||
# type: () -> Any | ||
"""The body of the Message. The format may vary depending | ||
on the body type: | ||
For ~azure.servicebus.AMQPMessageBodyType.DATA, the body could be bytes or Iterable[bytes] | ||
For ~azure.servicebus.AMQPMessageBodyType.SEQUENCE, the body could be List or Iterable[List] | ||
For ~azure.servicebus.AMQPMessageBodyType.VALUE, the body could be any type. | ||
|
||
:rtype: Any | ||
# type: () -> Optional[Union[bytes, Iterable[bytes]]] | ||
"""The body of the Message. | ||
:rtype: bytes or Iterable[bytes] | ||
""" | ||
return self.message.get_data() | ||
# pylint: disable=protected-access | ||
if not self.message._message or not self.message._body: | ||
return None | ||
|
||
@property | ||
def body_type(self): | ||
# type: () -> Optional[AMQPMessageBodyType] | ||
"""The body type of the underlying AMQP message. | ||
if self.message._body.type != uamqp.MessageBodyType.Data: | ||
return None # TODO: TBD, raise TypeError vs return None | ||
Comment on lines
+326
to
+327
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thoughts: if message.body == None:
message.raw_amqp_message.body
# vs
try:
message.body
except TypeError:
message.raw_amqp_message.body although raising error on property seems not common in python, but it seems be a stronger indicator to me in this dedicated scenario.. there're some discussions: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't there during the original discussion, but looking through these questions, it seems that raising an error is not preferred. Is there a specific scenario you have in mind where the try/except would work and the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having a case in mind which is pushing me to the corner case for sending None:
(well, actually I have a way to convince myself that ServiceBusMessage.body is always trying to extract the data body, so in the sequence/value body case, data body is None which returns None is fair enough) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. find one raising error property sample from requests content: similar in httpx lib: |
||
|
||
rtype: Optional[~azure.servicebus.AMQPMessageBodyType] | ||
""" | ||
try: | ||
return AMQP_MESSAGE_BODY_TYPE_MAP.get( | ||
self.message._body.type # pylint: disable=protected-access | ||
) | ||
except AttributeError: | ||
return None | ||
return self.message.get_data() | ||
|
||
@property | ||
def content_type(self): | ||
|
@@ -587,7 +573,7 @@ def add_message(self, message): | |
be raised. | ||
|
||
:param message: The Message to be added to the batch. | ||
:type message: Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.AMQPAnnotatedMessage] | ||
:type message: Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.amqp.AMQPAnnotatedMessage] | ||
:rtype: None | ||
:raises: :class: ~azure.servicebus.exceptions.MessageSizeExceededError, when exceeding the size limit. | ||
""" | ||
|
@@ -878,253 +864,3 @@ def locked_until_utc(self): | |
expiry_in_seconds = self.message.annotations[_X_OPT_LOCKED_UNTIL] / 1000 | ||
self._expiry = utc_from_timestamp(expiry_in_seconds) | ||
return self._expiry | ||
|
||
|
||
class AMQPAnnotatedMessage(object): | ||
""" | ||
The AMQP Annotated Message for advanced sending and receiving scenarios which allows you to | ||
access to low-level AMQP message sections. | ||
Please refer to the AMQP spec: | ||
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format | ||
for more information on the message format. | ||
|
||
:keyword data_body: The body consists of one or more data sections and each section contains opaque binary data. | ||
:paramtype data_body: Union[str, bytes, List[Union[str, bytes]]] | ||
:keyword sequence_body: The body consists of one or more sequence sections and | ||
each section contains an arbitrary number of structured data elements. | ||
:paramtype sequence_body: List[Any] | ||
:keyword value_body: The body consists of one amqp-value section and the section contains a single AMQP value. | ||
:paramtype value_body: Any | ||
:keyword header: The amqp message header. This must be a dictionary with the following | ||
keys: | ||
|
||
- `delivery_count` (int) | ||
- `time_to_live` (int) | ||
- `durable` (bool) | ||
- `first_acquirer` (bool) | ||
- `priority` (int) | ||
|
||
:paramtype header: dict | ||
:keyword footer: The amqp message footer. | ||
:paramtype footer: dict | ||
:keyword properties: Properties to add to the amqp message. This must be a dictionary with the following | ||
keys: | ||
|
||
- message_id (str or bytes), | ||
- user_id (str or bytes), | ||
- to (Any), | ||
- subject (str or bytes), | ||
- reply_to (Any), | ||
- correlation_id (str or bytes), | ||
- content_type (str or bytes), | ||
- content_encoding (str or bytes), | ||
- creation_time (int), | ||
- absolute_expiry_time (int), | ||
- group_id (str or bytes), | ||
- group_sequence (int), | ||
- reply_to_group_id (str or bytes) | ||
|
||
:paramtype properties: dict | ||
:keyword application_properties: Service specific application properties. | ||
:paramtype application_properties: dict | ||
:keyword annotations: Service specific message annotations. | ||
:paramtype annotations: dict | ||
:keyword delivery_annotations: Service specific delivery annotations. | ||
:paramtype delivery_annotations: dict | ||
""" | ||
|
||
def __init__(self, **kwargs): | ||
# type: (Any) -> None | ||
self._message = kwargs.pop("message", None) | ||
|
||
if self._message: | ||
# internal usage only for service bus received message | ||
return | ||
|
||
self._data_body = kwargs.pop("data_body", None) | ||
self._sequence_body = kwargs.pop("sequence_body", None) | ||
self._value_body = kwargs.pop("value_body", None) | ||
|
||
validation = [ | ||
body | ||
for body in (self._value_body, self._data_body, self._sequence_body) | ||
if body is not None | ||
] | ||
if len(validation) != 1: | ||
raise ValueError( | ||
"There should be one and only one of either data_body, sequence_body " | ||
"or value_body being set as the body of the AMQPAnnotatedMessage." | ||
) | ||
|
||
header = kwargs.get("header") | ||
footer = kwargs.get("footer") | ||
properties = kwargs.get("properties") | ||
application_properties = kwargs.get("application_properties") | ||
annotations = kwargs.get("annotations") | ||
delivery_annotations = kwargs.get("delivery_annotations") | ||
|
||
body_type = ( | ||
uamqp.MessageBodyType.Data | ||
if self._data_body | ||
else ( | ||
uamqp.MessageBodyType.Sequence | ||
if self._sequence_body | ||
else uamqp.MessageBodyType.Value | ||
) | ||
) | ||
body = self._data_body or self._sequence_body or self._value_body | ||
|
||
message_header = None | ||
message_properties = None | ||
if header: | ||
message_header = uamqp.message.MessageHeader() | ||
message_header.delivery_count = header.get("delivery_count", 0) | ||
message_header.time_to_live = header.get("time_to_live") | ||
message_header.first_acquirer = header.get("first_acquirer") | ||
message_header.durable = header.get("durable") | ||
message_header.priority = header.get("priority") | ||
|
||
if properties: | ||
message_properties = uamqp.message.MessageProperties(**properties) | ||
|
||
self._message = uamqp.message.Message( | ||
body=body, | ||
body_type=body_type, | ||
header=message_header, | ||
footer=footer, | ||
properties=message_properties, | ||
application_properties=application_properties, | ||
annotations=annotations, | ||
delivery_annotations=delivery_annotations, | ||
) | ||
|
||
def _to_service_bus_message(self): | ||
return ServiceBusMessage(body=None, message=self._message) | ||
|
||
@property | ||
def body(self): | ||
# type: () -> Any | ||
"""The body of the Message. The format may vary depending | ||
on the body type: | ||
For ~azure.servicebus.AMQPMessageBodyType.DATA, the body could be bytes or Iterable[bytes] | ||
For ~azure.servicebus.AMQPMessageBodyType.SEQUENCE, the body could be List or Iterable[List] | ||
For ~azure.servicebus.AMQPMessageBodyType.VALUE, the body could be any type. | ||
|
||
:rtype: Any | ||
""" | ||
return self._message.get_data() | ||
|
||
@property | ||
def body_type(self): | ||
# type: () -> Optional[AMQPMessageBodyType] | ||
"""The body type of the underlying AMQP message. | ||
|
||
rtype: Optional[~azure.servicebus.AMQPMessageBodyType] | ||
""" | ||
return AMQP_MESSAGE_BODY_TYPE_MAP.get( | ||
self._message._body.type # pylint: disable=protected-access | ||
) | ||
|
||
@property | ||
def properties(self): | ||
# type: () -> uamqp.message.MessageProperties | ||
""" | ||
Properties to add to the message. | ||
|
||
:rtype: ~uamqp.message.MessageProperties | ||
""" | ||
return uamqp.message.MessageProperties( | ||
message_id=self._message.properties.message_id, | ||
user_id=self._message.properties.user_id, | ||
to=self._message.properties.to, | ||
subject=self._message.properties.subject, | ||
reply_to=self._message.properties.reply_to, | ||
correlation_id=self._message.properties.correlation_id, | ||
content_type=self._message.properties.content_type, | ||
content_encoding=self._message.properties.content_encoding, | ||
absolute_expiry_time=self._message.properties.absolute_expiry_time, | ||
creation_time=self._message.properties.creation_time, | ||
group_id=self._message.properties.group_id, | ||
group_sequence=self._message.properties.group_sequence, | ||
reply_to_group_id=self._message.properties.reply_to_group_id, | ||
) | ||
|
||
# NOTE: These are disabled pending arch. design and cross-sdk consensus on | ||
# how we will expose sendability of amqp focused messages. To undo, uncomment and remove deepcopies/workarounds. | ||
# | ||
# @properties.setter | ||
# def properties(self, value): | ||
# self._message.properties = value | ||
|
||
@property | ||
def application_properties(self): | ||
# type: () -> Optional[dict] | ||
""" | ||
Service specific application properties. | ||
|
||
:rtype: dict | ||
""" | ||
return copy.deepcopy(self._message.application_properties) | ||
|
||
# @application_properties.setter | ||
# def application_properties(self, value): | ||
# self._message.application_properties = value | ||
|
||
@property | ||
def annotations(self): | ||
# type: () -> Optional[dict] | ||
""" | ||
Service specific message annotations. Keys in the dictionary | ||
must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. | ||
|
||
:rtype: dict | ||
""" | ||
return copy.deepcopy(self._message.annotations) | ||
|
||
# @annotations.setter | ||
# def annotations(self, value): | ||
# self._message.annotations = value | ||
|
||
@property | ||
def delivery_annotations(self): | ||
# type: () -> Optional[dict] | ||
""" | ||
Delivery-specific non-standard properties at the head of the message. | ||
Delivery annotations convey information from the sending peer to the receiving peer. | ||
Keys in the dictionary must be `uamqp.types.AMQPSymbol` or `uamqp.types.AMQPuLong`. | ||
|
||
:rtype: dict | ||
""" | ||
return copy.deepcopy(self._message.delivery_annotations) | ||
|
||
# @delivery_annotations.setter | ||
# def delivery_annotations(self, value): | ||
# self._message.delivery_annotations = value | ||
|
||
@property | ||
def header(self): | ||
# type: () -> uamqp.message.MessageHeader | ||
""" | ||
The message header. | ||
|
||
:rtype: ~uamqp.message.MessageHeader | ||
""" | ||
return uamqp.message.MessageHeader(header=self._message.header) | ||
|
||
# @header.setter | ||
# def header(self, value): | ||
# self._message.header = value | ||
|
||
@property | ||
def footer(self): | ||
# type: () -> Optional[dict] | ||
""" | ||
The message footer. | ||
|
||
:rtype: dict | ||
""" | ||
return copy.deepcopy(self._message.footer) | ||
|
||
# @footer.setter | ||
# def footer(self, value): | ||
# self._message.footer = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this would be breaking - i thought we never exposed this to customer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this would be a breaking change for b1, and we never exposed this to customer in GA