diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 71dd7e187263..175ce72eaea3 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -5,6 +5,10 @@ **Breaking Changes** * `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler. +**Bug Fixes** + +* Using parameter `auto_lock_renewer` on a sessionful receiver alongside `ReceiveMode.ReceiveAndDelete` will no longer fail during receipt due to failure to register the message with the renewer. + ## 7.0.0b8 (2020-11-05) **New Features** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index ba51b94089aa..6c8e8d858fbe 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -184,7 +184,7 @@ def _iter_next(self): self._message_iter = self._handler.receive_messages_iter() uamqp_message = next(self._message_iter) message = self._build_message(uamqp_message) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: self._auto_lock_renewer.register(self, message) return message @@ -521,7 +521,7 @@ def receive_messages(self, max_message_count=1, max_wait_time=None): timeout=max_wait_time, operation_requires_timeout=True ) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: for message in messages: self._auto_lock_renewer.register(self, message) return messages @@ -576,7 +576,7 @@ def receive_deferred_messages(self, sequence_numbers, **kwargs): handler, timeout=timeout ) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: for message in messages: self._auto_lock_renewer.register(self, message) return messages diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index d09f2801e6dd..bf91d3a951c0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -181,7 +181,7 @@ async def _iter_next(self): self._message_iter = self._handler.receive_messages_iter_async() uamqp_message = await self._message_iter.__anext__() message = self._build_message(uamqp_message) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: self._auto_lock_renewer.register(self, message) return message @@ -520,7 +520,7 @@ async def receive_messages( timeout=max_wait_time, operation_requires_timeout=True ) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: for message in messages: self._auto_lock_renewer.register(self, message) return messages @@ -581,7 +581,7 @@ async def receive_deferred_messages( handler, timeout=timeout ) - if self._auto_lock_renewer and not self._session: + if self._auto_lock_renewer and not self._session and self._receive_mode != ReceiveMode.ReceiveAndDelete: for message in messages: self._auto_lock_renewer.register(self, message) return messages diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index e4772fcebca6..9553cd748551 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -786,6 +786,28 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac raise await renewer.close() + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True) + async def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + session_id = str(uuid.uuid4()) + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + await sender.send_messages(ServiceBusMessage("test_message", session_id=session_id)) + + failures = 0 + async def should_not_run(*args, **kwargs): + failures += 1 + + async with sb_client.get_queue_receiver(servicebus_queue.name, + session_id=session_id, + receive_mode=ReceiveMode.ReceiveAndDelete, + auto_lock_renewer=AutoLockRenewer()) as receiver: + assert receiver.receive_messages() + assert not failures @pytest.mark.liveTest @pytest.mark.live_test_only diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index b7ed48ce03ff..c3680bb25847 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -724,6 +724,32 @@ def lock_lost_callback(renewable, error): renewer.close() assert len(messages) == 2 + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True) + def test_session_receiver_partially_invalid_autolockrenew_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + session_id = str(uuid.uuid4()) + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender.send_messages(ServiceBusMessage("test_message", session_id=session_id)) + + failures = 0 + def should_not_run(*args, **kwargs): + failures += 1 + + auto_lock_renewer = AutoLockRenewer(on_lock_renew_failure=should_not_run) + with sb_client.get_queue_receiver(servicebus_queue.name, + session_id=session_id, + receive_mode=ReceiveMode.ReceiveAndDelete, + auto_lock_renewer=auto_lock_renewer) as receiver: + + assert receiver.receive_messages() + assert not failures + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer()