Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Test and failure improvements #13345

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Breaking Changes**

* `ServiceBusClient.close()` now closes spawned senders and receivers.
* Attempting to initialize a sender or receiver with a different connection string entity and specified entity (e.g. `queue_name`) will result in an AuthenticationError

## 7.0.0b5 (2020-08-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ._common._configuration import Configuration
from .exceptions import (
ServiceBusError,
ServiceBusAuthorizationError,
ServiceBusAuthenticationError,
_create_servicebus_exception
)
from ._common.utils import create_properties
Expand Down Expand Up @@ -104,7 +104,7 @@ def _convert_connection_string_to_kwargs(conn_str, shared_key_credential_type, *

entity_in_kwargs = queue_name or topic_name
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
raise ServiceBusAuthorizationError(
raise ServiceBusAuthenticationError(
"Entity names do not match, the entity name in connection string is {};"
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ def from_connection_string(
within its request to the service.
:rtype: ~azure.servicebus.ServiceBusReceiver

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ def from_connection_string(

:rtype: ~azure.servicebus.ServiceBusSender

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ def from_connection_string(
within its request to the service.
:rtype: ~azure.servicebus.ServiceBusSessionReceiver

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
'ServiceBusSender',
'ServiceBusReceiver',
'ServiceBusSessionReceiver',
'ServiceBusSession',
'ServiceBusSharedKeyCredential',
'AutoLockRenew',
'ServiceBusSession'
'AutoLockRenew'
]
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ def from_connection_string(
within its request to the service.
:rtype: ~azure.servicebus.aio.ServiceBusReceiver

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ def from_connection_string(
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
:rtype: ~azure.servicebus.aio.ServiceBusSender

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def from_connection_string(
within its request to the service.
:rtype: ~azure.servicebus.aio.ServiceBusSessionReceiver

:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.

.. admonition:: Example:

.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,14 +959,14 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
content = str(uuid.uuid4())
message_id = uuid.uuid4()
message = Message(content)
message.message_id = message_id
message.scheduled_enqueue_time_utc = enqueue_time
message.scheduled_enqueue_time_utc = scheduled_enqueue_time
await sender.send_messages(message)

messages = await receiver.receive_messages(max_wait_time=120)
Expand All @@ -975,8 +975,8 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
data = str(messages[0])
assert data == content
assert messages[0].message_id == message_id
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
assert len(messages) == 1
finally:
for m in messages:
Expand All @@ -992,7 +992,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
messages = []
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)
sender = sb_client.get_queue_sender(servicebus_queue.name)
Expand All @@ -1007,11 +1007,12 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace

await sender.send_messages([message_a, message_b])

received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5)
for message in received_messages:
received_messages = []
async for message in receiver.get_streaming_message_iter(max_wait_time=5):
received_messages.append(message)
await message.complete()

tokens = await sender.schedule_messages(received_messages, enqueue_time)
tokens = await sender.schedule_messages(received_messages, scheduled_enqueue_time)
assert len(tokens) == 2

messages = await receiver.receive_messages(max_wait_time=120)
Expand All @@ -1022,8 +1023,8 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
data = str(messages[0])
assert data == content
assert messages[0].message_id in (message_id_a, message_id_b)
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
assert len(messages) == 2
finally:
for m in messages:
Expand Down Expand Up @@ -1252,7 +1253,11 @@ def message_content():
message_1st_received_cnt = 0
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
messages = []
batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
while batch:
messages += batch
batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
Expand Down Expand Up @@ -1382,22 +1387,22 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
async for message in receiver.get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2)
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11)
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)

async for message in receiver.get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4)
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)

async for message in receiver:
messages.append(message)
time_6 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6)
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)

async for message in receiver.get_streaming_message_iter():
messages.append(message)
time_7 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6)
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1
46 changes: 32 additions & 14 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,14 +1199,14 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
content = str(uuid.uuid4())
message_id = uuid.uuid4()
message = Message(content)
message.message_id = message_id
message.scheduled_enqueue_time_utc = enqueue_time
message.scheduled_enqueue_time_utc = scheduled_enqueue_time
sender.send_messages(message)

messages = receiver.receive_messages(max_wait_time=120)
Expand All @@ -1215,8 +1215,8 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se
data = str(messages[0])
assert data == content
assert messages[0].message_id == message_id
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
assert len(messages) == 1
finally:
for m in messages:
Expand All @@ -1235,7 +1235,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)

Expand Down Expand Up @@ -1265,7 +1265,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
received_messages.append(message)
message.complete()

tokens = sender.schedule_messages(received_messages, enqueue_time)
tokens = sender.schedule_messages(received_messages, scheduled_enqueue_time)
assert len(tokens) == 2

messages = receiver.receive_messages(max_wait_time=120)
Expand All @@ -1275,8 +1275,8 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
data = str(messages[0])
assert data == content
assert messages[0].message_id in (message_id_a, message_id_b)
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
assert messages[0].delivery_count == 0
assert messages[0].properties
assert messages[0].properties[b'key'] == b'value'
Expand Down Expand Up @@ -1619,7 +1619,9 @@ def message_content():
message_1st_received_cnt = 0
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
messages = []
for message in receiver.get_streaming_message_iter(max_wait_time=5):
messages.append(message)
if not messages:
break
receive_counter += 1
Expand Down Expand Up @@ -1788,22 +1790,38 @@ def test_queue_receiver_respects_max_wait_time_overrides(self, servicebus_namesp
for message in receiver.get_streaming_message_iter(max_wait_time=1):
messages.append(message)
time_3 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2)
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
time_4 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11)
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)

for message in receiver.get_streaming_message_iter(max_wait_time=3):
messages.append(message)
time_5 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4)
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)

for message in receiver:
messages.append(message)
time_6 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6)
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)

for message in receiver.get_streaming_message_iter():
messages.append(message)
time_7 = receiver._handler._counter.get_current_ms()
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6)
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
assert len(messages) == 1


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest')
def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):

with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
# with pytest.raises(StopIteration):
with sb_client.get_queue_receiver(servicebus_queue.name,
max_wait_time="oij") as receiver:

assert receiver
Loading