Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] ServiceBus Operation Timeout Support #13854

Merged
merged 25 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c6c4d5c
sync opeartion timeout update
yunhaoling Sep 17, 2020
3feb787
add support for operation timeout to certain methods
yunhaoling Sep 17, 2020
7e83bb3
small fix
yunhaoling Sep 18, 2020
1275da1
fix 2.7 syntax error
yunhaoling Sep 18, 2020
6c282b4
fix mypy
yunhaoling Sep 18, 2020
dfcd17c
fix mypy
yunhaoling Sep 18, 2020
8411791
add test and fix a small bug
yunhaoling Sep 21, 2020
769a4a1
Merge branch 'master' into sb-operation-timeout
yunhaoling Sep 21, 2020
4161d68
improve code
yunhaoling Sep 21, 2020
88c3cf1
Merge branch 'master' into sb-operation-timeout
yunhaoling Sep 22, 2020
b46131a
improve test code
yunhaoling Sep 22, 2020
daeb2db
Merge branch 'sb-operation-timeout' of https://github.com/yunhaoling/…
yunhaoling Sep 22, 2020
12b2e7e
Update sdk/servicebus/azure-servicebus/CHANGELOG.md
yunhaoling Sep 28, 2020
d0291f7
move timeout into kwargs in async
yunhaoling Sep 29, 2020
c03f0e2
addressing pr review and update uamqp dependency to include the lates…
yunhaoling Sep 30, 2020
809f70c
Merge branch 'sb-operation-timeout' of https://github.com/yunhaoling/…
yunhaoling Sep 30, 2020
d5f095d
remove configuration check
yunhaoling Sep 30, 2020
191be47
Merge remote-tracking branch 'central/master' into sb-operation-timeout
yunhaoling Oct 3, 2020
0d2dbeb
merge master and resolve conflicts
yunhaoling Oct 3, 2020
85ff115
fix bug in test hacking
yunhaoling Oct 5, 2020
be4f31f
fix bug where timeout=0 in tests
yunhaoling Oct 5, 2020
ddae473
Merge remote-tracking branch 'central/master' into sb-operation-timeout
yunhaoling Oct 5, 2020
e793bb2
tweak changelog position
yunhaoling Oct 5, 2020
e1b5721
update changelog
yunhaoling Oct 5, 2020
840bcf2
add try-finally in timeout test cases
yunhaoling Oct 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@

## 7.0.0b8 (Unreleased)

**New Features**

