Skip to content

Commit

Permalink
Allow py-spy profiling for geospatial benchmarks (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Nov 5, 2024
1 parent d1d776b commit fb33a0c
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Add column for py-spy profiles url
Revision ID: aa1fc9fdc665
Revises: 1095dfdfc4ae
Create Date: 2024-10-23 16:11:24.794416
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the ID of the
# migration it revises; both match the "Revision ID" / "Revises" lines in the
# module docstring of this file.
revision = 'aa1fc9fdc665'
down_revision = '1095dfdfc4ae'
# This migration sets neither branch labels nor dependencies on other revisions.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the nullable string column ``py_spy_profiles_url`` to ``test_run``."""
    py_spy_profiles_url = sa.Column(
        "py_spy_profiles_url",
        sa.String(),
        nullable=True,
    )
    op.add_column("test_run", py_spy_profiles_url)


def downgrade() -> None:
    """Drop the ``py_spy_profiles_url`` column from the ``test_run`` table."""
    table_name, column_name = "test_run", "py_spy_profiles_url"
    op.drop_column(table_name, column_name)
1 change: 1 addition & 0 deletions benchmark_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class TestRun(Base):
performance_report_url = Column(String, nullable=True) # Not yet collected
cluster_dump_url = Column(String, nullable=True)
memray_profiles_url = Column(String, nullable=True)
py_spy_profiles_url = Column(String, nullable=True)


class TPCHRun(Base):
Expand Down
157 changes: 118 additions & 39 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ def pytest_addoption(parser):
choices=("scheduler", "none"),
)

parser.addoption(
"--py-spy",
action="store",
default="none",
help="py-spy profiles to collect: scheduler, workers, all, or none",
choices=("scheduler", "workers", "all", "none"),
)


def pytest_sessionfinish(session, exitstatus):
# https://github.com/pytest-dev/pytest/issues/2393
Expand Down Expand Up @@ -670,12 +678,12 @@ def _(**exta_options):

