From cdae82cc93becd41ee51a2da1fdaaeb566edae09 Mon Sep 17 00:00:00 2001 From: antisch Date: Thu, 8 Apr 2021 11:25:22 -0700 Subject: [PATCH 01/10] Added profiler support to perf framework --- .gitignore | 3 + .../perfstress_tests/perf_stress_runner.py | 80 +++++++++++++++---- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 0fe206db7138..8a38c7927bf7 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,9 @@ build/ TestResults/ ENV_DIR/ +# Perf test profiling +cProfile-*.pstats + # tox generated artifacts test-junit-*.xml pylint-*.out.txt diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py index 861fb84c3b98..27570b03f116 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py @@ -70,6 +70,7 @@ def _parse_args(self): per_test_arg_parser.add_argument('-w', '--warmup', nargs='?', type=int, help='Duration of warmup in seconds. Default is 5.', default=5) per_test_arg_parser.add_argument('--no-cleanup', action='store_true', help='Do not run cleanup logic. Default is false.', default=False) per_test_arg_parser.add_argument('--sync', action='store_true', help='Run tests in sync mode. Default is False.', default=False) + per_test_arg_parser.add_argument('--profile', action='store_true', help='Run tests with profiler. Default is False.', default=False) # Per-test args self._test_class_to_run.add_arguments(per_test_arg_parser) @@ -121,7 +122,11 @@ async def start(self): title = "Test" if self.per_test_args.iterations > 1: title += " " + (i + 1) - await self._run_tests(tests, self.per_test_args.duration, title) + await self._run_tests( + tests, + self.per_test_args.duration, + title, + with_profiler=self.per_test_args.profile) except Exception as e: print("Exception: " + str(e)) finally: @@ -138,7 +143,7 @@ async def start(self): finally: await asyncio.gather(*[test.close() for test in tests]) - async def _run_tests(self, tests, duration, title): + async def _run_tests(self, tests, duration, title, with_profiler=False): self._completed_operations = [0] * len(tests) self._last_completion_times = [0] * len(tests) self._last_total_operations = -1 @@ -148,13 +153,16 @@ async def _run_tests(self, tests, duration, title): if self.per_test_args.sync: threads = [] for id, test in enumerate(tests): - thread = threading.Thread(target=lambda: self._run_sync_loop(test, duration, id)) + thread = threading.Thread( + target=lambda: self._run_sync_loop(test, duration, id, with_profiler) + ) threads.append(thread) thread.start() for thread in threads: thread.join() else: - await asyncio.gather(*[self._run_async_loop(test, duration, id) for id, test in enumerate(tests)]) + tasks = [self._run_async_loop(test, duration, id, with_profiler) for id, test in enumerate(tests)] + await asyncio.gather(*tasks) status_thread.stop() @@ -170,23 +178,63 @@ async def _run_tests(self, tests, duration, title): total_operations, weighted_average_seconds, operations_per_second, seconds_per_operation)) self.logger.info("") - def _run_sync_loop(self, test, duration, id): + def _run_sync_loop(self, test, duration, id, with_profiler): start = time.time() runtime = 0 - while runtime < duration: - test.run_sync() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime + if with_profiler: + import cProfile + profile = None + while runtime < duration: + profile = cProfile.Profile() + profile.enable() + test.run_sync() + profile.disable() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + if profile: + # Store only profile for final iteration + profile_name = "{}/cProfile-{}-{}-sync.pstats".format(os.getcwd(), test.__class__.__name__, id) + print("Dumping profile data to {}".format(profile_name)) + profile.dump_stats(profile_name) + else: + print("No profile generated.") + else: + while runtime < duration: + test.run_sync() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime - async def _run_async_loop(self, test, duration, id): + async def _run_async_loop(self, test, duration, id, with_profiler): start = time.time() runtime = 0 - while runtime < duration: - await test.run_async() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime + if with_profiler: + import cProfile + profile = None + while runtime < duration: + profile = cProfile.Profile() + profile.enable() + await test.run_async() + profile.disable() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime + + if profile: + # Store only profile for final iteration + profile_name = "{}/cProfile-{}-{}-async.pstats".format(os.getcwd(), test.__class__.__name__, id) + print("Dumping profile data to {}".format(profile_name)) + profile.dump_stats(profile_name) + else: + print("No profile generated.") + else: + while runtime < duration: + await test.run_async() + runtime = time.time() - start + self._completed_operations[id] += 1 + self._last_completion_times[id] = runtime def _print_status(self, title): if self._last_total_operations == -1: From 0e57361fd26318162f4e51f379b097d34f6d5285 Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 13 Apr 2021 09:19:52 -0700 Subject: [PATCH 02/10] Removed old loop kwarg --- .../azure/storage/blob/aio/_blob_client_async.py | 1 - .../azure/storage/blob/aio/_blob_service_client_async.py | 5 ++--- .../azure/storage/blob/aio/_container_client_async.py | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py index ba715606ed74..3a6983ee3bc5 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_client_async.py @@ -122,7 +122,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def get_account_information(self, **kwargs): # type: ignore diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py index d3d72ba65389..15b31566520d 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_blob_service_client_async.py @@ -119,7 +119,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def get_user_delegation_key(self, key_start_time, # type: datetime @@ -620,7 +619,7 @@ def get_container_client(self, container): credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) def get_blob_client( self, container, # type: Union[ContainerProperties, str] @@ -675,4 +674,4 @@ def get_blob_client( credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py index 65e31f3338fa..22b197cf9380 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_container_client_async.py @@ -119,7 +119,6 @@ def __init__( self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline) default_api_version = self._client._config.version # pylint: disable=protected-access self._client._config.version = get_api_version(kwargs, default_api_version) # pylint: disable=protected-access - self._loop = kwargs.get('loop', None) @distributed_trace_async async def create_container(self, metadata=None, public_access=None, **kwargs): @@ -1207,4 +1206,4 @@ def get_blob_client( credential=self.credential, api_version=self.api_version, _configuration=self._config, _pipeline=_pipeline, _location_mode=self._location_mode, _hosts=self._hosts, require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, - key_resolver_function=self.key_resolver_function, loop=self._loop) + key_resolver_function=self.key_resolver_function) From 658cb50810ee85fe7d9e6f1a7ae21b2080a53bbe Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 13 Apr 2021 09:20:17 -0700 Subject: [PATCH 03/10] Fixed list test bug --- .../azure-storage-blob/tests/perfstress_tests/list_blobs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py index ce73c910cf2a..f5f35a86fff1 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/list_blobs.py @@ -22,7 +22,8 @@ async def global_setup(self): next_upload = next(pending) running.add(next_upload) except StopIteration: - await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED) + if running: + await asyncio.wait(running, return_when=asyncio.ALL_COMPLETED) break def run_sync(self): From e4950b6c73ca08f2850abbdb444f31fad7613369 Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 15 Jun 2021 09:59:12 -0700 Subject: [PATCH 04/10] Support old API version --- .../azure-storage-blob/tests/perfstress_tests/_test_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py index 678ea3986e27..61c29607e1a8 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py @@ -24,6 +24,8 @@ def __init__(self, arguments): kwargs['max_single_put_size'] = self.args.max_put_size kwargs['max_block_size'] = self.args.max_block_size kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold + # kwargs['api_version'] = '2019-02-02' # Used only for comparison with T1 legacy tests + if not _ServiceTest.service_client or self.args.no_client_share: _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) From 6d5377690844364d5b6d142d45c41af33d44ac67 Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 18 Jun 2021 11:00:03 -0700 Subject: [PATCH 05/10] Updated perf framework for test proxy --- .../perfstress_tests/_policies.py | 30 ++++++++ .../perfstress_tests/perf_stress_runner.py | 47 ++++++++++--- .../perfstress_tests/perf_stress_test.py | 69 +++++++++++++++++-- 3 files changed, 131 insertions(+), 15 deletions(-) create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py new file mode 100644 index 000000000000..81108d826e8e --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py @@ -0,0 +1,30 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azure.core.pipeline.policies import SansIOHTTPPolicy + + +class PerfTestProxyPolicy(SansIOHTTPPolicy): + + def __init__(self, url): + self.recording_id = None + self.mode = None + self._proxy_url = url + + def redirect_to_test_proxy(self, request): + if self.recording_id and self.mode: + original_destination = request.url + request.url = self._proxy_url + request.headers["x-recording-id"] = self.recording_id + request.headers["x-recording-mode"] = self.mode + request.headers["x-recording-remove"] = "false" + + # Ensure x-recording-upstream-base-uri header is only set once, since the + # same HttpMessage will be reused on retries + if "x-recording-upstream-base-uri" not in request.headers: + request.headers["x-recording-upstream-base-uri"] = original_destination + + def send(self, request): + self.redirect_to_test_proxy(request.http_request) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py index 27570b03f116..30d55babd4b0 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py @@ -64,13 +64,35 @@ def _parse_args(self): usage='{} {} []'.format(__file__, args.test)) # Global args - per_test_arg_parser.add_argument('-p', '--parallel', nargs='?', type=int, help='Degree of parallelism to run with. Default is 1.', default=1) - per_test_arg_parser.add_argument('-d', '--duration', nargs='?', type=int, help='Duration of the test in seconds. Default is 10.', default=10) - per_test_arg_parser.add_argument('-i', '--iterations', nargs='?', type=int, help='Number of iterations in the main test loop. Default is 1.', default=1) - per_test_arg_parser.add_argument('-w', '--warmup', nargs='?', type=int, help='Duration of warmup in seconds. Default is 5.', default=5) - per_test_arg_parser.add_argument('--no-cleanup', action='store_true', help='Do not run cleanup logic. Default is false.', default=False) - per_test_arg_parser.add_argument('--sync', action='store_true', help='Run tests in sync mode. Default is False.', default=False) - per_test_arg_parser.add_argument('--profile', action='store_true', help='Run tests with profiler. Default is False.', default=False) + per_test_arg_parser.add_argument( + "-p", "--parallel", nargs="?", type=int, help="Degree of parallelism to run with. Default is 1.", default=1 + ) + per_test_arg_parser.add_argument( + "-d", "--duration", nargs="?", type=int, help="Duration of the test in seconds. Default is 10.", default=10 + ) + per_test_arg_parser.add_argument( + "-i", + "--iterations", + nargs="?", + type=int, + help="Number of iterations in the main test loop. Default is 1.", + default=1, + ) + per_test_arg_parser.add_argument( + "-w", "--warmup", nargs="?", type=int, help="Duration of warmup in seconds. Default is 5.", default=5 + ) + per_test_arg_parser.add_argument( + "--no-cleanup", action="store_true", help="Do not run cleanup logic. Default is false.", default=False + ) + per_test_arg_parser.add_argument( + "--sync", action="store_true", help="Run tests in sync mode. Default is False.", default=False + ) + per_test_arg_parser.add_argument( + "--profile", action="store_true", help="Run tests with profiler. Default is False.", default=False + ) + per_test_arg_parser.add_argument( + "-x", "--test-proxy", help="URI of TestProxy Server" + ) # Per-test args self._test_class_to_run.add_arguments(per_test_arg_parser) @@ -112,9 +134,13 @@ async def start(self): await tests[0].global_setup() try: await asyncio.gather(*[test.setup() for test in tests]) - self.logger.info("") + if self.per_test_args.test_proxy: + self.logger.info("=== Record and Start Playback ===") + await asyncio.gather(*[test.record_and_start_playback() for test in tests]) + self.logger.info("") + if self.per_test_args.warmup > 0: await self._run_tests(tests, self.per_test_args.warmup, "Warmup") @@ -130,6 +156,11 @@ async def start(self): except Exception as e: print("Exception: " + str(e)) finally: + if self.per_test_args.test_proxy: + self.logger.info("=== Stop Playback ===") + await asyncio.gather(*[test.stop_playback() for test in tests]) + self.logger.info("") + if not self.per_test_args.no_cleanup: self.logger.info("=== Cleanup ===") await asyncio.gather(*[test.cleanup() for test in tests]) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py index c59a7973216d..2783902efe27 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py @@ -4,6 +4,9 @@ # -------------------------------------------------------------------------------------------- import os +import aiohttp + +from ._policies import PerfTestProxyPolicy class PerfStressTest: @@ -19,12 +22,50 @@ class PerfStressTest: def __init__(self, arguments): self.args = arguments + self._session = aiohttp.ClientSession() if self.args.test_proxy else None + self._test_proxy_policy = None + self._client_kwargs = {} + self._run_kwargs = {} + self._recording_id = None + if self.args.test_proxy: + self._test_proxy_policy = PerfTestProxyPolicy(self.args.test_proxy) + self._client_kwargs['verify'] = False + self._client_kwargs['policy'] = self._test_proxy_policy # TODO + self._run_kwargs['verify'] = False async def global_setup(self): return async def global_cleanup(self): return + + async def record_and_start_playback(self): + await self._start_recording() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "record" + + # Record one call to run() + if self.args.sync: + self.run_sync() + else: + await self.run_async() + + await self._stop_recording() + await self._start_playback() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "playback" + + async def stop_playback(self): + headers = { + "x-recording-id": self._recording_id, + "x-purge-inmemory-recording": "true" + } + url = self.args.test_proxy + "/playback/stop" + async with self._session.post(url, headers=headers, verify=False) as resp: + assert resp.status == 200 + + self._test_proxy_policy.recording_id = None + self._test_proxy_policy.mode = None async def setup(self): return @@ -33,13 +74,8 @@ async def cleanup(self): return async def close(self): - return - - def __enter__(self): - return - - def __exit__(self, exc_type, exc_value, traceback): - return + if self._session: + await self._session.close() def run_sync(self): raise Exception('run_sync must be implemented for {}'.format(self.__class__.__name__)) @@ -47,6 +83,25 @@ def run_sync(self): async def run_async(self): raise Exception('run_async must be implemented for {}'.format(self.__class__.__name__)) + async def _start_recording(self): + url = self.args.test_proxy + "/record/start" + async with self._session.post(url, verify=False) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + + async def _stop_recording(self): + headers = {"x-recording-id": self._recording_id} + url = self.args.test_proxy + "/record/stop" + async with self._session.post(url, headers=headers, verify=False) as resp: + assert resp.status == 200 + + async def _start_playback(self): + headers = {"x-recording-id": self._recording_id} + url = self.args.test_proxy + "/playback/start" + async with self._session.post(url, headers=headers, verify=False) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + @staticmethod def add_arguments(parser): """ From 207cd98dd695bd8000bd51f990df1fd8b497606d Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 18 Jun 2021 12:28:44 -0700 Subject: [PATCH 06/10] Support test proxy --- .../tests/perfstress_tests/_test_base.py | 15 +++++----- .../perfstress_tests/_policies.py | 28 ++++++++++++------- .../perfstress_tests/perf_stress_test.py | 23 +++++++++------ 3 files changed, 40 insertions(+), 26 deletions(-) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py index 61c29607e1a8..ca46e67ffccb 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/_test_base.py @@ -20,15 +20,16 @@ class _ServiceTest(PerfStressTest): def __init__(self, arguments): super().__init__(arguments) connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING") - kwargs = {} - kwargs['max_single_put_size'] = self.args.max_put_size - kwargs['max_block_size'] = self.args.max_block_size - kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold - # kwargs['api_version'] = '2019-02-02' # Used only for comparison with T1 legacy tests + if self.args.test_proxy: + self._client_kwargs['_additional_pipeline_policies'] = self._client_kwargs['per_retry_policies'] + self._client_kwargs['max_single_put_size'] = self.args.max_put_size + self._client_kwargs['max_block_size'] = self.args.max_block_size + self._client_kwargs['min_large_block_upload_threshold'] = self.args.buffer_threshold + # self._client_kwargs['api_version'] = '2019-02-02' # Used only for comparison with T1 legacy tests if not _ServiceTest.service_client or self.args.no_client_share: - _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) - _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **kwargs) + _ServiceTest.service_client = SyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs) + _ServiceTest.async_service_client = AsyncBlobServiceClient.from_connection_string(conn_str=connection_string, **self._client_kwargs) self.service_client = _ServiceTest.service_client self.async_service_client =_ServiceTest.async_service_client diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py index 81108d826e8e..5be416c1175a 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py @@ -3,6 +3,8 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +from urllib.parse import urlparse + from azure.core.pipeline.policies import SansIOHTTPPolicy @@ -11,20 +13,26 @@ class PerfTestProxyPolicy(SansIOHTTPPolicy): def __init__(self, url): self.recording_id = None self.mode = None - self._proxy_url = url + self._proxy_url = urlparse(url) def redirect_to_test_proxy(self, request): if self.recording_id and self.mode: - original_destination = request.url - request.url = self._proxy_url - request.headers["x-recording-id"] = self.recording_id - request.headers["x-recording-mode"] = self.mode - request.headers["x-recording-remove"] = "false" + request.context.options['connection_verify'] = False + live_endpoint = urlparse(request.http_request.url) + redirected = live_endpoint._replace( + scheme=self._proxy_url.scheme, + netloc=self._proxy_url.netloc + ) + request.http_request.url = redirected.geturl() + request.http_request.headers["x-recording-id"] = self.recording_id + request.http_request.headers["x-recording-mode"] = self.mode + request.http_request.headers["x-recording-remove"] = "false" # Ensure x-recording-upstream-base-uri header is only set once, since the # same HttpMessage will be reused on retries - if "x-recording-upstream-base-uri" not in request.headers: - request.headers["x-recording-upstream-base-uri"] = original_destination + if "x-recording-upstream-base-uri" not in request.http_request.headers: + original_endpoint = "{}://{}".format(live_endpoint.scheme, live_endpoint.netloc) + request.http_request.headers["x-recording-upstream-base-uri"] = original_endpoint - def send(self, request): - self.redirect_to_test_proxy(request.http_request) + def on_request(self, request): + self.redirect_to_test_proxy(request) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py index 4a458d6580b8..49b4c31a17e1 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py @@ -23,16 +23,21 @@ class PerfStressTest: def __init__(self, arguments): self.args = arguments - self._session = aiohttp.ClientSession() if self.args.test_proxy else None + self._session = None + if self.args.test_proxy: + self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) self._test_proxy_policy = None self._client_kwargs = {} - self._run_kwargs = {} self._recording_id = None if self.args.test_proxy: + # SSL will be disabled for the test proxy requests, so suppress warnings + import warnings + from urllib3.exceptions import InsecureRequestWarning + warnings.simplefilter('ignore', InsecureRequestWarning) + + # Add policy to redirect requests to the test proxy self._test_proxy_policy = PerfTestProxyPolicy(self.args.test_proxy) - self._client_kwargs['verify'] = False - self._client_kwargs['policy'] = self._test_proxy_policy # TODO - self._run_kwargs['verify'] = False + self._client_kwargs['per_retry_policies'] = [self._test_proxy_policy] async def global_setup(self): return @@ -62,7 +67,7 @@ async def stop_playback(self): "x-purge-inmemory-recording": "true" } url = self.args.test_proxy + "/playback/stop" - async with self._session.post(url, headers=headers, verify=False) as resp: + async with self._session.post(url, headers=headers) as resp: assert resp.status == 200 self._test_proxy_policy.recording_id = None @@ -86,20 +91,20 @@ async def run_async(self): async def _start_recording(self): url = self.args.test_proxy + "/record/start" - async with self._session.post(url, verify=False) as resp: + async with self._session.post(url) as resp: assert resp.status == 200 self._recording_id = resp.headers["x-recording-id"] async def _stop_recording(self): headers = {"x-recording-id": self._recording_id} url = self.args.test_proxy + "/record/stop" - async with self._session.post(url, headers=headers, verify=False) as resp: + async with self._session.post(url, headers=headers) as resp: assert resp.status == 200 async def _start_playback(self): headers = {"x-recording-id": self._recording_id} url = self.args.test_proxy + "/playback/start" - async with self._session.post(url, headers=headers, verify=False) as resp: + async with self._session.post(url, headers=headers) as resp: assert resp.status == 200 self._recording_id = resp.headers["x-recording-id"] From 4da6c07121a3ec2fc65e883f275135a4b0570943 Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 18 Jun 2021 12:37:50 -0700 Subject: [PATCH 07/10] Whitespace --- .../perfstress_tests/perf_stress_runner.py | 4 ++-- .../perfstress_tests/perf_stress_test.py | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py index d4409165ed58..a48ded31e10c 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py @@ -232,7 +232,7 @@ def _run_sync_loop(self, test, duration, id, with_profiler): runtime = time.time() - start self._completed_operations[id] += 1 self._last_completion_times[id] = runtime - + if profile: # Store only profile for final iteration profile_name = "{}/cProfile-{}-{}-sync.pstats".format(os.getcwd(), test.__class__.__name__, id) @@ -261,7 +261,7 @@ async def _run_async_loop(self, test, duration, id, with_profiler): runtime = time.time() - start self._completed_operations[id] += 1 self._last_completion_times[id] = runtime - + if profile: # Store only profile for final iteration profile_name = "{}/cProfile-{}-{}-async.pstats".format(os.getcwd(), test.__class__.__name__, id) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py index 49b4c31a17e1..87531c43b14b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py @@ -24,12 +24,13 @@ class PerfStressTest: def __init__(self, arguments): self.args = arguments self._session = None - if self.args.test_proxy: - self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) self._test_proxy_policy = None self._client_kwargs = {} self._recording_id = None + if self.args.test_proxy: + self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) + # SSL will be disabled for the test proxy requests, so suppress warnings import warnings from urllib3.exceptions import InsecureRequestWarning @@ -44,7 +45,7 @@ async def global_setup(self): async def global_cleanup(self): return - + async def record_and_start_playback(self): await self._start_recording() self._test_proxy_policy.recording_id = self._recording_id @@ -60,7 +61,7 @@ async def record_and_start_playback(self): await self._start_playback() self._test_proxy_policy.recording_id = self._recording_id self._test_proxy_policy.mode = "playback" - + async def stop_playback(self): headers = { "x-recording-id": self._recording_id, @@ -69,7 +70,7 @@ async def stop_playback(self): url = self.args.test_proxy + "/playback/stop" async with self._session.post(url, headers=headers) as resp: assert resp.status == 200 - + self._test_proxy_policy.recording_id = None self._test_proxy_policy.mode = None @@ -94,13 +95,13 @@ async def _start_recording(self): async with self._session.post(url) as resp: assert resp.status == 200 self._recording_id = resp.headers["x-recording-id"] - + async def _stop_recording(self): headers = {"x-recording-id": self._recording_id} url = self.args.test_proxy + "/record/stop" async with self._session.post(url, headers=headers) as resp: assert resp.status == 200 - + async def _start_playback(self): headers = {"x-recording-id": self._recording_id} url = self.args.test_proxy + "/playback/start" From b78a0558888c0d11414b7a50a1ff6dcc2b458bee Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 18 Jun 2021 14:47:02 -0700 Subject: [PATCH 08/10] Support proxy in legacy tests --- .../tests/perfstress_tests/README.md | 18 +++++++--- .../T1_legacy_tests/_test_base_legacy.py | 33 ++++++++++++++++++- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md index f5bcf6dfad3c..678055c787f5 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md @@ -37,13 +37,14 @@ Using the `perfstress` command alone will list the available perf tests found. N ### Common perf command line options These options are available for all perf tests: -- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. -- `--iterations=1` Number of test iterations to run. Default is 1. -- `--parallel=1` Number of tests to run in parallel. Default is 1. +- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. +- `-i --iterations=1` Number of test iterations to run. Default is 1. +- `-p --parallel=1` Number of tests to run in parallel. Default is 1. - `--no-client-share` Whether each parallel test instance should share a single client, or use their own. Default is False (sharing). -- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. +- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). This flag must be used for Storage legacy tests, which do not support async. - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). +- `-x --test-proxy` Whether to run the tests against the test proxy server. Specfiy the URL for the proxy endpoint (e.g. "https://localhost:5001"). WARNING: When using with Legacy tests - only HTTPS is supported. ### Common Blob command line options The options are available for all Blob perf tests: @@ -77,3 +78,12 @@ The tests currently written for the T1 SDK: ```cmd (env) ~/azure-storage-blob/tests> perfstress UploadTest --parallel=2 --size=10240 ``` + +## Running with the test proxy +Follow the instructions here to install and run the test proxy server: +https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy + +Once running, in a separate process run the perf test in question, combined with the `-x` flag to specify the proxy endpoint. (Note, only the HTTPS endpoint is supported for the Legacy tests). +```cmd +(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" +``` diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py b/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py index 0e224e68bf2d..a5a3d607f972 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/T1_legacy_tests/_test_base_legacy.py @@ -5,11 +5,30 @@ import os import uuid +import functools + +import requests from azure_devtools.perfstress_tests import PerfStressTest from azure.storage.blob import BlockBlobService + +def test_proxy_callback(proxy_policy, request): + if proxy_policy.recording_id and proxy_policy.mode: + live_endpoint = request.host + request.host = proxy_policy._proxy_url.netloc + request.headers["x-recording-id"] = proxy_policy.recording_id + request.headers["x-recording-mode"] = proxy_policy.mode + request.headers["x-recording-remove"] = "false" + + # Ensure x-recording-upstream-base-uri header is only set once, since the + # same HttpMessage will be reused on retries + if "x-recording-upstream-base-uri" not in request.headers: + original_endpoint = "https://{}".format(live_endpoint) + request.headers["x-recording-upstream-base-uri"] = original_endpoint + + class _LegacyServiceTest(PerfStressTest): service_client = None async_service_client = None @@ -17,14 +36,26 @@ class _LegacyServiceTest(PerfStressTest): def __init__(self, arguments): super().__init__(arguments) connection_string = self.get_from_env("AZURE_STORAGE_CONNECTION_STRING") + session = None + if self.args.test_proxy: + session = requests.Session() + session.verify = False if not _LegacyServiceTest.service_client or self.args.no_client_share: - _LegacyServiceTest.service_client = BlockBlobService(connection_string=connection_string) + _LegacyServiceTest.service_client = BlockBlobService( + connection_string=connection_string, + request_session=session) _LegacyServiceTest.service_client.MAX_SINGLE_PUT_SIZE = self.args.max_put_size _LegacyServiceTest.service_client.MAX_BLOCK_SIZE = self.args.max_block_size _LegacyServiceTest.service_client.MIN_LARGE_BLOCK_UPLOAD_THRESHOLD = self.args.buffer_threshold self.async_service_client = None self.service_client = _LegacyServiceTest.service_client + if self.args.test_proxy: + self.service_client.request_callback = functools.partial( + test_proxy_callback, + self._test_proxy_policy + ) + @staticmethod def add_arguments(parser): super(_LegacyServiceTest, _LegacyServiceTest).add_arguments(parser) From 65e561b9f0c7f6eac5ab72cad8b2942a3393cf9e Mon Sep 17 00:00:00 2001 From: antisch Date: Fri, 18 Jun 2021 15:07:18 -0700 Subject: [PATCH 09/10] Update perf test guide --- doc/dev/perfstress_tests.md | 35 +++++++++++++++---- .../tests/perfstress_tests/README.md | 1 + 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index 3b68426f62aa..2865d2eac6c4 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -2,6 +2,7 @@ 1. [The perfstress framework](#the-perfstress-framework) - [The PerfStressTest base](#the-perfstresstest-base) - [Default command options](#default-command-options) + - [Running with test proxy](#running-with-test-proxy) 2. [Adding performance tests to an SDK](#adding-performance-tests-to-an-sdk) - [Writing a test](#writing-a-test) - [Adding legacy T1 tests](#adding-legacy-t1-tests) @@ -38,6 +39,16 @@ class PerfStressTest: async def global_cleanup(self): # Can be optionally defined. Only run once, regardless of parallelism. + async def record_and_start_playback(self): + # Set up the recording on the test proxy, and configure the proxy in playback mode. + # This function is only run if a test proxy URL is provided (-x). + # There should be no need to overwrite this function. + + async def stop_playback(self): + # Configure the proxy out of playback mode and discard the recording. + # This function is only run if a test proxy URL is provided (-x). + # There should be no need to overwrite this function. + async def setup(self): # Can be optionally defined. Run once per test instance, after global_setup. @@ -65,12 +76,24 @@ class PerfStressTest: ``` ## Default command options The framework has a series of common command line options built in: -- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. -- `--iterations=1` Number of test iterations to run. Default is 1. -- `--parallel=1` Number of tests to run in parallel. Default is 1. -- `--warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. +- `-d --duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. +- `-i --iterations=1` Number of test iterations to run. Default is 1. +- `-p --parallel=1` Number of tests to run in parallel. Default is 1. +- `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). +- `-x --test-proxy` Whether to run the tests against the test proxy server. Specfiy the URL for the proxy endpoint (e.g. "https://localhost:5001"). +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile---.pstats"`. + + +## Running with the test proxy +Follow the instructions here to install and run the test proxy server: +https://github.com/Azure/azure-sdk-tools/tree/feature/http-recording-server/tools/test-proxy/Azure.Sdk.Tools.TestProxy + +Once running, in a separate process run the perf test in question, combined with the `-x` flag to specify the proxy endpoint. +```cmd +(env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" +``` # Adding performance tests to an SDK The performance tests will be in a submodule called `perfstress_tests` within the `tests` directory in an SDK project. @@ -351,5 +374,5 @@ Using the `perfstress` command alone will list the available perf tests found. N Please add a `README.md` to the perfstress_tests directory so that others know how to setup and run the perf tests, along with a description of the available tests and any support command line options. README files in a `tests/perfstress_tests` directory should already be filtered from CI validation for SDK readmes. Some examples can be found here: -- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md) -- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md) \ No newline at end of file +- [Azure Storage Blob](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md) +- [Azure Service Bus](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md) \ No newline at end of file diff --git a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md index 678055c787f5..0289e3b68bba 100644 --- a/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md +++ b/sdk/storage/azure-storage-blob/tests/perfstress_tests/README.md @@ -45,6 +45,7 @@ These options are available for all perf tests: - `--sync` Whether to run the tests in sync or async. Default is False (async). This flag must be used for Storage legacy tests, which do not support async. - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). - `-x --test-proxy` Whether to run the tests against the test proxy server. Specfiy the URL for the proxy endpoint (e.g. "https://localhost:5001"). WARNING: When using with Legacy tests - only HTTPS is supported. +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile---.pstats"`. ### Common Blob command line options The options are available for all Blob perf tests: From 6c29a97956c9a9757935b13a8f5082ef21d48c24 Mon Sep 17 00:00:00 2001 From: Mike Harder Date: Fri, 18 Jun 2021 17:38:41 -0700 Subject: [PATCH 10/10] Support proxy in GetSecretTest --- .../tests/perfstress_tests/get_secret.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py b/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py index b40bb6c20841..37230cef3214 100644 --- a/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py +++ b/sdk/keyvault/azure-keyvault-secrets/tests/perfstress_tests/get_secret.py @@ -21,8 +21,8 @@ def __init__(self, arguments): # Create clients vault_url = self.get_from_env("AZURE_KEYVAULT_URL") - self.client = SecretClient(vault_url, self.credential) - self.async_client = AsyncSecretClient(vault_url, self.async_credential) + self.client = SecretClient(vault_url, self.credential, **self._client_kwargs) + self.async_client = AsyncSecretClient(vault_url, self.async_credential, **self._client_kwargs) async def global_setup(self): """The global setup is run only once."""