Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Small stress test and sample touchups #14304

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show authentication using a SAS token generated from a SAS key, as well as the generation process.
"""

import os
import time
import hmac
import hashlib
import base64
try:
from urllib.parse import quote as url_parse_quote
except ImportError:
from urllib import pathname2url as url_parse_quote
from azure.core.credentials import AccessToken

from azure.servicebus import ServiceBusClient

def generate_sas_token(uri, sas_name, sas_value, token_ttl):
"""Performs the signing and encoding needed to generate a sas token from a sas key."""
sas = sas_value.encode('utf-8')
expiry = str(int(time.time() + token_ttl))
string_to_sign = (uri + '\n' + expiry).encode('utf-8')
signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
signature = url_parse_quote(base64.b64encode(signed_hmac_sha256.digest()))
return 'SharedAccessSignature sr={}&sig={}&se={}&skn={}'.format(uri, signature, expiry, sas_name)

class CustomizedSASCredential(object):
def __init__(self, token, expiry):
"""
:param str token: The token string
:param float expiry: The epoch timestamp
"""
self.token = token
self.expiry = expiry
self.token_type = b"servicebus.windows.net:sastoken"

def get_token(self, *scopes, **kwargs):
"""
This method is automatically called when token is about to expire.
"""
return AccessToken(self.token, self.expiry)

FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_NAMESPACE']
SAS_POLICY = os.environ['SERVICE_BUS_SAS_POLICY']
SAS_KEY = os.environ['SERVICE_BUS_SAS_KEY']

auth_uri = "sb://{}/{}".format(FULLY_QUALIFIED_NAMESPACE, SESSION_QUEUE_NAME)
token_ttl = 3000 # seconds

sas_token = generate_sas_token(auth_uri, SAS_POLICY, SAS_KEY, token_ttl)

credential=CustomizedSASCredential(sas_token, time.time() + token_ttl)

with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential) as client:
pass # client now connected, your logic goes here.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
MessageLockExpired,
MessageAlreadySettled,
AutoLockRenewTimeout,
MessageSettleFailed)
MessageSettleFailed,
MessageSendFailed)
from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
from servicebus_preparer import (
CachedServiceBusNamespacePreparer,
CachedServiceBusQueuePreparer,
ServiceBusTopicPreparer,
ServiceBusQueuePreparer,
ServiceBusSubscriptionPreparer
Expand Down Expand Up @@ -903,4 +905,18 @@ async def test_async_session_connection_failure_is_idempotent(self, servicebus_n
messages = []
async for message in receiver:
messages.append(message)
assert len(messages) == 1
assert len(messages) == 1

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
async def test_async_session_non_session_send_to_session_queue_should_fail(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
with pytest.raises(MessageSendFailed):
message = Message("Handler message")
await sender.send_messages(message)
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def test_stress_queue_pull_receive_timeout(self, servicebus_namespace_connection
senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
max_wait_time = 5,
receive_type=ReceiveType.pull,
duration=timedelta(seconds=600))

result = stress_test.Run()
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
from servicebus_preparer import (
CachedServiceBusNamespacePreparer,
CachedServiceBusQueuePreparer,
ServiceBusTopicPreparer,
ServiceBusQueuePreparer,
ServiceBusSubscriptionPreparer
Expand Down Expand Up @@ -1001,7 +1002,7 @@ def test_session_basic_topic_subscription_send_and_receive(self, servicebus_name
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True)
def test_session_non_session_send_to_session_queue_should_fail(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
Expand Down