diff --git a/docs/getting_started/running_builders.md b/docs/getting_started/running_builders.md index a5f44c0aa..0f9e75737 100644 --- a/docs/getting_started/running_builders.md +++ b/docs/getting_started/running_builders.md @@ -100,7 +100,7 @@ mrun -n 2 -v my_builder.py ``` -## Running multiple builders +## Running Multiple Builders `mrun` can run multiple builders. You can have multiple builders in a single file: `json`, `python`, or `jupyter-notebook`. Or you can chain multiple files in the order you want to run them: ``` shell @@ -119,3 +119,25 @@ mrun -n 32 -vv my_first_builder.json builder_2_and_3.py last_builder.ipynb * `BUILD_ENDED` - This event tells us the build process finished this specific builder. It also indicates the total number of `errors` and `warnings` that were caught during the process. These event docs also contain the `builder`, a `build_id` which is unique for each time a builder is run and anonymous but unique ID for the machine the builder was run on. + + +## Profiling Memory Usage of Builders + +`mrun` can optionally profile the memory usage of a running builder by using the Memray Python memory profiling tool ([Memray](https://github.com/bloomberg/memray)). To get started, Memray should be installed in the same environment as `maggma` using `pip install memray`. + +Setting the `--memray` (`-m`) option to `on`, or `True`, will signal `mrun` to profile the memory usage of any builders passed to `mrun` as the builders are running. The profiler also supports profiling of both single and forked processes. For example, spawning multiple processes in `mrun` with `-n` will signal the profiler to track any forked child processes spawned from the parent process. + +A basic invocation of the memory profiler using the `mrun` command line tool would look like this: +``` shell +mrun --memray on my_builder.json +``` + +The profiler will generate two files after the builder finishes: +1. An output `.bin` file that is dumped by default into the `temp` directory, which is platform/OS dependent. For Linux/MacOS this will be `/tmp/` and for Windows the target directory will be `C:\TEMP\`.The output file will have a generic naming pattern as follows: `BUILDER_NAME_PASSED_TO_MRUN + BUILDER_START_DATETIME_ISO.bin`, e.g., `my_builder.json_2023-06-09T13:57:48.446361.bin`. +2. A `.html` flamegraph file that will be written to the same directory as the `.bin` dump file. The flamegraph will have a naming pattern similar to the following: `memray-flamegraph-my_builder.json_2023-06-09T13:57:48.446361.html`. The flamegraph can be viewed using any web browser. + +***Note***: Different platforms/operating systems purge their system's `temp` directory at different intervals. It is recommended to move at least the `.bin` file to a more stable location. The `.bin` file can be used to recreate the flamegraph at anytime using the Memray CLI. + +Using the flag `--memray-dir` (`-md`) allows for specifying an output directory for the `.bin` and `.html` files created by the profiler. The provided directory will be created if the directory does not exist, mimicking the `mkdir -p` command. + +Further data visualization and transform examples can be found in Memray's documentation ([Memray reporters](https://bloomberg.github.io/memray/live.html)). \ No newline at end of file diff --git a/requirements-optional.txt b/requirements-optional.txt index fb63c1a96..46d80e0d9 100644 --- a/requirements-optional.txt +++ b/requirements-optional.txt @@ -6,3 +6,4 @@ regex==2022.9.13 montydb==2.5.0 azure-storage-blob==12.16.0 azure-identity==1.12.0 +memray==1.7.0 diff --git a/setup.py b/setup.py index 2150d1bd7..f0c60dab2 100644 --- a/setup.py +++ b/setup.py @@ -45,13 +45,13 @@ "msgpack>=0.5.6", "orjson>=3.6.0", "boto3>=1.20.41", - "python-dateutil>=2.8.2" + "python-dateutil>=2.8.2", ], extras_require={ "vault": ["hvac>=0.9.5"], "montydb": ["montydb>=2.3.12"], "notebook_runner": ["IPython>=7.16", "nbformat>=5.0", "regex>=2020.6"], - "azure": ["azure-storage-blob>=12.16.0", "azure-identity>=1.12.0"] + "azure": ["azure-storage-blob>=12.16.0", "azure-identity>=1.12.0"], }, classifiers=[ "Programming Language :: Python :: 3", diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index 899ce1a34..13dc7d9ed 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -6,6 +6,7 @@ import logging import sys from itertools import chain +from datetime import datetime import click from monty.serialization import loadfn @@ -14,10 +15,14 @@ from maggma.cli.multiprocessing import multi from maggma.cli.serial import serial from maggma.cli.source_loader import ScriptFinder, load_builder_from_source +from maggma.cli.settings import CLISettings from maggma.utils import ReportingHandler, TqdmLoggingHandler + sys.meta_path.append(ScriptFinder()) - + +settings = CLISettings() + @click.command() @click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True) @@ -44,14 +49,17 @@ help="Store in JSON/YAML form to send reporting data to", type=click.Path(exists=True), ) -@click.option("-u", "--url", "url", default=None, type=str, help="URL for the distributed manager") +@click.option( + "-u", "--url", "url", default=None, type=str, help="URL for the distributed manager" +) @click.option( "-p", "--port", "port", default=None, type=int, - help="Port for distributed communication." " mrun will find an open port if None is provided to the manager", + help="Port for distributed communication." + " mrun will find an open port if None is provided to the manager", ) @click.option( "-N", @@ -69,8 +77,12 @@ type=int, help="Number of distributed workers to process chunks", ) -@click.option("--no_bars", is_flag=True, help="Turns of Progress Bars for headless operations") -@click.option("--rabbitmq", is_flag=True, help="Enables the use of RabbitMQ as the work broker") +@click.option( + "--no_bars", is_flag=True, help="Turns of Progress Bars for headless operations" +) +@click.option( + "--rabbitmq", is_flag=True, help="Enables the use of RabbitMQ as the work broker" +) @click.option( "-q", "--queue_prefix", @@ -79,7 +91,27 @@ type=str, help="Prefix to use in queue names when RabbitMQ is select as the broker", ) +@click.option( + "-m", + "--memray", + "memray", + default=False, + type=bool, + help="Option to profile builder memory usage with Memray", +) +@click.option( + "-md", + "--memray-dir", + "memray_dir", + default=None, + type=str, + help="""Directory to dump memory profiler output files. Only runs if --memray is True. + Will create directory if directory does not exist, mimicking mkdir -p command. + If not provided files will be dumped to system's temp directory""", +) +@click.pass_context def run( + ctx, builders, verbosity, reporting_store, @@ -91,7 +123,39 @@ def run( num_processes, rabbitmq, queue_prefix, + memray, + memray_dir, + memray_file=None, + follow_fork=False, ): + # Import profiler and setup directories to dump profiler output + if memray: + from memray import Tracker, FileDestination + + if memray_dir: + import os + + os.makedirs(memray_dir, exist_ok=True) + + memray_file = f"{memray_dir}/{builders[0]}_{datetime.now().isoformat()}.bin" + else: + memray_file = ( + f"{settings.TEMP_DIR}/{builders[0]}_{datetime.now().isoformat()}.bin" + ) + + if num_processes > 1: + follow_fork = True + + # Click context manager handles creation and clean up of profiler dump files for memray tracker + ctx.obj = ctx.with_resource( + Tracker( + destination=FileDestination(memray_file), + native_traces=False, + trace_python_allocators=False, + follow_fork=follow_fork, + ) + ) + # Import proper manager and worker if rabbitmq: from maggma.cli.rabbitmq import manager, worker @@ -104,7 +168,9 @@ def run( root = logging.getLogger() root.setLevel(level) ch = TqdmLoggingHandler() - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) ch.setFormatter(formatter) root.addHandler(ch) @@ -167,4 +233,11 @@ def run( else: loop = asyncio.get_event_loop() for builder in builder_objects: - loop.run_until_complete(multi(builder=builder, num_processes=num_processes, no_bars=no_bars)) + loop.run_until_complete( + multi(builder=builder, num_processes=num_processes, no_bars=no_bars) + ) + + if memray_file: + import subprocess + + subprocess.run(["memray", "flamegraph", memray_file], shell=False) diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py index 98e745e60..882f5eb31 100644 --- a/src/maggma/cli/settings.py +++ b/src/maggma/cli/settings.py @@ -1,5 +1,10 @@ +import platform +import tempfile + from pydantic import BaseSettings, Field +tempdir = "/tmp" if platform.system() == "Darwin" else tempfile.gettempdir() + class CLISettings(BaseSettings): WORKER_TIMEOUT: int = Field( @@ -12,6 +17,11 @@ class CLISettings(BaseSettings): description="Timeout in seconds for the worker manager", ) + TEMP_DIR: str = Field( + tempdir, + description="Directory that memory profile .bin files are dumped to", + ) + class Config: env_prefix = "MAGGMA_" extra = "ignore" diff --git a/tests/cli/test_init.py b/tests/cli/test_init.py index e5ce5c03b..74756f929 100644 --- a/tests/cli/test_init.py +++ b/tests/cli/test_init.py @@ -1,3 +1,4 @@ +import os import shutil from datetime import datetime from pathlib import Path @@ -32,7 +33,6 @@ def reporting_store(): def test_basic_run(): - runner = CliRunner() result = runner.invoke(run, ["--help"]) assert result.exit_code == 0 @@ -43,7 +43,6 @@ def test_basic_run(): def test_run_builder(mongostore): - memorystore = MemoryStore("temp") builder = CopyBuilder(mongostore, memorystore) @@ -81,7 +80,6 @@ def test_run_builder(mongostore): def test_run_builder_chain(mongostore): - memorystore = MemoryStore("temp") builder1 = CopyBuilder(mongostore, memorystore) builder2 = CopyBuilder(mongostore, memorystore) @@ -120,7 +118,6 @@ def test_run_builder_chain(mongostore): def test_reporting(mongostore, reporting_store): - memorystore = MemoryStore("temp") builder = CopyBuilder(mongostore, memorystore) @@ -156,7 +153,6 @@ def test_reporting(mongostore, reporting_store): def test_python_source(): - runner = CliRunner() with runner.isolated_filesystem(): @@ -170,7 +166,6 @@ def test_python_source(): def test_python_notebook_source(): - runner = CliRunner() with runner.isolated_filesystem(): @@ -184,3 +179,65 @@ def test_python_notebook_source(): assert result.exit_code == 0 assert "Ended multiprocessing: DummyBuilder" in result.output + + +def test_memray_run_builder(mongostore): + memorystore = MemoryStore("temp") + builder = CopyBuilder(mongostore, memorystore) + + mongostore.update( + [ + {mongostore.key: i, mongostore.last_updated_field: datetime.utcnow()} + for i in range(10) + ] + ) + + runner = CliRunner() + with runner.isolated_filesystem(): + dumpfn(builder, "test_builder.json") + result = runner.invoke(run, ["-v", "--memray", "on", "test_builder.json"]) + assert result.exit_code == 0 + assert "CopyBuilder" in result.output + assert "SerialProcessor" in result.output + + result = runner.invoke( + run, ["-vvv", "--no_bars", "--memray", "on", "test_builder.json"] + ) + assert result.exit_code == 0 + assert "Get" not in result.output + assert "Update" not in result.output + + result = runner.invoke( + run, ["-v", "-n", "2", "--memray", "on", "test_builder.json"] + ) + assert result.exit_code == 0 + assert "CopyBuilder" in result.output + assert "MultiProcessor" in result.output + + result = runner.invoke( + run, ["-vvv", "-n", "2", "--no_bars", "--memray", "on", "test_builder.json"] + ) + assert result.exit_code == 0 + assert "Get" not in result.output + assert "Update" not in result.output + + +def test_memray_user_output_dir(mongostore): + memorystore = MemoryStore("temp") + builder = CopyBuilder(mongostore, memorystore) + + mongostore.update( + [ + {mongostore.key: i, mongostore.last_updated_field: datetime.utcnow()} + for i in range(10) + ] + ) + + runner = CliRunner() + with runner.isolated_filesystem(): + dumpfn(builder, "test_builder.json") + result = runner.invoke( + run, ["--memray", "on", "-md", "memray_output_dir/", "test_builder.json"] + ) + assert result.exit_code == 0 + assert (Path.cwd() / "memray_output_dir").exists() is True