Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Initial pass of consistency review changes as detailed in issue #12415. #13160

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@
## 7.0.0b6 (Unreleased)

**New Features**

* `renew_lock()` now returns the UTC datetime that the lock is set to expire at.
* `receive_deferred_messages()` can now take a single sequence number as well as a list of sequence numbers.
* Messages can now be sent twice in succession.
* Internal AMQP message properties (header, footer, annotations, properties, etc) are now exposed via `Message.amqp_message`

**Breaking Changes**

* Renamed `prefetch` to `prefetch_count`.
* Renamed `ReceiveSettleMode` enum to `ReceiveMode`, and respectively the `mode` parameter to `receive_mode`.
* `retry_total`, `retry_backoff_factor` and `retry_backoff_max` are now defined at the `ServiceBusClient` level and inherited by senders and receivers created from it.
* No longer export `NEXT_AVAILABLE` in `azure.servicebus` module. A null `session_id` will suffice.
* Renamed parameter `message_count` to `max_message_count` as fewer messages may be present for method `peek_messages()` and `receive_messages()`.
* Renamed `PeekMessage` to `PeekedMessage`.
* Renamed `get_session_state()` and `set_session_state()` to `get_state()` and `set_state()` accordingly.
* Renamed parameter `description` to `error_description` for method `dead_letter()`.
* Renamed properties `created_time` and `modified_time` to `created_at_utc` and `modified_at_utc` within `AuthorizationRule` and `NamespaceProperties`.
* Removed parameter `requires_preprocessing` from `SqlRuleFilter` and `SqlRuleAction`.
* Removed property `namespace_type` from `NamespaceProperties`.
* Rename `ServiceBusManagementClient` to `ServiceBusAdministrationClient`
* Attempting to call `send_messages` on something not a `Message`, `BatchMessage`, or list of `Message`s, will now throw a `TypeError` instead of `ValueError`
* Sending a message twice will no longer result in a MessageAlreadySettled exception.
Expand Down
10 changes: 5 additions & 5 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ with ServiceBusClient.from_connection_string(connstr) as client:
if received_message_array:
print(str(received_message_array[0]))

with client.get_queue_receiver(queue_name, prefetch=5) as receiver:
received_message_array = receiver.receive_messages(max_batch_size=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
with client.get_queue_receiver(queue_name) as receiver:
received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10) # try to receive maximum 5 messages in a batch within 10 seconds
for message in received_message_array:
print(str(message))
```

In this example, max_batch_size (and prefetch, as required by max_batch_size) declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.
In this example, max_message_count declares the maximum number of messages to attempt receiving before hitting a max_wait_time as specified in seconds.

> **NOTE:** It should also be noted that `ServiceBusReceiver.peek_messages()` is subtly different than receiving, as it does not lock the messages being peeked, and thus they cannot be settled.

Expand Down Expand Up @@ -235,8 +235,8 @@ with ServiceBusClient.from_connection_string(connstr) as client:

When receiving from a queue, you have multiple actions you can take on the messages you receive.

> **NOTE**: You can only settle `ReceivedMessage` objects which are received in `ReceiveSettleMode.PeekLock` mode (this is the default).
> `ReceiveSettleMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `PeekMessage` messages
> **NOTE**: You can only settle `ReceivedMessage` objects which are received in `ReceiveMode.PeekLock` mode (this is the default).
> `ReceiveMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `PeekedMessage` messages
> returned from `peek()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation.

If the message has a lock as mentioned above, settlement will fail if the message lock has expired.
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@
from ._servicebus_session_receiver import ServiceBusSessionReceiver
from ._servicebus_session import ServiceBusSession
from ._base_handler import ServiceBusSharedKeyCredential
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, SubQueue
from ._common.message import Message, BatchMessage, PeekedMessage, ReceivedMessage
from ._common.constants import ReceiveMode, SubQueue
from ._common.auto_lock_renewer import AutoLockRenew

TransportType = constants.TransportType

