Skip to content

Commit

Permalink
[ServiceBus] Condition based error redesign + client validation align…
Browse files Browse the repository at this point in the history
…ment (#15364)

* partition key validation

* no. 2 message settle twice

* client validation and add error condtions

* error redesign

* fix mypy and pylint

* rever session and message lock lost to client validation

* fix mypy, pylint, add constant and comments

* fix pylint and address pr review comments

* fix pylint

* 3 error class renaming

* server busy should be retryable

* fix docs

* fix samples

* use _close_handler in internal _open call
  • Loading branch information
yunhaoling authored Nov 20, 2020
1 parent 5608cdf commit f845d36
Show file tree
Hide file tree
Showing 24 changed files with 861 additions and 544 deletions.
20 changes: 19 additions & 1 deletion sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,26 @@

**Breaking Changes**

* `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler.
* Setting `ServiceBusMessage.partition_key` to a value different than `session_id` on the message instance now raises `ValueError`.
* `ServiceBusSender` and `ServiceBusReceiver` are no longer reusable and will raise `ValueError` when trying to operate on a closed handler.
* `send_messages`, `schedule_messages`, `cancel_scheduled_messages` and `receive_deferred_messages` now performs a no-op rather than raising a `ValueError` if provided an empty list of messages or an empty batch.
* Redesigned error hierarchy based on the service-defined error condition:
- `MessageAlreadySettled` now inherits from `ValueError` instead of `ServiceBusMessageError` as it's a client-side validation.
- Removed `NoActiveSession` which is now replaced by `OperationTimeoutError` as the client times out when trying to connect to any available session.
- Removed `ServiceBusMessageError` as error condition based exceptions provide comprehensive error information.
- Renamed `MessageContentTooLarge` to `MessageSizeExceededError` to be consistent with the term defined by the service.
- Removed `MessageSettleFailed` as error condition based exceptions provide comprehensive error information.
- Removed `MessageSendFailed` as error condition based exceptions provide comprehensive error information.
- Renamed `MessageLockExpired` to `MessageLockLostError` to be consistent with the term defined by the service.
- Renamed `SessionLockExpired` to `SessionLockLostError` to be consistent with the term defined by the service.
- Introduced `MessageNotFoundError` which would be raised when the requested message was not found.
- Introduced `MessagingEntityNotFoundError` which would be raised when a Service Bus resource cannot be found by the Service Bus service.
- Introduced `MessagingEntityDisabledError` which would be raised when the Messaging Entity is disabled.
- Introduced `MessagingEntityAlreadyExistsError` which would be raised when an entity with the same name exists under the same namespace.
- Introduced `ServiceBusQuotaExceededError` which would be raised when a Service Bus resource has been exceeded while interacting with the Azure Service Bus service.
- Introduced `ServiceBusServerBusyError` which would be raised when the Azure Service Bus service reports that it is busy in response to a client request to perform an operation.
- Introduced `ServiceBusCommunicationError` which would be raised when there was a general communications error encountered when interacting with the Azure Service Bus service.
- Introduced `SessionCannotBeLockedError` which would be raised when the requested session cannot be locked.

**BugFixes**

Expand Down
29 changes: 16 additions & 13 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,37 +400,40 @@ This may have been caused by the credentials not having the right permission to
It is recommended to check the permission of the credentials.
- **ServiceBusAuthenticationError:** An error occurred when authenticate the connection to the service.
This may have been caused by the credentials being incorrect. It is recommended to check the credentials.
- **NoActiveSession:** This indicates that there is no active sessions receive from.
This may have been caused by all sessions in the entity have been occupied by other receiver instances.
It is recommended to retry if necessary.
- **OperationTimeoutError:** This indicates that the service did not respond to an operation within the expected amount of time.
This may have been caused by a transient network issue or service problem. The service may or may not have successfully completed the request; the status is not known.
It is recommended to attempt to verify the current state and retry if necessary.
- **MessageContentTooLarge:** This indicate that the message content is larger than the service bus frame size.
- **MessageSizeExceededError:** This indicate that the message content is larger than the service bus frame size.
This could happen when too many service bus messages are sent in a batch or the content passed into
the body of a `Message` is too large. It is recommended to reduce the count of messages being sent in a batch or the size of content being passed into a single `ServiceBusMessage`.
- **MessageAlreadySettled:** This indicates failure to settle the message.
This could happen when trying to settle an already-settled message.
- **MessageSettleFailed:** The attempt to settle a message failed.
This could happen when the service is unable to process the request.
It is recommended to retry or receive and settle the message again.
- **MessageSendFailed:** A message failed to be sent to the Service Bus entity.
This could happen when the service is unable to process the request.
It is recommended to retry, check the message content or reduce the count of messages in a batch.
- **MessageLockExpired:** The lock on the message has expired and it has been released back to the queue.
- **MessageLockLostError:** The lock on the message has expired and it has been released back to the queue.
It will need to be received again in order to settle it.
You should be aware of the lock duration of a message and keep renewing the lock before expiration in case of long processing time.
`AutoLockRenewer` could help on keeping the lock of the message automatically renewed.
- **SessionLockExpired:** The lock on the session has expired.
- **SessionLockLostError:** The lock on the session has expired.
All unsettled messages that have been received can no longer be settled.
It is recommended to reconnect to the session if receive messages again if necessary.
You should be aware of the lock duration of a session and keep renewing the lock before expiration in case of long processing time.
`AutoLockRenewer` could help on keeping the lock of the session automatically renewed.
- **MessageNotFoundError:** Attempt to receive a message with a particular sequence number. This message isn't found.
Make sure the message hasn't been received already. Check the deadletter queue to see if the message has been deadlettered.
- **MessagingEntityNotFoundError:** Entity associated with the operation doesn't exist or it has been deleted.
Please make sure the entity exists.
- **MessagingEntityDisabledError:** Request for a runtime operation on a disabled entity. Please Activate the entity.
- **ServiceBusQuotaExceededError:** The messaging entity has reached its maximum allowable size, or the maximum number of connections to a namespace has been exceeded.
Create space in the entity by receiving messages from the entity or its subqueues.
- **ServiceBusServerBusyError:** Service isn't able to process the request at this time. Client can wait for a period of time, then retry the operation.
- **ServiceBusCommunicationError:** Client isn't able to establish a connection to Service Bus.
Make sure the supplied host name is correct and the host is reachable.
If your code runs in an environment with a firewall/proxy, ensure that the traffic to the Service Bus domain/IP address and ports isn't blocked.
- **SessionCannotBeLockedError:** Attempt to connect to a session with a specific session ID, but the session is currently locked by another client.
Make sure the session is unlocked by other clients.
- **AutoLockRenewFailed:** An attempt to renew a lock on a message or session in the background has failed.
This could happen when the receiver used by `AutoLockRenerer` is closed or the lock of the renewable has expired.
It is recommended to re-register the renewable message or session by receiving the message or connect to the sessionful entity again.
- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed. You could re-register the object that wants be auto lock renewed or extend the timeout in advance.
- **ServiceBusMessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above.
- **ServiceBusError:** All other Service Bus related errors. It is the root error class of all the errors described above.

Please view the [exceptions reference docs][exception_reference] for detailed descriptions of our common Exception types.
Expand Down
66 changes: 52 additions & 14 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from ._common._configuration import Configuration
from .exceptions import (
ServiceBusError,
ServiceBusAuthenticationError,
ServiceBusConnectionError,
OperationTimeoutError,
SessionLockLostError,
_create_servicebus_exception
)
from ._common.utils import create_properties, strip_protocol_from_uri
Expand Down Expand Up @@ -190,9 +191,11 @@ def _convert_connection_string_to_kwargs(cls, conn_str, **kwargs):

entity_in_kwargs = queue_name or topic_name
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
raise ServiceBusAuthenticationError(
"Entity names do not match, the entity name in connection string is {};"
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
raise ValueError(
"The queue or topic name provided: {} which does not match the EntityPath in"
" the connection string passed to the ServiceBusClient constructor: {}.".format(
entity_in_conn_str, entity_in_kwargs
)
)

kwargs["fully_qualified_namespace"] = host
Expand All @@ -211,30 +214,65 @@ def _create_credential_from_connection_string_parameters(cls, token, token_expir
return ServiceBusSharedKeyCredential(policy, key)

def __enter__(self):
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")

self._open_with_retry()
return self

def __exit__(self, *args):
self.close()

def _handle_exception(self, exception, **kwargs):
# type: (BaseException, Any) -> ServiceBusError
error, error_need_close_handler, error_need_raise = \
_create_servicebus_exception(_LOGGER, exception, self, **kwargs)
if error_need_close_handler:
def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
# pylint: disable=protected-access, line-too-long
error = _create_servicebus_exception(_LOGGER, exception)

try:
# If SessionLockLostError or ServiceBusConnectionError happen when a session receiver is running,
# the receiver should no longer be used and should create a new session receiver
# instance to receive from session. There are pitfalls WRT both next session IDs,
# and the diversity of session failure modes, that motivates us to disallow this.
if self._session and self._running and isinstance(error, (SessionLockLostError, ServiceBusConnectionError)): # type: ignore
self._session._lock_lost = True # type: ignore
self._close_handler()
raise error
except AttributeError:
pass

if error._shutdown_handler:
self._close_handler()
if error_need_raise:
if not error._retryable:
raise error

return error

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
def _check_live(self):
"""check whether the handler is alive"""
# pylint: disable=protected-access
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")
# The following client validation is for two purposes in a session receiver:
# 1. self._session._lock_lost is set when a session receiver encounters a connection error,
# once there's a connection error, we don't retry on the session entity and simply raise SessionlockLostError.
# 2. self._session._lock_expired is a hot fix as client validation for session lock expiration.
# Because currently uamqp doesn't have the ability to detect remote session lock lost.
# Usually the service would send a detach frame once a session lock gets expired, however, in the edge case
# when we drain messages in a queue and try to settle messages after lock expiration,
# we are not able to receive the detach frame by calling uamqp connection.work(),
# Eventually this should be a fix in the uamqp library.
# see issue: https://github.com/Azure/azure-uamqp-python/issues/183
try:
if self._session and (self._session._lock_lost or self._session._lock_expired):
raise SessionLockLostError(error=self._session.auto_renew_error)
except AttributeError:
pass

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand All @@ -251,7 +289,7 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = self._handle_exception(exception, **kwargs)
last_exception = self._handle_exception(exception)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
Expand Down Expand Up @@ -350,7 +388,7 @@ def _mgmt_request_response(
)
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", error=exp)
raise OperationTimeoutError(error=exp)
raise

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,21 @@

SESSION_FILTER = VENDOR + b":session-filter"
SESSION_LOCKED_UNTIL = VENDOR + b":locked-until-utc"
SESSION_LOCK_LOST = VENDOR + b":session-lock-lost"
SESSION_LOCK_TIMEOUT = VENDOR + b":timeout"

# Error codes
ERROR_CODE_SESSION_LOCK_LOST = VENDOR + b":session-lock-lost"
ERROR_CODE_MESSAGE_LOCK_LOST = VENDOR + b":message-lock-lost"
ERROR_CODE_MESSAGE_NOT_FOUND = VENDOR + b":message-not-found"
ERROR_CODE_TIMEOUT = VENDOR + b":timeout"
ERROR_CODE_AUTH_FAILED = VENDOR + b":auth-failed"
ERROR_CODE_SESSION_CANNOT_BE_LOCKED = VENDOR + b":session-cannot-be-locked"
ERROR_CODE_SERVER_BUSY = VENDOR + b":server-busy"
ERROR_CODE_ARGUMENT_ERROR = VENDOR + b":argument-error"
ERROR_CODE_OUT_OF_RANGE = VENDOR + b":argument-out-of-range"
ERROR_CODE_ENTITY_DISABLED = VENDOR + b":entity-disabled"
ERROR_CODE_ENTITY_ALREADY_EXISTS = VENDOR + b":entity-already-exists"
ERROR_CODE_PRECONDITION_FAILED = VENDOR + b":precondition-failed"


REQUEST_RESPONSE_OPERATION_NAME = b"operation"
REQUEST_RESPONSE_TIMEOUT = VENDOR + b":server-timeout"
Expand Down Expand Up @@ -68,6 +81,7 @@
RECEIVER_LINK_DEAD_LETTER_REASON = 'DeadLetterReason'
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION = 'DeadLetterErrorDescription'
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT = b"entity-mgmt"
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION = b'errorCondition'

MESSAGE_COMPLETE = 'complete'
MESSAGE_DEAD_LETTER = 'dead-letter'
Expand Down Expand Up @@ -114,6 +128,9 @@
TRANSFER_DEAD_LETTER_QUEUE_SUFFIX = '/$Transfer' + DEAD_LETTER_QUEUE_SUFFIX


MESSAGE_PROPERTY_MAX_LENGTH = 128


class ReceiveMode(Enum):
PeekLock = constants.ReceiverSettleMode.PeekLock
ReceiveAndDelete = constants.ReceiverSettleMode.ReceiveAndDelete
Expand Down
Loading

0 comments on commit f845d36

Please sign in to comment.