diff --git a/activitysim/cli/run.py b/activitysim/cli/run.py index d5c464950..eed1321e5 100644 --- a/activitysim/cli/run.py +++ b/activitysim/cli/run.py @@ -1,6 +1,7 @@ # ActivitySim # See full license in LICENSE.txt. import argparse +import importlib import logging import os import sys @@ -13,7 +14,13 @@ logger = logging.getLogger(__name__) -INJECTABLES = ["data_dir", "configs_dir", "output_dir", "settings_file_name"] +INJECTABLES = [ + "data_dir", + "configs_dir", + "output_dir", + "settings_file_name", + "imported_extensions", +] def add_run_args(parser, multiprocess=True): @@ -64,6 +71,15 @@ def add_run_args(parser, multiprocess=True): parser.add_argument( "--households_sample_size", type=int, metavar="N", help="households sample size" ) + parser.add_argument( + "-e", + "--ext", + type=str, + action="append", + metavar="PATH", + help="Package of extension modules to load. Use of this option is not " + "generally secure.", + ) if multiprocess: parser.add_argument( @@ -111,6 +127,23 @@ def inject_arg(name, value, cache=False): # 'configs', 'data', and 'output' folders by default os.chdir(args.working_dir) + if args.ext: + for e in args.ext: + basepath, extpath = os.path.split(e) + if not basepath: + basepath = "." + sys.path.insert(0, os.path.abspath(basepath)) + try: + importlib.import_module(extpath) + except ImportError as err: + logger.exception("ImportError") + raise + finally: + del sys.path[0] + inject_arg("imported_extensions", args.ext) + else: + inject_arg("imported_extensions", ()) + # settings_file_name should be cached or else it gets squashed by config.py if args.settings_file: inject_arg("settings_file_name", args.settings_file, cache=True) @@ -183,13 +216,24 @@ def run(args): # other callers (e.g. populationsim) will have to arrange to register their own steps and injectables # (presumably) in a custom run_simulation.py instead of using the 'activitysim run' command if not inject.is_injectable("preload_injectables"): - from activitysim import ( # register abm steps and other abm-specific injectables - abm, - ) + # register abm steps and other abm-specific injectables + from activitysim import abm # noqa: F401 tracing.config_logger(basic=True) handle_standard_args(args) # possibly update injectables + if config.setting("memory_profile", False) and not config.setting( + "multiprocess", False + ): + # Memory sidecar is only useful for single process runs + # multiprocess runs log memory usage without blocking in the controlling process. + mem_prof_log = config.log_file_path("memory_profile.csv") + from ..core.memory_sidecar import MemorySidecar + + memory_sidecar_process = MemorySidecar(mem_prof_log) + else: + memory_sidecar_process = None + # legacy support for run_list setting nested 'models' and 'resume_after' settings if config.setting("run_list"): warnings.warn( @@ -281,7 +325,11 @@ def run(args): else: logger.info("run single process simulation") - pipeline.run(models=config.setting("models"), resume_after=resume_after) + pipeline.run( + models=config.setting("models"), + resume_after=resume_after, + memory_sidecar_process=memory_sidecar_process, + ) if config.setting("cleanup_pipeline_after_run", False): pipeline.cleanup_pipeline() # has side effect of closing open pipeline @@ -300,12 +348,15 @@ def run(args): tracing.print_elapsed_time("all models", t0) + if memory_sidecar_process: + memory_sidecar_process.stop() + return 0 if __name__ == "__main__": - from activitysim import abm # register injectables + from activitysim import abm # register injectables # noqa: F401 parser = argparse.ArgumentParser() add_run_args(parser) diff --git a/activitysim/core/memory_sidecar.py b/activitysim/core/memory_sidecar.py new file mode 100644 index 000000000..b1d8816e3 --- /dev/null +++ b/activitysim/core/memory_sidecar.py @@ -0,0 +1,130 @@ +import datetime +import os +import time +from multiprocessing import Pipe, Process + +import psutil + + +def record_memory_usage( + logstream, event="", event_idx=-1, measure_uss=False, measure_cpu=False, pid=None +): + + if pid is None: + pid = os.getpid() + current_process = psutil.Process(pid) + with current_process.oneshot(): + process_name = current_process.name() + + if measure_uss: + try: + info = current_process.memory_full_info() + uss = info.uss + except (PermissionError, psutil.AccessDenied, RuntimeError): + info = current_process.memory_info() + uss = 0 + else: + info = current_process.memory_info() + uss = 0 + + if measure_cpu: + cpu_pct = current_process.cpu_percent() + else: + cpu_pct = -1 + + full_rss = rss = info.rss + + num_children = 0 + for child in current_process.children(recursive=True): + try: + child_info = child.memory_info() + full_rss += child_info.rss + num_children += 1 + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + pass + + timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f") # sortable + + print( + f"{process_name}," + f"{pid}," + f"{int(rss)}," + f"{int(full_rss)}," + f"{int(uss)}," + f"{cpu_pct}," + f"{event_idx}," + f"{event}," + f"{num_children}," + f"{timestamp}", + file=logstream, + ) + + +def monitor_memory_usage( + pid, + conn, + interval=0.5, + flush_interval=5, + filename="/tmp/sidecar.csv", + measure_uss=True, + measure_cpu=True, +): + event = "" + event_idx = 0 + last_flush = time.time() + if measure_cpu: + psutil.cpu_percent() + with open(filename, "w") as stream: + MEM_LOG_HEADER = ( + "process,pid,rss,full_rss,uss,cpu,event_idx,event,children,time" + ) + print(MEM_LOG_HEADER, file=stream) + while True: + # timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f") # sortable + # print(f"{timestamp} [{event}]", file=stream) + record_memory_usage( + stream, + event=event, + event_idx=event_idx, + measure_uss=measure_uss, + measure_cpu=measure_cpu, + pid=pid, + ) + if conn.poll(interval): + event_ = conn.recv() + if event_ != event: + event_idx += 1 + event = event_ + else: + pass + now = time.time() + if now > last_flush + flush_interval: + stream.flush() + last_flush = now + if event == "STOP": + stream.flush() + break + + +class MemorySidecar: + def __init__(self, filename="/tmp/sidecar.csv"): + self.local_conn, child_conn = Pipe() + self.sidecar_process = Process( + target=monitor_memory_usage, + args=(os.getpid(), child_conn), + kwargs=dict(filename=filename), + ) + self.sidecar_process.start() + + def stop(self): + self.set_event("STOP") + self.sidecar_process.join(timeout=5) + if self.sidecar_process.exitcode is None: + self.sidecar_process.kill() + print("memory sidecar stopped") + + def set_event(self, event): + try: + self.local_conn.send(str(event)) + except BrokenPipeError: + pass diff --git a/activitysim/core/mp_tasks.py b/activitysim/core/mp_tasks.py index 429ff2e86..eab883910 100644 --- a/activitysim/core/mp_tasks.py +++ b/activitysim/core/mp_tasks.py @@ -1,5 +1,6 @@ # ActivitySim # See full license in LICENSE.txt. +import importlib import logging import multiprocessing import os @@ -733,15 +734,28 @@ def setup_injectables_and_logging(injectables, locutor=True): # other callers (e.g. piopulationsim) will have to arrange to register their own steps and injectables # (presumably) in a custom run_simulation.py instead of using the 'activitysim run' command if not inject.is_injectable("preload_injectables"): - from activitysim import ( # register abm steps and other abm-specific injectables - abm, - ) + # register abm steps and other abm-specific injectables + from activitysim import abm # noqa: F401 try: for k, v in injectables.items(): inject.add_injectable(k, v) + ext = inject.get_injectable("imported_extensions", default=()) + for e in ext: + basepath, extpath = os.path.split(e) + if not basepath: + basepath = "." + sys.path.insert(0, basepath) + try: + importlib.import_module(e) + except ImportError as err: + logger.exception("ImportError") + raise + finally: + del sys.path[0] + inject.add_injectable("is_sub_task", True) inject.add_injectable("locutor", locutor) diff --git a/activitysim/core/pipeline.py b/activitysim/core/pipeline.py index ad39cdcfe..08cfb30a7 100644 --- a/activitysim/core/pipeline.py +++ b/activitysim/core/pipeline.py @@ -543,7 +543,15 @@ def open_pipeline(resume_after=None, mode="a"): # open existing pipeline logger.debug("open_pipeline - open existing pipeline") open_pipeline_store(overwrite=False, mode=mode) - load_checkpoint(resume_after) + try: + load_checkpoint(resume_after) + except KeyError as err: + if "checkpoints" in err.args[0]: + # no checkpoints initialized, fall back to restart + _PIPELINE.last_checkpoint[CHECKPOINT_NAME] = INITIAL_CHECKPOINT_NAME + add_checkpoint(INITIAL_CHECKPOINT_NAME) + else: + raise else: # open new, empty pipeline logger.debug("open_pipeline - new, empty pipeline") @@ -601,7 +609,7 @@ def intermediate_checkpoint(checkpoint_name=None): return checkpoint_name in checkpoints -def run(models, resume_after=None): +def run(models, resume_after=None, memory_sidecar_process=None): """ run the specified list of models, optionally loading checkpoint and resuming after specified checkpoint. @@ -618,6 +626,8 @@ def run(models, resume_after=None): list of model_names resume_after : str or None model_name of checkpoint to load checkpoint and AFTER WHICH to resume model run + memory_sidecar_process : MemorySidecar, optional + Subprocess that monitors memory usage returns: nothing, but with pipeline open @@ -640,18 +650,25 @@ def run(models, resume_after=None): # preload any bulky injectables (e.g. skims) not in pipeline if inject.get_injectable("preload_injectables", None): + if memory_sidecar_process: + memory_sidecar_process.set_event("preload_injectables") t0 = print_elapsed_time("preload_injectables", t0) mem.trace_memory_info("pipeline.run after preload_injectables") t0 = print_elapsed_time() for model in models: + if memory_sidecar_process: + memory_sidecar_process.set_event(model) t1 = print_elapsed_time() run_model(model) mem.trace_memory_info(f"pipeline.run after {model}") tracing.log_runtime(model_name=model, start_time=t1) + if memory_sidecar_process: + memory_sidecar_process.set_event("finalizing") + # add checkpoint with final tables even if not intermediate checkpointing if not intermediate_checkpoint(): add_checkpoint(FINAL_CHECKPOINT_NAME)