* Added support for `timeout` parameter on the following operations:
- `ServiceBusSender`: `send_messages`, `schedule_messages` and `cancel_scheduled_messages`
- `ServiceBusReceiver`: `receive_deferred_messages` and `peek_messages`
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`

**BugFixes**

* Updated uAMQP dependency to 1.2.11.
- Fixed bug where amqp message `footer` and `delivery_annotation` were not encoded into the outgoing payload.

## 7.0.0b7 (2020-10-05)

**Breaking Changes**

* Passing any type other than `ReceiveMode` as parameter `receive_mode` now throws a `TypeError` instead of `AttributeError`.
* Administration Client calls now take only entity names, not `<Entity>Descriptions` as well to reduce ambiguity in which entity was being acted on. TypeError will now be thrown on improper parameter types (non-string).
* `AMQPMessage` (`Message.amqp_message`) properties are now read-only, changes of these properties would not be reflected in the underlying message. This may be subject to change before GA.
Expand Down
56 changes: 44 additions & 12 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from urllib.parse import quote_plus

import uamqp
from uamqp import utils
from uamqp import utils, compat
from uamqp.message import MessageProperties

from azure.core.credentials import AccessToken
Expand All @@ -24,6 +24,7 @@
from .exceptions import (
ServiceBusError,
ServiceBusAuthenticationError,
OperationTimeoutError,
_create_servicebus_exception
)
from ._common.utils import create_properties
Expand Down Expand Up @@ -233,14 +234,14 @@ def _backoff(
self,
retried_times,
last_exception,
timeout=None,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
timeout is None or backoff <= timeout
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
Expand All @@ -259,14 +260,17 @@ def _backoff(
def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
require_last_exception = kwargs.pop("require_last_exception", False)
require_timeout = kwargs.pop("require_timeout", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
max_retries = self._config.retry_total

abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None

while retried_times <= max_retries:
try:
if require_timeout:
kwargs["timeout"] = timeout
if operation_requires_timeout and abs_timeout_time:
remaining_timeout = abs_timeout_time - time.time()
kwargs["timeout"] = remaining_timeout
return operation(**kwargs)
except StopIteration:
raise
Expand All @@ -285,13 +289,37 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
self._backoff(
retried_times=retried_times,
last_exception=last_exception,
timeout=timeout
abs_timeout_time=abs_timeout_time
)

def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs):
# type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message
def _mgmt_request_response(
self,
mgmt_operation,
message,
callback,
keep_alive_associated_link=True,
timeout=None,
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved
**kwargs
):
# type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message
"""
Execute an amqp management operation.

:param bytes mgmt_operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
:param message: The message to send in the management request.
:paramtype message: ~uamqp.message.Message
:param callback: The callback which is used to parse the returning message.
:paramtype callback: Callable[int, ~uamqp.message.Message, str]
:param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when
executing operation on mgmt links.
:param timeout: timeout in seconds executing the mgmt operation.
:rtype: None
"""
self._open()
application_properties = {}

# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
if keep_alive_associated_link:
try:
Expand All @@ -314,19 +342,23 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a
mgmt_operation,
op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT,
node=self._mgmt_target.encode(self._config.encoding),
timeout=5000,
timeout=timeout * 1000 if timeout else None,
callback=callback
)
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", inner_exception=exp)
raise ServiceBusError("Management request failed: {}".format(exp), exp)

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Any) -> Any
def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
message=message,
callback=callback,
timeout=timeout,
operation_requires_timeout=True,
**kwargs
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def __init__(self, **kwargs):
if self.http_proxy
else kwargs.get("transport_type", TransportType.Amqp)
)
# The following configs are not public, for internal usage only
self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int
self.encoding = kwargs.get("encoding", "UTF-8")
self.auto_reconnect = kwargs.get("auto_reconnect", True)
self.keep_alive = kwargs.get("keep_alive", 30)
self.timeout = kwargs.get("timeout", 60) # type: float
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ def _to_outgoing_message(self):
via_partition_key=self.via_partition_key
)


@property
def dead_letter_error_description(self):
# type: () -> Optional[str]
Expand Down Expand Up @@ -1043,8 +1042,8 @@ def defer(self):
self._settle_message(MESSAGE_DEFER)
self._settled = True

def renew_lock(self):
# type: () -> datetime.datetime
def renew_lock(self, **kwargs):
# type: (Any) -> datetime.datetime
# pylint: disable=protected-access,no-member
"""Renew the message lock.

Expand All @@ -1060,14 +1059,16 @@ def renew_lock(self):
Lock renewal can be performed as a background task by registering the message with an
`azure.servicebus.AutoLockRenew` instance.

:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:returns: The utc datetime the lock is set to expire at.
:rtype: datetime.datetime
:raises: TypeError if the message is sessionful.
:raises: ~azure.servicebus.exceptions.MessageLockExpired is message lock has already expired.
:raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled.
"""
try:
if self._receiver.session: # type: ignore
if self._receiver.session: # type: ignore
raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.")
except AttributeError:
pass
Expand All @@ -1076,8 +1077,12 @@ def renew_lock(self):
if not token:
raise ValueError("Unable to renew lock - no lock token found.")

expiry = self._receiver._renew_locks(token) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")

expiry = self._receiver._renew_locks(token, timeout=timeout) # type: ignore
self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime

return self._expiry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ def _open(self):
self.close()
raise

def close(self):
# type: () -> None
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def _receive(self, max_message_count=None, timeout=None):
# type: (Optional[int], Optional[float]) -> List[ReceivedMessage]
# pylint: disable=protected-access
Expand Down Expand Up @@ -276,15 +271,22 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
mgmt_handlers.default
)

def _renew_locks(self, *lock_tokens):
# type: (str) -> Any
def _renew_locks(self, *lock_tokens, **kwargs):
# type: (str, Any) -> Any
timeout = kwargs.pop("timeout", None)
message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)}
return self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_RENEWLOCK_OPERATION,
message,
mgmt_handlers.lock_renew_op
mgmt_handlers.lock_renew_op,
timeout=timeout
)

def close(self):
# type: () -> None
super(ServiceBusReceiver, self).close()
self._message_iter = None # pylint: disable=attribute-defined-outside-init

def get_streaming_message_iter(self, max_wait_time=None):
# type: (float) -> Iterator[ReceivedMessage]
"""Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until
Expand Down Expand Up @@ -413,18 +415,20 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):
self._receive,
max_message_count=max_message_count,
timeout=max_wait_time,
require_timeout=True
operation_requires_timeout=True
)

def receive_deferred_messages(self, sequence_numbers):
# type: (Union[int,List[int]]) -> List[ReceivedMessage]
def receive_deferred_messages(self, sequence_numbers, **kwargs):
# type: (Union[int,List[int]], Any) -> List[ReceivedMessage]
"""Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied
sequence numbers must be messages from the same partition.

:param Union[int,List[int]] sequence_numbers: A list of the sequence numbers of messages that have been
deferred.
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.
:rtype: List[~azure.servicebus.ReceivedMessage]

.. admonition:: Example:
Expand All @@ -438,6 +442,9 @@ def receive_deferred_messages(self, sequence_numbers):

"""
self._check_live()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
if isinstance(sequence_numbers, six.integer_types):
sequence_numbers = [sequence_numbers]
if not sequence_numbers:
Expand All @@ -458,20 +465,23 @@ def receive_deferred_messages(self, sequence_numbers):
messages = self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
handler
handler,
timeout=timeout
)
return messages

def peek_messages(self, max_message_count=1, sequence_number=None):
# type: (int, Optional[int]) -> List[PeekedMessage]
def peek_messages(self, max_message_count=1, **kwargs):
# type: (int, Any) -> List[PeekedMessage]
"""Browse messages currently pending in the queue.

Peeked messages are not removed from queue, nor are they locked. They cannot be completed,
deferred or dead-lettered.

:param int max_message_count: The maximum number of messages to try and peek. The default
value is 1.
:param int sequence_number: A message sequence number from which to start browsing messages.
:keyword int sequence_number: A message sequence number from which to start browsing messages.
:keyword float timeout: The total operation timeout in seconds including all the retries. The value must be
greater than 0 if specified. The default value is None, meaning no timeout.

:rtype: List[~azure.servicebus.PeekedMessage]

Expand All @@ -486,6 +496,10 @@ def peek_messages(self, max_message_count=1, sequence_number=None):

"""
self._check_live()
sequence_number = kwargs.pop("sequence_number", 0)
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
if not sequence_number:
sequence_number = self._last_received_sequenced_number or 1
if int(max_message_count) < 1:
Expand All @@ -504,5 +518,6 @@ def peek_messages(self, max_message_count=1, sequence_number=None):
return self._mgmt_request_response_with_retry(
REQUEST_RESPONSE_PEEK_OPERATION,
message,
mgmt_handlers.peek_op
mgmt_handlers.peek_op,
timeout=timeout
)
Loading