diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 122d0f541f31..a8bf7de0cf3d 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -5,6 +5,7 @@ **Breaking Changes** * `ServiceBusClient.close()` now closes spawned senders and receivers. +* Attempting to initialize a sender or receiver with a different connection string entity and specified entity (e.g. `queue_name`) will result in an AuthenticationError ## 7.0.0b5 (2020-08-10) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index 223180d1bf11..80b181d116a9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -21,7 +21,7 @@ from ._common._configuration import Configuration from .exceptions import ( ServiceBusError, - ServiceBusAuthorizationError, + ServiceBusAuthenticationError, _create_servicebus_exception ) from ._common.utils import create_properties @@ -104,7 +104,7 @@ def _convert_connection_string_to_kwargs(conn_str, shared_key_credential_type, * entity_in_kwargs = queue_name or topic_name if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs): - raise ServiceBusAuthorizationError( + raise ServiceBusAuthenticationError( "Entity names do not match, the entity name in connection string is {};" " the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs) ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 130ff7a77f7e..883a291581c3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -352,6 +352,9 @@ def from_connection_string( within its request to the service. :rtype: ~azure.servicebus.ServiceBusReceiver + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index a5a5131cac22..2ba095b5f2f6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -284,6 +284,9 @@ def from_connection_string( :rtype: ~azure.servicebus.ServiceBusSender + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py index e69d30d2f847..c6a6bcc33d35 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py @@ -154,6 +154,9 @@ def from_connection_string( within its request to the service. :rtype: ~azure.servicebus.ServiceBusSessionReceiver + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py index 8db9f20d7b1c..4e13bf070d49 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py @@ -18,7 +18,7 @@ 'ServiceBusSender', 'ServiceBusReceiver', 'ServiceBusSessionReceiver', + 'ServiceBusSession', 'ServiceBusSharedKeyCredential', - 'AutoLockRenew', - 'ServiceBusSession' + 'AutoLockRenew' ] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index ac3a7a672f9d..da6c4b6e72e5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -347,6 +347,9 @@ def from_connection_string( within its request to the service. :rtype: ~azure.servicebus.aio.ServiceBusReceiver + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 4171981a4599..d427159074e7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -223,6 +223,9 @@ def from_connection_string( :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. :rtype: ~azure.servicebus.aio.ServiceBusSender + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py index ffc808f7b819..10abb88b5bf0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py @@ -137,6 +137,9 @@ def from_connection_string( within its request to the service. :rtype: ~azure.servicebus.aio.ServiceBusSessionReceiver + :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. + :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. + .. admonition:: Example: .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 7e99a5ab6921..8d700eeb8b96 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -959,14 +959,14 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio async with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) + scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: async with sb_client.get_queue_sender(servicebus_queue.name) as sender: content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content) message.message_id = message_id - message.scheduled_enqueue_time_utc = enqueue_time + message.scheduled_enqueue_time_utc = scheduled_enqueue_time await sender.send_messages(message) messages = await receiver.receive_messages(max_wait_time=120) @@ -975,8 +975,8 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio data = str(messages[0]) assert data == content assert messages[0].message_id == message_id - assert messages[0].scheduled_enqueue_time_utc == enqueue_time - assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) + assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time + assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 finally: for m in messages: @@ -992,7 +992,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): async with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) + scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) messages = [] receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) sender = sb_client.get_queue_sender(servicebus_queue.name) @@ -1007,11 +1007,12 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace await sender.send_messages([message_a, message_b]) - received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5) - for message in received_messages: + received_messages = [] + async for message in receiver.get_streaming_message_iter(max_wait_time=5): + received_messages.append(message) await message.complete() - tokens = await sender.schedule_messages(received_messages, enqueue_time) + tokens = await sender.schedule_messages(received_messages, scheduled_enqueue_time) assert len(tokens) == 2 messages = await receiver.receive_messages(max_wait_time=120) @@ -1022,8 +1023,8 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace data = str(messages[0]) assert data == content assert messages[0].message_id in (message_id_a, message_id_b) - assert messages[0].scheduled_enqueue_time_utc == enqueue_time - assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) + assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time + assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 2 finally: for m in messages: @@ -1252,7 +1253,11 @@ def message_content(): message_1st_received_cnt = 0 message_2nd_received_cnt = 0 while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20: - messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + messages = [] + batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + while batch: + messages += batch + batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) if not messages: break receive_counter += 1 @@ -1382,22 +1387,22 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi async for message in receiver.get_streaming_message_iter(max_wait_time=1): messages.append(message) time_3 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2) + assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2) time_4 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11) + assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11) async for message in receiver.get_streaming_message_iter(max_wait_time=3): messages.append(message) time_5 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4) + assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4) async for message in receiver: messages.append(message) time_6 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6) + assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6) async for message in receiver.get_streaming_message_iter(): messages.append(message) time_7 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6) + assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6) assert len(messages) == 1 diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index e41c7d14a24b..30e5b3c02082 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1199,14 +1199,14 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) + scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: with sb_client.get_queue_sender(servicebus_queue.name) as sender: content = str(uuid.uuid4()) message_id = uuid.uuid4() message = Message(content) message.message_id = message_id - message.scheduled_enqueue_time_utc = enqueue_time + message.scheduled_enqueue_time_utc = scheduled_enqueue_time sender.send_messages(message) messages = receiver.receive_messages(max_wait_time=120) @@ -1215,8 +1215,8 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se data = str(messages[0]) assert data == content assert messages[0].message_id == message_id - assert messages[0].scheduled_enqueue_time_utc == enqueue_time - assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) + assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time + assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0) assert len(messages) == 1 finally: for m in messages: @@ -1235,7 +1235,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) + scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) sender = sb_client.get_queue_sender(servicebus_queue.name) receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) @@ -1265,7 +1265,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ received_messages.append(message) message.complete() - tokens = sender.schedule_messages(received_messages, enqueue_time) + tokens = sender.schedule_messages(received_messages, scheduled_enqueue_time) assert len(tokens) == 2 messages = receiver.receive_messages(max_wait_time=120) @@ -1275,8 +1275,8 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ data = str(messages[0]) assert data == content assert messages[0].message_id in (message_id_a, message_id_b) - assert messages[0].scheduled_enqueue_time_utc == enqueue_time - assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0) + assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time + assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0) assert messages[0].delivery_count == 0 assert messages[0].properties assert messages[0].properties[b'key'] == b'value' @@ -1619,7 +1619,9 @@ def message_content(): message_1st_received_cnt = 0 message_2nd_received_cnt = 0 while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20: - messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + messages = [] + for message in receiver.get_streaming_message_iter(max_wait_time=5): + messages.append(message) if not messages: break receive_counter += 1 @@ -1788,22 +1790,38 @@ def test_queue_receiver_respects_max_wait_time_overrides(self, servicebus_namesp for message in receiver.get_streaming_message_iter(max_wait_time=1): messages.append(message) time_3 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2) + assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2) time_4 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11) + assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11) for message in receiver.get_streaming_message_iter(max_wait_time=3): messages.append(message) time_5 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4) + assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4) for message in receiver: messages.append(message) time_6 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6) + assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6) for message in receiver.get_streaming_message_iter(): messages.append(message) time_7 = receiver._handler._counter.get_current_ms() - assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6) + assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6) assert len(messages) == 1 + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest') + def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: +# with pytest.raises(StopIteration): + with sb_client.get_queue_receiver(servicebus_queue.name, + max_wait_time="oij") as receiver: + + assert receiver diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index f9cf5fb0e306..4d4cabf7638a 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -13,13 +13,14 @@ from azure.common import AzureHttpError, AzureConflictHttpError from azure.mgmt.servicebus.models import AccessRights -from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential +from azure.servicebus import ServiceBusClient, ServiceBusSharedKeyCredential, ServiceBusSender from azure.servicebus._common.message import Message, PeekMessage from azure.servicebus._common.constants import ReceiveSettleMode from azure.servicebus.exceptions import ( ServiceBusError, ServiceBusConnectionError, ServiceBusAuthenticationError, + ServiceBusAuthorizationError, ServiceBusResourceNotFound ) from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer @@ -45,7 +46,7 @@ def test_sb_client_bad_credentials(self, servicebus_namespace, servicebus_queue, credential=ServiceBusSharedKeyCredential('invalid', 'invalid'), logging_enable=False) with client: - with pytest.raises(ServiceBusError): + with pytest.raises(ServiceBusAuthenticationError): with client.get_queue_sender(servicebus_queue.name) as sender: sender.send_messages(Message("test")) @@ -88,7 +89,7 @@ def test_sb_client_readonly_credentials(self, servicebus_authorization_rule_conn with client.get_queue_receiver(servicebus_queue.name) as receiver: messages = receiver.receive_messages(max_batch_size=1, max_wait_time=1) - with pytest.raises(ServiceBusError): + with pytest.raises(ServiceBusAuthorizationError): with client.get_queue_sender(servicebus_queue.name) as sender: sender.send_messages(Message("test")) @@ -120,14 +121,33 @@ def test_sb_client_writeonly_credentials(self, servicebus_authorization_rule_con @ServiceBusQueuePreparer(name_prefix='servicebustest_qone', parameter_name='wrong_queue', dead_lettering_on_message_expiration=True) @ServiceBusQueuePreparer(name_prefix='servicebustest_qtwo', dead_lettering_on_message_expiration=True) @ServiceBusQueueAuthorizationRulePreparer(name_prefix='servicebustest_qtwo') - def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization_rule_connection_string, wrong_queue, **kwargs): + def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization_rule_connection_string, servicebus_queue, wrong_queue, **kwargs): client = ServiceBusClient.from_connection_string(servicebus_queue_authorization_rule_connection_string) with client: - with pytest.raises(ServiceBusError): + # Validate that the wrong queue with the right credentials fails. + with pytest.raises(ServiceBusAuthenticationError): with client.get_queue_sender(wrong_queue.name) as sender: sender.send_messages(Message("test")) + # But that the correct one works. + with client.get_queue_sender(servicebus_queue.name) as sender: + sender.send_messages(Message("test")) + + # Now do the same but with direct connstr initialization. + with pytest.raises(ServiceBusAuthenticationError): + with ServiceBusSender.from_connection_string( + servicebus_queue_authorization_rule_connection_string, + queue_name=wrong_queue.name, + ) as sender: + sender.send_messages(Message("test")) + + with ServiceBusSender.from_connection_string( + servicebus_queue_authorization_rule_connection_string, + queue_name=servicebus_queue.name, + ) as sender: + sender.send_messages(Message("test")) + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer()