[Perf] Support for test proxy and profiling #19338

Merged
merged 12 commits into from
Jun 21, 2021
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ build/
TestResults/
ENV_DIR/

# Perf test profiling
cProfile-*.pstats

# tox generated artifacts
test-junit-*.xml
pylint-*.out.txt
Expand Down
35 changes: 29 additions & 6 deletions doc/dev/perfstress_tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
1. [The perfstress framework](#the-perfstress-framework)
- [The PerfStressTest base](#the-perfstresstest-base)
- [Default command options](#default-command-options)
- [Running with the test proxy](#running-with-the-test-proxy)
2. [Adding performance tests to an SDK](#adding-performance-tests-to-an-sdk)
- [Writing a test](#writing-a-test)
- [Adding legacy T1 tests](#adding-legacy-t1-tests)
Expand Down Expand Up @@ -38,6 +39,16 @@ class PerfStressTest:
    async def global_cleanup(self):
        # Can be optionally defined. Only run once, regardless of parallelism.

    async def record_and_start_playback(self):
        # Set up the recording on the test proxy, and configure the proxy in playback mode.
        # This function is only run if a test proxy URL is provided (-x).
        # There should be no need to override this function.

    async def stop_playback(self):
        # Configure the proxy out of playback mode and discard the recording.
        # This function is only run if a test proxy URL is provided (-x).
        # There should be no need to override this function.

    async def setup(self):
        # Can be optionally defined. Run once per test instance, after global_setup.

Expand Down Expand Up @@ -65,12 +76,24 @@ class PerfStressTest:
```
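
To make the position of the two proxy hooks concrete, here is a minimal, self-contained sketch. `SketchPerfTest` and `run_once` are hypothetical stand-ins, not the real `PerfStressTest` or its runner, and the call order is an assumption inferred from the comments above (the proxy hooks only run when a proxy URL is given). Check the framework source for the actual ordering.

```python
import asyncio


class SketchPerfTest:
    """Hypothetical stand-in for PerfStressTest; it only records hook calls."""

    def __init__(self, test_proxy=None):
        self.test_proxy = test_proxy  # mirrors the -x/--test-proxy option
        self.calls = []

    async def global_setup(self):
        self.calls.append("global_setup")

    async def setup(self):
        self.calls.append("setup")

    async def record_and_start_playback(self):
        self.calls.append("record_and_start_playback")

    async def run_async(self):
        self.calls.append("run_async")

    async def stop_playback(self):
        self.calls.append("stop_playback")

    async def cleanup(self):
        self.calls.append("cleanup")

    async def global_cleanup(self):
        self.calls.append("global_cleanup")


async def run_once(test):
    """Assumed hook order for one test instance and one operation."""
    await test.global_setup()
    await test.setup()
    if test.test_proxy:  # proxy hooks are skipped without a proxy URL
        await test.record_and_start_playback()
    await test.run_async()
    if test.test_proxy:
        await test.stop_playback()
    await test.cleanup()
    await test.global_cleanup()
    return test.calls


without_proxy = asyncio.run(run_once(SketchPerfTest()))
with_proxy = asyncio.run(run_once(SketchPerfTest("https://localhost:5001")))
print(without_proxy)
print(with_proxy)
```

Without a proxy URL the two playback hooks never appear in the recorded calls, which is why a test class normally has no reason to override them.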
## Default command options
The framework has a series of common command line options built in:
- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
- `--iterations=1` Number of test iterations to run. Default is 1.
- `--parallel=1` Number of tests to run in parallel. Default is 1.
- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5.
- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
- `-i --iterations=1` Number of test iterations to run. Default is 1.
- `-p --parallel=1` Number of tests to run in parallel. Default is 1.
- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5.
- `--sync` Whether to run the tests in sync or async. Default is False (async).
- `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted).
- `-x --test-proxy` Whether to run the tests against the test proxy server. Specify the URL for the proxy endpoint (e.g. "https://localhost:5001").
- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile-<TestClassName>-<TestID>-<sync/async>.pstats"`.
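
The `.pstats` file written by `--profile` can be inspected with the standard library `pstats` module. The sketch below is an illustration only: it profiles a stand-in workload (`busy_work`, hypothetical) rather than a real perf test, and the file name is an assumed example that follows the documented pattern.

```python
import cProfile
import os
import pstats
import tempfile


def busy_work(n):
    """Stand-in workload; a real run would profile the test's run function."""
    return sum(i * i for i in range(n))


# Hypothetical file name following the documented pattern
# cProfile-<TestClassName>-<TestID>-<sync/async>.pstats.
# A temporary directory is used here so the sketch leaves no files behind
# in the working directory.
stats_file = os.path.join(tempfile.mkdtemp(), "cProfile-DownloadTest-0-sync.pstats")

profiler = cProfile.Profile()
profiler.enable()
busy_work(100_000)
profiler.disable()
profiler.dump_stats(stats_file)  # write the binary stats file

# Load the file and print the ten most expensive calls by cumulative time.
stats = pstats.Stats(stats_file)
stats.strip_dirs().sort_stats("cumulative").print_stats(10)
```

For a file produced by an actual perf run, only the last two lines are needed, with `stats_file` pointing at the generated file.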


## Running with the test proxy
Follow the instructions here to install and run the test proxy server:
https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy

Once the proxy is running, run the perf test in a separate process, passing the `-x` flag to specify the proxy endpoint.
```cmd
(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001"
```

# Adding performance tests to an SDK
The performance tests will be in a submodule called `perfstress_tests` within the `tests` directory in an SDK project.
Expand Down Expand Up @@ -351,5 +374,5 @@ Using the `perfstress` command alone will list the available perf tests found. N

Please add a `README.md` to the perfstress_tests directory so that others know how to setup and run the perf tests, along with a description of the available tests and any support command line options. README files in a `tests/perfstress_tests` directory should already be filtered from CI validation for SDK readmes.
Some examples can be found here:
- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md)
- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md)
- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md)
- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md)
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(self, arguments):

        # Create clients
        vault_url = self.get_from_env("AZURE_KEYVAULT_URL")
        self.client = SecretClient(vault_url, self.credential)
        self.async_client = AsyncSecretClient(vault_url, self.async_credential)
        self.client = SecretClient(vault_url, self.credential, **self._client_kwargs)
        self.async_client = AsyncSecretClient(vault_url, self.async_credential, **self._client_kwargs)

    async def global_setup(self):
        """The global setup is run only once."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def __init__(
        self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline)
        default_api_version = self._client._config.version  # pylint: disable=protected-access
        self._client._config.version = get_api_version(kwargs, default_api_version)  # pylint: disable=protected-access
        self._loop = kwargs.get('loop', None)

    @distributed_trace_async
    async def get_account_information(self, **kwargs): # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def __init__(
        self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline)
        default_api_version = self._client._config.version  # pylint: disable=protected-access
        self._client._config.version = get_api_version(kwargs, default_api_version)  # pylint: disable=protected-access
        self._loop = kwargs.get('loop', None)

    @distributed_trace_async
    async def get_user_delegation_key(self, key_start_time, # type: datetime
Expand Down Expand Up @@ -620,7 +619,7 @@ def get_container_client(self, container):
credential=self.credential, api_version=self.api_version, _configuration=self._config,
_pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts,
require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function, loop=self._loop)
key_resolver_function=self.key_resolver_function)

def get_blob_client(
self, container, # type: Union[ContainerProperties, str]
Expand Down Expand Up @@ -675,4 +674,4 @@ def get_blob_client(
credential=self.credential, api_version=self.api_version, _configuration=self._config,
_pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts,
require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function, loop=self._loop)
key_resolver_function=self.key_resolver_function)
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def __init__(
        self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline)
        default_api_version = self._client._config.version  # pylint: disable=protected-access
        self._client._config.version = get_api_version(kwargs, default_api_version)  # pylint: disable=protected-access
        self._loop = kwargs.get('loop', None)

    @distributed_trace_async
    async def create_container(self, metadata=None, public_access=None, **kwargs):
Expand Down Expand Up @@ -1207,4 +1206,4 @@ def get_blob_client(
credential=self.credential, api_version=self.api_version, _configuration=self._config,
_pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts,
require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function, loop=self._loop)
key_resolver_function=self.key_resolver_function)
19 changes: 15 additions & 4 deletions sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ Using the `perfstress` command alone will list the available perf tests found. N

### Common perf command line options
These options are available for all perf tests:
- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
- `--iterations=1` Number of test iterations to run. Default is 1.
- `--parallel=1` Number of tests to run in parallel. Default is 1.
- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
- `-i --iterations=1` Number of test iterations to run. Default is 1.
- `-p --parallel=1` Number of tests to run in parallel. Default is 1.
- `--no-client-share` Whether each parallel test instance should share a single client, or use their own. Default is False (sharing).
- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5.
- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5.
- `--sync` Whether to run the tests in sync or async. Default is False (async). This flag must be used for Storage legacy tests, which do not support async.
- `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted).
- `-x --test-proxy` Whether to run the tests against the test proxy server. Specify the URL for the proxy endpoint (e.g. "https://localhost:5001"). WARNING: When used with Legacy tests, only HTTPS is supported.
- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile-<TestClassName>-<TestID>-<sync/async>.pstats"`.

### Common Blob command line options
The options are available for all Blob perf tests:
Expand Down Expand Up @@ -77,3 +79,12 @@ The tests currently written for the T1 SDK:
```cmd
(env) ~/azure-storage-blob/tests> perfstress UploadTest --parallel=2 --size=10240
```

## Running with the test proxy
Follow the instructions here to install and run the test proxy server:
https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy

Once the proxy is running, run the perf test in a separate process, passing the `-x` flag to specify the proxy endpoint. (Note: only the HTTPS endpoint is supported for the Legacy tests.)
```cmd
(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001"
```
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,57 @@

import os
import uuid
import functools

import requests

from azure_devtools.perfstress_tests import PerfStressTest

from azure.storage.blob import BlockBlobService


def test_proxy_callback(proxy_policy, request):
    if proxy_policy.recording_id and proxy_policy.mode:
        live_endpoint = request.host
        request.host = proxy_policy._proxy_url.netloc
        request.headers["x-recording-id"] = proxy_policy.recording_id
        request.headers["x-recording-mode"] = proxy_policy.mode
        request.headers["x-recording-remove"] = "false"

        # Ensure x-recording-upstream-base-uri header is only set once, since the
        # same HttpMessage will be reused on retries
        if "x-recording-upstream-base-uri" not in request.headers:
            original_endpoint = "https://{}".format(live_endpoint)
            request.headers["x-recording-upstream-base-uri"] = original_endpoint


class _LegacyServiceTest(PerfStressTest):
    service_client = None
    async_service_client = None

    def __init__(self, arguments):
        super().__init__(arguments)
        connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING")
        session = None
        if self.args.test_proxy:
            session = requests.Session()
            session.verify = False
        if not _LegacyServiceTest.service_client or self.args.no_client_share:
            _LegacyServiceTest.service_client = BlockBlobService(connection_string=connection_string)
            _LegacyServiceTest.service_client = BlockBlobService(
                connection_string=connection_string,
                request_session=session)
            _LegacyServiceTest.service_client.MAX_SINGLE_PUT_SIZE = self.args.max_put_size
            _LegacyServiceTest.service_client.MAX_BLOCK_SIZE = self.args.max_block_size
            _LegacyServiceTest.service_client.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD = self.args.buffer_threshold
        self.async_service_client = None
        self.service_client = _LegacyServiceTest.service_client

        if self.args.test_proxy:
            self.service_client.request_callback = functools.partial(
                test_proxy_callback,
                self._test_proxy_policy
            )

    @staticmethod
    def add_arguments(parser):
        super(_LegacyServiceTest, _LegacyServiceTest).add_arguments(parser)
Expand Down
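
The legacy client hook above binds the proxy policy to a two-argument callback with `functools.partial`, so the resulting callable takes only the request. The following self-contained sketch mirrors that pattern with hypothetical stand-ins: the `SimpleNamespace` objects, the recording ID and the account host are all assumed values for illustration, not real SDK types or data. It also shows why the upstream header is guarded: on a simulated retry, `request.host` already points at the proxy.

```python
import functools
from types import SimpleNamespace
from urllib.parse import urlparse


def proxy_callback(proxy_policy, request):
    """Same shape as test_proxy_callback: policy first, request second."""
    if proxy_policy.recording_id and proxy_policy.mode:
        live_endpoint = request.host
        request.host = proxy_policy._proxy_url.netloc
        request.headers["x-recording-id"] = proxy_policy.recording_id
        request.headers["x-recording-mode"] = proxy_policy.mode
        # Set the upstream header only once, so a retried request keeps the
        # live host rather than the proxy host written on the first attempt.
        if "x-recording-upstream-base-uri" not in request.headers:
            request.headers["x-recording-upstream-base-uri"] = "https://{}".format(live_endpoint)


# Hypothetical stand-ins for the proxy policy and the legacy request object.
policy = SimpleNamespace(
    recording_id="example-recording-id",
    mode="playback",
    _proxy_url=urlparse("https://localhost:5001"),
)
request = SimpleNamespace(host="myaccount.blob.core.windows.net", headers={})

# Bind the policy so the callback takes only the request.
callback = functools.partial(proxy_callback, policy)

callback(request)  # first attempt
callback(request)  # simulated retry reusing the same request object

print(request.host)
print(request.headers["x-recording-upstream-base-uri"])
```

After both calls the host is the proxy's `localhost:5001`, while the upstream header still names the original account endpoint.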
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ class _ServiceTest(PerfStressTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING")
        kwargs = {}
        kwargs['max_single_put_size'] = self.args.max_put_size
        kwargs['max_block_size'] = self.args.max_block_size
        kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold
        if self.args.test_proxy:
            self._client_kwargs['_additional_pipeline_policies'] = self._client_kwargs['per_retry_policies']
        self._client_kwargs['max_single_put_size'] = self.args.max_put_size
        self._client_kwargs['max_block_size'] = self.args.max_block_size
        self._client_kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold
        # self._client_kwargs['api_version'] = '2019-02-02'  # Used only for comparison with T1 legacy tests

        if not _ServiceTest.service_client or self.args.no_client_share:
            _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs)
            _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs)
            _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs)
            _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs)
        self.service_client = _ServiceTest.service_client
        self.async_service_client =_ServiceTest.async_service_client

Expand Down
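
The change above swaps a local `kwargs` dict for the shared `self._client_kwargs`, so anything the base class placed there reaches the client constructor. A minimal sketch of that forwarding pattern follows. All three classes are hypothetical stand-ins, and the idea that the base class pre-populates `per_retry_policies` when a proxy URL is given is an assumption inferred from the diff, not confirmed framework behavior.

```python
class SketchBase:
    """Hypothetical stand-in for the perf test base class."""

    def __init__(self, test_proxy=None):
        self._client_kwargs = {}
        if test_proxy:
            # Assumption: the base class registers its proxy policy here.
            self._client_kwargs["per_retry_policies"] = ["<proxy policy>"]


class SketchClient:
    """Hypothetical client that simply stores the options it receives."""

    def __init__(self, url, **kwargs):
        self.url = url
        self.options = kwargs


class SketchServiceTest(SketchBase):
    def __init__(self, test_proxy=None, max_block_size=4 * 1024 * 1024):
        super().__init__(test_proxy)
        # Test-specific options are added to the shared dict, not a local one,
        # so options set by the base class are forwarded together with them.
        self._client_kwargs["max_block_size"] = max_block_size
        self.client = SketchClient("https://example.invalid", **self._client_kwargs)


plain = SketchServiceTest()
proxied = SketchServiceTest(test_proxy="https://localhost:5001")
print(sorted(plain.client.options))
print(sorted(proxied.client.options))
```

With a local dict, the proxied client would receive only `max_block_size` and silently bypass the proxy.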
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ async def global_setup(self):
next_upload = next(pending)
running.add(next_upload)
except StopIteration:
await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED)
if running:
await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED)
break

def run_sync(self):
Expand Down
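
The `if running:` guard added above matters because `asyncio.wait` raises `ValueError` when handed an empty collection. The sketch below is a standalone illustration, not the test's actual upload loop; `wait_for_all` is a hypothetical helper name.

```python
import asyncio


async def wait_for_all(running):
    """Wait for every task in `running`, tolerating an empty set."""
    if running:
        await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED)
    return len(running)


async def main():
    # The unguarded call with an empty set raises ValueError.
    try:
        await asyncio.wait(set(), return_when=asyncio.ALL_COMPLETED)
        unguarded_raised = False
    except ValueError:
        unguarded_raised = True

    # The guarded helper handles both the empty and the non-empty case.
    empty_result = await wait_for_all(set())
    tasks = {asyncio.create_task(asyncio.sleep(0)) for _ in range(3)}
    busy_result = await wait_for_all(tasks)
    return unguarded_raised, empty_result, busy_result


result = asyncio.run(main())
print(result)  # (True, 0, 3)
```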
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from urllib.parse import urlparse

from azure.core.pipeline.policies import SansIOHTTPPolicy


class PerfTestProxyPolicy(SansIOHTTPPolicy):

    def __init__(self, url):
        self.recording_id = None
        self.mode = None
        self._proxy_url = urlparse(url)

    def redirect_to_test_proxy(self, request):
        if self.recording_id and self.mode:
            request.context.options['connection_verify'] = False
            live_endpoint = urlparse(request.http_request.url)
            redirected = live_endpoint._replace(
                scheme=self._proxy_url.scheme,
                netloc=self._proxy_url.netloc
            )
            request.http_request.url = redirected.geturl()
            request.http_request.headers["x-recording-id"] = self.recording_id
            request.http_request.headers["x-recording-mode"] = self.mode
            request.http_request.headers["x-recording-remove"] = "false"

            # Ensure x-recording-upstream-base-uri header is only set once, since the
            # same HttpMessage will be reused on retries
            if "x-recording-upstream-base-uri" not in request.http_request.headers:
                original_endpoint = "{}://{}".format(live_endpoint.scheme, live_endpoint.netloc)
                request.http_request.headers["x-recording-upstream-base-uri"] = original_endpoint

    def on_request(self, request):
        self.redirect_to_test_proxy(request)
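
The URL rewrite at the heart of this policy can be tried in isolation with only `urllib.parse`. In the sketch below, `redirect_url` is a hypothetical helper and the storage account URL is an assumed example. It swaps the scheme and network location for the proxy's, keeps the path and query, and derives the value that would go into `x-recording-upstream-base-uri`.

```python
from urllib.parse import urlparse


def redirect_url(live_url, proxy_url):
    """Return (redirected URL, upstream base URI) for a proxied request."""
    live = urlparse(live_url)
    proxy = urlparse(proxy_url)
    # Replace only scheme and host:port; path, query and fragment are kept.
    redirected = live._replace(scheme=proxy.scheme, netloc=proxy.netloc)
    upstream = "{}://{}".format(live.scheme, live.netloc)
    return redirected.geturl(), upstream


redirected, upstream = redirect_url(
    "https://myaccount.blob.core.windows.net/container/blob.txt?timeout=30",
    "https://localhost:5001",
)
print(redirected)  # https://localhost:5001/container/blob.txt?timeout=30
print(upstream)    # https://myaccount.blob.core.windows.net
```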