@pytest.fixture(scope="session")
def s3_performance(s3):
profiles_url = f"{S3_BUCKET}/performance"
performance_url = f"{S3_BUCKET}/performance"
# Ensure that the performance directory exists,
# but do NOT remove it as multiple test runs could be
# accessing it at the same time
s3.mkdirs(profiles_url, exist_ok=True)
return profiles_url
s3.mkdirs(performance_url, exist_ok=True)
return performance_url


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -894,32 +902,33 @@ def memray_profile(

if memray_option == "none":
yield contextlib.nullcontext
elif memray_option != "scheduler":
return

if memray_option != "scheduler":
raise ValueError(f"Unhandled value for --memray: {memray_option}")
else:

@contextlib.contextmanager
def _memray_profile(client):
profiles_path = tmp_path / "profiles"
profiles_path.mkdir()
try:
with memray.memray_scheduler(directory=profiles_path):
yield
finally:
archive = tmp_path / "memray.tar.gz"
with tarfile.open(archive, mode="w:gz") as tar:
for item in profiles_path.iterdir():
tar.add(item, arcname=item.name)
test_run_benchmark.memray_profiles_url = (
f"{s3_performance_url}/{archive.name}"
)
s3.put(archive, s3_performance_url)

yield _memray_profile
@contextlib.contextmanager
def _memray_profile(client):
    """Profile with memray while the ``with`` body runs, then archive and upload.

    Profiles are written to ``tmp_path / "profiles" / "memray"``. On exit,
    whether or not the body raised, every entry of that directory is packed
    into ``memray.tar.gz`` under ``tmp_path``, the destination URL is stored
    on ``test_run_benchmark.memray_profiles_url`` and the archive is uploaded
    with ``s3.put_file``.

    ``client`` is accepted but not used by this function.
    """
    profile_dir = tmp_path / "profiles" / "memray"
    # No ``exist_ok``: this raises if the directory already exists.
    profile_dir.mkdir(parents=True)
    try:
        with memray.memray_scheduler(directory=profile_dir):
            yield
    finally:
        tarball_name = "memray.tar.gz"
        tarball = tmp_path / tarball_name
        with tarfile.open(tarball, mode="w:gz") as tar:
            # Store entries by bare name so the archive has no local path prefix.
            for entry in profile_dir.iterdir():
                tar.add(entry, arcname=entry.name)
        upload_url = f"{s3_performance_url}/{tarball_name}"
        # The URL is recorded before the upload is attempted.
        test_run_benchmark.memray_profiles_url = upload_url
        s3.put_file(tarball, upload_url)

yield _memray_profile


@pytest.fixture
def performance_report(
def py_spy_profile(
pytestconfig,
s3,
s3_performance_url,
Expand All @@ -929,20 +938,90 @@ def performance_report(
):
if not test_run_benchmark:
yield contextlib.nullcontext
return

py_spy_option = pytestconfig.getoption("--py-spy")
if py_spy_option == "none":
yield contextlib.nullcontext
return

profile_scheduler = False
profile_workers = False

if py_spy_option == "scheduler":
profile_scheduler = True
elif py_spy_option == "workers":
profile_workers = True
elif py_spy_option == "all":
profile_scheduler = True
profile_workers = True
else:
if not pytestconfig.getoption("--performance-report"):
yield contextlib.nullcontext
else:
raise ValueError(f"Unhandled value for --py-spy: {py_spy_option}")

try:
from dask_pyspy import pyspy, pyspy_on_scheduler
except ModuleNotFoundError as e:
raise ModuleNotFoundError(
"py-spy profiling benchmarks requires dask-pyspy to be installed."
) from e

@contextlib.contextmanager
def _py_spy_profile(client):
    """Profile with py-spy while the ``with`` body runs, then archive and upload.

    Profiles are written to ``tmp_path / "profiles" / "py-spy"``. ``pyspy`` is
    used when ``profile_workers`` is set and ``pyspy_on_scheduler`` (writing
    ``scheduler.json``) when ``profile_scheduler`` is set; a target that is
    not selected gets a no-op context instead.

    On exit, whether or not the body raised, every entry of the directory is
    packed into ``py-spy.tar.gz`` under ``tmp_path``, the destination URL is
    stored on ``test_run_benchmark.py_spy_profiles_url`` and the archive is
    uploaded with ``s3.put_file``.
    """
    profile_dir = tmp_path / "profiles" / "py-spy"
    # No ``exist_ok``: this raises if the directory already exists.
    profile_dir.mkdir(parents=True)

    # Build the worker context first, then the scheduler one.
    workers_cm = (
        pyspy(profile_dir, client=client)
        if profile_workers
        else contextlib.nullcontext()
    )
    scheduler_cm = (
        pyspy_on_scheduler(profile_dir / "scheduler.json", client=client)
        if profile_scheduler
        else contextlib.nullcontext()
    )

    try:
        with workers_cm:
            with scheduler_cm:
                yield
    finally:
        tarball_name = "py-spy.tar.gz"
        tarball = tmp_path / tarball_name
        with tarfile.open(tarball, mode="w:gz") as tar:
            # Store entries by bare name so the archive has no local path prefix.
            for entry in profile_dir.iterdir():
                tar.add(entry, arcname=entry.name)
        upload_url = f"{s3_performance_url}/{tarball_name}"
        # The URL is recorded before the upload is attempted.
        test_run_benchmark.py_spy_profiles_url = upload_url
        s3.put_file(tarball, upload_url)

yield _py_spy_profile


@pytest.fixture
def performance_report(
pytestconfig,
s3,
s3_performance_url,
s3_storage_options,
test_run_benchmark,
tmp_path,
):
if not test_run_benchmark:
yield contextlib.nullcontext
return

if not pytestconfig.getoption("--performance-report"):
yield contextlib.nullcontext
return

@contextlib.contextmanager
def _performance_report():
    """Wrap the ``with`` body in ``distributed.performance_report``.

    The report is written to ``performance_report.html.gz`` under
    ``s3_performance_url`` using ``s3_storage_options``. On exit, whether or
    not the body raised, that location is stored on
    ``test_run_benchmark.performance_report_url``.
    """
    try:
        report_url = f"{s3_performance_url}/performance_report.html.gz"
        report = distributed.performance_report(
            filename=report_url,
            storage_options=s3_storage_options,
        )
        with report:
            yield
    finally:
        test_run_benchmark.performance_report_url = report_url

@contextlib.contextmanager
def _performance_report():
try:
filename = f"{s3_performance_url}/performance_report.html.gz"
with distributed.performance_report(
filename=filename, storage_options=s3_storage_options
):
yield
finally:
test_run_benchmark.performance_report_url = filename

yield _performance_report
yield _performance_report
11 changes: 8 additions & 3 deletions tests/geospatial/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ def cluster_name(request, scale):

@pytest.fixture()
def client_factory(
cluster_name, github_cluster_tags, benchmark_all, memray_profile, performance_report
cluster_name,
github_cluster_tags,
benchmark_all,
py_spy_profile,
memray_profile,
performance_report,
):
import contextlib

Expand All @@ -45,9 +50,9 @@ def _(n_workers, env=None, **cluster_kwargs):
with cluster.get_client() as client:
# FIXME https://github.com/coiled/platform/issues/103
client.wait_for_workers(n_workers)
with performance_report(), memray_profile(client), benchmark_all(
with performance_report(), py_spy_profile(client), memray_profile(
client
):
), benchmark_all(client):
yield client

return _

0 comments on commit fb33a0c

Please sign in to comment.