__all__ = [
'Message',
'BatchMessage',
'PeekMessage',
'PeekedMessage',
'ReceivedMessage',
'ReceiveSettleMode',
'SubQueue',
'ReceiveMode',
'ServiceBusClient',
'ServiceBusReceiver',
'ServiceBusSessionReceiver',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@
MGMT_REQUEST_SEQUENCE_NUMBERS = 'sequence-numbers'
MGMT_REQUEST_RECEIVER_SETTLE_MODE = 'receiver-settle-mode'
MGMT_REQUEST_FROM_SEQUENCE_NUMBER = 'from-sequence-number'
MGMT_REQUEST_MESSAGE_COUNT = 'message-count'
MGMT_REQUEST_MAX_MESSAGE_COUNT = 'message-count'
MGMT_REQUEST_MESSAGE = 'message'
MGMT_REQUEST_MESSAGES = 'messages'
MGMT_REQUEST_MESSAGE_ID = 'message-id'
MGMT_REQUEST_PARTITION_KEY = 'partition-key'
MGMT_REQUEST_VIA_PARTITION_KEY = 'via-partition-key'
MGMT_REQUEST_DEAD_LETTER_REASON = 'deadletter-reason'
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION = 'deadletter-description'
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION = 'deadletter-description'
RECEIVER_LINK_DEAD_LETTER_REASON = 'DeadLetterReason'
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION = 'DeadLetterErrorDescription'
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION = 'DeadLetterErrorDescription'
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT = b"entity-mgmt"

MESSAGE_COMPLETE = 'complete'
Expand Down Expand Up @@ -106,7 +106,7 @@
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX = '/$Transfer' + DEAD_LETTER_QUEUE_SUFFIX


class ReceiveSettleMode(Enum):
class ReceiveMode(Enum):
PeekLock = constants.ReceiverSettleMode.PeekLock
ReceiveAndDelete = constants.ReceiverSettleMode.ReceiveAndDelete

Expand Down
53 changes: 29 additions & 24 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
SETTLEMENT_COMPLETE,
SETTLEMENT_DEFER,
SETTLEMENT_DEADLETTER,
ReceiveSettleMode,
ReceiveMode,
_X_OPT_ENQUEUED_TIME,
_X_OPT_SEQUENCE_NUMBER,
_X_OPT_ENQUEUE_SEQUENCE_NUMBER,
Expand All @@ -32,9 +32,9 @@
_X_OPT_DEAD_LETTER_SOURCE,
MGMT_RESPONSE_MESSAGE_EXPIRATION,
MGMT_REQUEST_DEAD_LETTER_REASON,
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION,
RECEIVER_LINK_DEAD_LETTER_REASON,
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION,
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION,
MESSAGE_COMPLETE,
MESSAGE_DEAD_LETTER,
MESSAGE_ABANDON,
Expand Down Expand Up @@ -577,7 +577,7 @@ def add(self, message):
self._messages.append(message)


class PeekMessage(Message):
class PeekedMessage(Message):
"""A preview message.

This message is still on the queue, and unlocked.
Expand All @@ -587,7 +587,7 @@ class PeekMessage(Message):

def __init__(self, message):
# type: (uamqp.message.Message) -> None
super(PeekMessage, self).__init__(None, message=message) # type: ignore
super(PeekedMessage, self).__init__(None, message=message) # type: ignore

def _to_outgoing_message(self):
# type: () -> Message
Expand Down Expand Up @@ -736,7 +736,7 @@ def sequence_number(self):
return None


class ReceivedMessageBase(PeekMessage):
class ReceivedMessageBase(PeekedMessage):
"""
A Service Bus Message received from service side.

Expand All @@ -753,10 +753,10 @@ class ReceivedMessageBase(PeekMessage):
:caption: Checking the properties on a received message.
"""

def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
# type: (uamqp.message.Message, ReceiveSettleMode, Any) -> None
def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
# type: (uamqp.message.Message, ReceiveMode, Any) -> None
super(ReceivedMessageBase, self).__init__(message=message)
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None # type: Optional[Exception]
Expand All @@ -765,7 +765,7 @@ def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
except KeyError:
raise TypeError("ReceivedMessage requires a receiver to be initialized. This class should never be" + \
"initialized by a user; the Message class should be utilized instead.")
self._expiry = None
self._expiry = None # type: Optional[datetime.datetime]

def _check_live(self, action):
# pylint: disable=no-member
Expand All @@ -784,7 +784,7 @@ def _check_live(self, action):
except AttributeError:
pass

def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None):
# type: (str, Optional[str], Optional[str]) -> Callable
# pylint: disable=protected-access

Expand All @@ -807,7 +807,7 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_
[self.lock_token],
dead_letter_details={
MGMT_REQUEST_DEAD_LETTER_REASON: dead_letter_reason or "",
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: dead_letter_description or ""
MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description or ""
}
)
if settle_operation == MESSAGE_DEFER:
Expand All @@ -818,7 +818,7 @@ def _settle_via_mgmt_link(self, settle_operation, dead_letter_reason=None, dead_
)
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))

def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_description=None):
def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None):
# type: (str, Optional[str], Optional[str]) -> Callable
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(self.message.accept)
Expand All @@ -828,10 +828,10 @@ def _settle_via_receiver_link(self, settle_operation, dead_letter_reason=None, d
return functools.partial(
self.message.reject,
condition=DEADLETTERNAME,
description=dead_letter_description,
description=dead_letter_error_description,
info={
RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason,
RECEIVER_LINK_DEAD_LETTER_DESCRIPTION: dead_letter_description
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description
}
)
if settle_operation == MESSAGE_DEFER:
Expand Down Expand Up @@ -905,15 +905,15 @@ def _settle_message(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_description=None,
dead_letter_error_description=None,
):
# type: (str, Optional[str], Optional[str]) -> None
try:
if not self._is_deferred_message:
try:
self._settle_via_receiver_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_description=dead_letter_description)()
dead_letter_error_description=dead_letter_error_description)()
return
except RuntimeError as exception:
_LOGGER.info(
Expand All @@ -924,7 +924,7 @@ def _settle_message(
)
self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_description=dead_letter_description)()
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)

Expand Down Expand Up @@ -955,7 +955,7 @@ def complete(self):
self._settle_message(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, description=None):
def dead_letter(self, reason=None, error_description=None):
# type: (Optional[str], Optional[str]) -> None
"""Move the message to the Dead Letter queue.

Expand All @@ -964,7 +964,7 @@ def dead_letter(self, reason=None, description=None):
or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

:param str reason: The reason for dead-lettering the message.
:param str description: The detailed description for dead-lettering the message.
:param str error_description: The detailed error description for dead-lettering the message.
:rtype: None
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
:raises: ~azure.servicebus.exceptions.MessageLockExpired if message lock has already expired.
Expand All @@ -983,7 +983,9 @@ def dead_letter(self, reason=None, description=None):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_description=description)
self._settle_message(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True

def abandon(self):
Expand Down Expand Up @@ -1041,7 +1043,7 @@ def defer(self):
self._settled = True

def renew_lock(self):
# type: () -> None
# type: () -> datetime.datetime
# pylint: disable=protected-access,no-member
"""Renew the message lock.

Expand All @@ -1057,7 +1059,8 @@ def renew_lock(self):
Lock renewal can be performed as a background task by registering the message with an
`azure.servicebus.AutoLockRenew` instance.

:rtype: None
:returns: The utc datetime the lock is set to expire at.
:rtype: datetime.datetime
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
Expand All @@ -1073,7 +1076,9 @@ def renew_lock(self):
raise ValueError("Unable to renew lock - no lock token found.")

expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0)
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime

return self._expiry


class AMQPMessage(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import uamqp

from .message import PeekMessage, ReceivedMessage
from .message import PeekedMessage, ReceivedMessage
from ..exceptions import ServiceBusError, MessageLockExpired
from .constants import ReceiveSettleMode
from .constants import ReceiveMode


def default(status_code, message, description):
Expand All @@ -34,7 +34,7 @@ def peek_op(status_code, message, description):
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(PeekMessage(wrapped))
parsed.append(PeekedMessage(wrapped))
return parsed
if status_code in [202, 204]:
return []
Expand Down Expand Up @@ -63,14 +63,14 @@ def deferred_message_op(
message,
description,
receiver,
mode=ReceiveSettleMode.PeekLock,
receive_mode=ReceiveMode.PeekLock,
message_type=ReceivedMessage
):
if status_code == 200:
parsed = []
for m in message.get_data()[b'messages']:
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
parsed.append(message_type(wrapped, mode, is_deferred_message=True, receiver=receiver))
parsed.append(message_type(wrapped, receive_mode, is_deferred_message=True, receiver=receiver))
return parsed
if status_code in [202, 204]:
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
SESSION_LOCKED_UNTIL,
DATETIMEOFFSET_EPOCH,
MGMT_REQUEST_SESSION_ID,
ReceiveSettleMode
ReceiveMode
)
from ..exceptions import (
_ServiceBusErrorPolicy,
Expand All @@ -32,26 +32,26 @@ def _populate_attributes(self, **kwargs):

self._auth_uri = "sb://{}/{}".format(self.fully_qualified_namespace, self.entity_path)
self._entity_uri = "amqps://{}/{}".format(self.fully_qualified_namespace, self.entity_path)
self._mode = kwargs.get("mode", ReceiveSettleMode.PeekLock)
self._receive_mode = kwargs.get("receive_mode", ReceiveMode.PeekLock)
self._error_policy = _ServiceBusErrorPolicy(
max_retries=self._config.retry_total
)
self._name = "SBReceiver-{}".format(uuid.uuid4())
self._last_received_sequenced_number = None
self._message_iter = None
self._connection = kwargs.get("connection")
prefetch = kwargs.get("prefetch", 0)
if int(prefetch) < 0 or int(prefetch) > 50000:
raise ValueError("Prefetch must be an integer between 0 and 50000 inclusive.")
self._prefetch = prefetch + 1
prefetch_count = kwargs.get("prefetch_count", 0)
if int(prefetch_count) < 0 or int(prefetch_count) > 50000:
raise ValueError("prefetch_count must be an integer between 0 and 50000 inclusive.")
self._prefetch_count = prefetch_count + 1
# The relationship between the amount can be received and the time interval is linear: amount ~= perf * interval
# In large max_batch_size case, like 5000, the pull receive would always return hundreds of messages limited by
# the perf and time.
# In large max_message_count case, like 5000, the pull receive would always return hundreds of messages limited
# by the perf and time.
self._further_pull_receive_timeout_ms = 200
self._max_wait_time = kwargs.get("max_wait_time", None)

def _build_message(self, received, message_type=ReceivedMessage):
message = message_type(message=received, mode=self._mode, receiver=self)
message = message_type(message=received, receive_mode=self._receive_mode, receiver=self)
self._last_received_sequenced_number = message.sequence_number
return message

Expand Down
Loading