diff --git a/alembic/versions/aa1fc9fdc665_add_column_for_py_spy_profiles_url.py b/alembic/versions/aa1fc9fdc665_add_column_for_py_spy_profiles_url.py new file mode 100644 index 0000000000..6bc34a822e --- /dev/null +++ b/alembic/versions/aa1fc9fdc665_add_column_for_py_spy_profiles_url.py @@ -0,0 +1,24 @@ +"""Add column for py-spy profiles url + +Revision ID: aa1fc9fdc665 +Revises: 1095dfdfc4ae +Create Date: 2024-10-23 16:11:24.794416 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'aa1fc9fdc665' +down_revision = '1095dfdfc4ae' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column('test_run', sa.Column('py_spy_profiles_url', sa.String(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("test_run", "py_spy_profiles_url") diff --git a/benchmark_schema.py b/benchmark_schema.py index 20b85697c5..d9f09ae7da 100644 --- a/benchmark_schema.py +++ b/benchmark_schema.py @@ -63,6 +63,7 @@ class TestRun(Base): performance_report_url = Column(String, nullable=True) # Not yet collected cluster_dump_url = Column(String, nullable=True) memray_profiles_url = Column(String, nullable=True) + py_spy_profiles_url = Column(String, nullable=True) class TPCHRun(Base): diff --git a/tests/conftest.py b/tests/conftest.py index 930670bed8..f2d3915438 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -84,6 +84,14 @@ def pytest_addoption(parser): choices=("scheduler", "none"), ) + parser.addoption( + "--py-spy", + action="store", + default="none", + help="py-spy profiles to collect: scheduler, workers, all, or none", + choices=("scheduler", "workers", "all", "none"), + ) + def pytest_sessionfinish(session, exitstatus): # https://github.com/pytest-dev/pytest/issues/2393 @@ -670,12 +678,12 @@ def _(**exta_options): @pytest.fixture(scope="session") def s3_performance(s3): - profiles_url = f"{S3_BUCKET}/performance" + performance_url = f"{S3_BUCKET}/performance" # Ensure that the performance directory exists, # but do NOT remove it as multiple test runs could be # accessing it at the same time - s3.mkdirs(profiles_url, exist_ok=True) - return profiles_url + s3.mkdirs(performance_url, exist_ok=True) + return performance_url @pytest.fixture(scope="session") @@ -894,32 +902,33 @@ def memray_profile( if memray_option == "none": yield contextlib.nullcontext - elif memray_option != "scheduler": + return + + if memray_option != "scheduler": raise ValueError(f"Unhandled value for --memray: {memray_option}") - else: - @contextlib.contextmanager - def _memray_profile(client): - profiles_path = tmp_path / "profiles" - profiles_path.mkdir() - try: - with memray.memray_scheduler(directory=profiles_path): - yield - finally: - archive = tmp_path / "memray.tar.gz" - with tarfile.open(archive, mode="w:gz") as tar: - for item in profiles_path.iterdir(): - tar.add(item, arcname=item.name) - test_run_benchmark.memray_profiles_url = ( - f"{s3_performance_url}/{archive.name}" - ) - s3.put(archive, s3_performance_url) - - yield _memray_profile + @contextlib.contextmanager + def _memray_profile(client): + local_directory = tmp_path / "profiles" / "memray" + local_directory.mkdir(parents=True) + try: + with memray.memray_scheduler(directory=local_directory): + yield + finally: + archive_name = "memray.tar.gz" + archive = tmp_path / archive_name + with tarfile.open(archive, mode="w:gz") as tar: + for item in local_directory.iterdir(): + tar.add(item, arcname=item.name) + destination = f"{s3_performance_url}/{archive_name}" + test_run_benchmark.memray_profiles_url = destination + s3.put_file(archive, destination) + + yield _memray_profile @pytest.fixture -def performance_report( +def py_spy_profile( pytestconfig, s3, s3_performance_url, @@ -929,20 +938,90 @@ def performance_report( ): if not test_run_benchmark: yield contextlib.nullcontext + return + + py_spy_option = pytestconfig.getoption("--py-spy") + if py_spy_option == "none": + yield contextlib.nullcontext + return + + profile_scheduler = False + profile_workers = False + + if py_spy_option == "scheduler": + profile_scheduler = True + elif py_spy_option == "workers": + profile_workers = True + elif py_spy_option == "all": + profile_scheduler = True + profile_workers = True else: - if not pytestconfig.getoption("--performance-report"): - yield contextlib.nullcontext - else: + raise ValueError(f"Unhandled value for --py-spy: {py_spy_option}") + + try: + from dask_pyspy import pyspy, pyspy_on_scheduler + except ModuleNotFoundError as e: + raise ModuleNotFoundError( + "py-spy profiling benchmarks requires dask-pyspy to be installed." + ) from e + + @contextlib.contextmanager + def _py_spy_profile(client): + local_directory = tmp_path / "profiles" / "py-spy" + local_directory.mkdir(parents=True) + + worker_ctx = contextlib.nullcontext() + if profile_workers: + worker_ctx = pyspy(local_directory, client=client) + + scheduler_ctx = contextlib.nullcontext() + if profile_scheduler: + scheduler_ctx = pyspy_on_scheduler( + local_directory / "scheduler.json", client=client + ) + + try: + with worker_ctx, scheduler_ctx: + yield + finally: + archive_name = "py-spy.tar.gz" + archive = tmp_path / archive_name + with tarfile.open(archive, mode="w:gz") as tar: + for item in local_directory.iterdir(): + tar.add(item, arcname=item.name) + destination = f"{s3_performance_url}/{archive_name}" + test_run_benchmark.py_spy_profiles_url = destination + s3.put_file(archive, destination) + + yield _py_spy_profile + + +@pytest.fixture +def performance_report( + pytestconfig, + s3, + s3_performance_url, + s3_storage_options, + test_run_benchmark, + tmp_path, +): + if not test_run_benchmark: + yield contextlib.nullcontext + return + + if not pytestconfig.getoption("--performance-report"): + yield contextlib.nullcontext + return + + @contextlib.contextmanager + def _performance_report(): + try: + filename = f"{s3_performance_url}/performance_report.html.gz" + with distributed.performance_report( + filename=filename, storage_options=s3_storage_options + ): + yield + finally: + test_run_benchmark.performance_report_url = filename - @contextlib.contextmanager - def _performance_report(): - try: - filename = f"{s3_performance_url}/performance_report.html.gz" - with distributed.performance_report( - filename=filename, storage_options=s3_storage_options - ): - yield - finally: - test_run_benchmark.performance_report_url = filename - - yield _performance_report + yield _performance_report diff --git a/tests/geospatial/conftest.py b/tests/geospatial/conftest.py index 9af50f6624..d9ee49f591 100644 --- a/tests/geospatial/conftest.py +++ b/tests/geospatial/conftest.py @@ -28,7 +28,12 @@ def cluster_name(request, scale): @pytest.fixture() def client_factory( - cluster_name, github_cluster_tags, benchmark_all, memray_profile, performance_report + cluster_name, + github_cluster_tags, + benchmark_all, + py_spy_profile, + memray_profile, + performance_report, ): import contextlib @@ -45,9 +50,9 @@ def _(n_workers, env=None, **cluster_kwargs): with cluster.get_client() as client: # FIXME https://github.com/coiled/platform/issues/103 client.wait_for_workers(n_workers) - with performance_report(), memray_profile(client), benchmark_all( + with performance_report(), py_spy_profile(client), memray_profile( client - ): + ), benchmark_all(client): yield client return _