Skip to content

Commit

Permalink
Merge pull request #612 from camsys/backports
Browse files Browse the repository at this point in the history
Operational updates
  • Loading branch information
jpn-- authored Dec 30, 2022
2 parents 1e8164b + f99a757 commit 58cf20e
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 11 deletions.
63 changes: 57 additions & 6 deletions activitysim/cli/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# ActivitySim
# See full license in LICENSE.txt.
import argparse
import importlib
import logging
import os
import sys
Expand All @@ -13,7 +14,13 @@
logger = logging.getLogger(__name__)


INJECTABLES = ["data_dir", "configs_dir", "output_dir", "settings_file_name"]
INJECTABLES = [
"data_dir",
"configs_dir",
"output_dir",
"settings_file_name",
"imported_extensions",
]


def add_run_args(parser, multiprocess=True):
Expand Down Expand Up @@ -64,6 +71,15 @@ def add_run_args(parser, multiprocess=True):
parser.add_argument(
"--households_sample_size", type=int, metavar="N", help="households sample size"
)
parser.add_argument(
"-e",
"--ext",
type=str,
action="append",
metavar="PATH",
help="Package of extension modules to load. Use of this option is not "
"generally secure.",
)

if multiprocess:
parser.add_argument(
Expand Down Expand Up @@ -111,6 +127,23 @@ def inject_arg(name, value, cache=False):
# 'configs', 'data', and 'output' folders by default
os.chdir(args.working_dir)

if args.ext:
for e in args.ext:
basepath, extpath = os.path.split(e)
if not basepath:
basepath = "."
sys.path.insert(0, os.path.abspath(basepath))
try:
importlib.import_module(extpath)
except ImportError as err:
logger.exception("ImportError")
raise
finally:
del sys.path[0]
inject_arg("imported_extensions", args.ext)
else:
inject_arg("imported_extensions", ())

# settings_file_name should be cached or else it gets squashed by config.py
if args.settings_file:
inject_arg("settings_file_name", args.settings_file, cache=True)
Expand Down Expand Up @@ -183,13 +216,24 @@ def run(args):
# other callers (e.g. populationsim) will have to arrange to register their own steps and injectables
# (presumably) in a custom run_simulation.py instead of using the 'activitysim run' command
if not inject.is_injectable("preload_injectables"):
from activitysim import ( # register abm steps and other abm-specific injectables
abm,
)
# register abm steps and other abm-specific injectables
from activitysim import abm # noqa: F401

tracing.config_logger(basic=True)
handle_standard_args(args) # possibly update injectables

if config.setting("memory_profile", False) and not config.setting(
"multiprocess", False
):
# Memory sidecar is only useful for single process runs
# multiprocess runs log memory usage without blocking in the controlling process.
mem_prof_log = config.log_file_path("memory_profile.csv")
from ..core.memory_sidecar import MemorySidecar

memory_sidecar_process = MemorySidecar(mem_prof_log)
else:
memory_sidecar_process = None

# legacy support for run_list setting nested 'models' and 'resume_after' settings
if config.setting("run_list"):
warnings.warn(
Expand Down Expand Up @@ -281,7 +325,11 @@ def run(args):
else:
logger.info("run single process simulation")

pipeline.run(models=config.setting("models"), resume_after=resume_after)
pipeline.run(
models=config.setting("models"),
resume_after=resume_after,
memory_sidecar_process=memory_sidecar_process,
)

if config.setting("cleanup_pipeline_after_run", False):
pipeline.cleanup_pipeline() # has side effect of closing open pipeline
Expand All @@ -300,12 +348,15 @@ def run(args):

tracing.print_elapsed_time("all models", t0)

if memory_sidecar_process:
memory_sidecar_process.stop()

return 0


if __name__ == "__main__":

from activitysim import abm # register injectables
from activitysim import abm # register injectables # noqa: F401

parser = argparse.ArgumentParser()
add_run_args(parser)
Expand Down
130 changes: 130 additions & 0 deletions activitysim/core/memory_sidecar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import datetime
import os
import time
from multiprocessing import Pipe, Process

import psutil


def record_memory_usage(
    logstream, event="", event_idx=-1, measure_uss=False, measure_cpu=False, pid=None
):
    """Append one CSV row of memory (and optionally CPU) usage to *logstream*.

    Columns written (no header): process name, pid, rss, full_rss (rss plus
    all children), uss, cpu percent, event_idx, event label, child count,
    and a sortable timestamp.

    Parameters
    ----------
    logstream : file-like
        Open text stream the CSV row is printed to.
    event : str, optional
        Label for the current event/model step, written into the row.
    event_idx : int, optional
        Sequence number of the event; -1 when events are not tracked.
    measure_uss : bool, optional
        If True, try to record the unique set size (USS); falls back to 0
        when the platform or permissions do not allow it.
    measure_cpu : bool, optional
        If True, record process CPU percent; otherwise -1 is written.
    pid : int, optional
        Process id to inspect; defaults to the current process.
    """
    if pid is None:
        pid = os.getpid()
    current_process = psutil.Process(pid)
    # oneshot() caches process info so the several queries below cost one fetch
    with current_process.oneshot():
        process_name = current_process.name()

        if measure_uss:
            try:
                info = current_process.memory_full_info()
                uss = info.uss
            except (PermissionError, psutil.AccessDenied, RuntimeError):
                # memory_full_info may need elevated privileges on some platforms;
                # degrade to plain memory_info and report uss as 0
                info = current_process.memory_info()
                uss = 0
        else:
            info = current_process.memory_info()
            uss = 0

        if measure_cpu:
            cpu_pct = current_process.cpu_percent()
        else:
            cpu_pct = -1

    full_rss = rss = info.rss

    num_children = 0
    for child in current_process.children(recursive=True):
        try:
            child_info = child.memory_info()
            full_rss += child_info.rss
            num_children += 1
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # best-effort: a child may exit (or be inaccessible) between
            # enumeration and query; skip it rather than fail the sample
            pass

    timestamp = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")  # sortable

    print(
        f"{process_name},"
        f"{pid},"
        f"{int(rss)},"
        f"{int(full_rss)},"
        f"{int(uss)},"
        f"{cpu_pct},"
        f"{event_idx},"
        f"{event},"
        f"{num_children},"
        f"{timestamp}",
        file=logstream,
    )


def monitor_memory_usage(
    pid,
    conn,
    interval=0.5,
    flush_interval=5,
    filename="/tmp/sidecar.csv",
    measure_uss=True,
    measure_cpu=True,
):
    """Sample memory/CPU usage of *pid* in a loop, writing CSV rows to *filename*.

    Intended to run as the target of a sidecar ``multiprocessing.Process``.
    Event labels received over *conn* are echoed into each row; the loop exits
    (after a final flush) when the label "STOP" is received.

    Parameters
    ----------
    pid : int
        Process id to monitor.
    conn : multiprocessing.Connection
        Receiving end of a Pipe; supplies event-label strings.
    interval : float, optional
        Seconds between samples (implemented via the poll timeout).
    flush_interval : float, optional
        Minimum seconds between explicit stream flushes.
    filename : str, optional
        Path of the CSV log to (over)write.
    measure_uss, measure_cpu : bool, optional
        Passed through to record_memory_usage.
    """
    event = ""
    event_idx = 0
    last_flush = time.time()
    if measure_cpu:
        # prime psutil's cpu_percent so subsequent calls report a real interval
        psutil.cpu_percent()
    with open(filename, "w") as stream:
        MEM_LOG_HEADER = (
            "process,pid,rss,full_rss,uss,cpu,event_idx,event,children,time"
        )
        print(MEM_LOG_HEADER, file=stream)
        while True:
            record_memory_usage(
                stream,
                event=event,
                event_idx=event_idx,
                measure_uss=measure_uss,
                measure_cpu=measure_cpu,
                pid=pid,
            )
            # poll() doubles as the sampling sleep between iterations
            if conn.poll(interval):
                event_ = conn.recv()
                if event_ != event:
                    event_idx += 1
                    event = event_
            now = time.time()
            if now > last_flush + flush_interval:
                stream.flush()
                last_flush = now
            if event == "STOP":
                stream.flush()
                break


class MemorySidecar:
    """Manage a child process that periodically logs memory usage to a CSV file.

    The child runs monitor_memory_usage against the *current* process id and
    keeps sampling until told to stop. Use set_event() to tag subsequent
    samples with a label, and stop() to shut the sidecar down.
    """

    def __init__(self, filename="/tmp/sidecar.csv"):
        # local end stays in this process; remote end is handed to the monitor
        self.local_conn, remote_conn = Pipe()
        monitor = Process(
            target=monitor_memory_usage,
            args=(os.getpid(), remote_conn),
            kwargs={"filename": filename},
        )
        monitor.start()
        self.sidecar_process = monitor

    def stop(self):
        """Ask the sidecar to exit; kill it if it hasn't stopped within 5s."""
        self.set_event("STOP")
        proc = self.sidecar_process
        proc.join(timeout=5)
        if proc.exitcode is None:
            proc.kill()
        print("memory sidecar stopped")

    def set_event(self, event):
        """Send an event label to the monitor; ignore a dead pipe (best-effort)."""
        try:
            self.local_conn.send(str(event))
        except BrokenPipeError:
            pass
20 changes: 17 additions & 3 deletions activitysim/core/mp_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# ActivitySim
# See full license in LICENSE.txt.
import importlib
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -733,15 +734,28 @@ def setup_injectables_and_logging(injectables, locutor=True):
# other callers (e.g. populationsim) will have to arrange to register their own steps and injectables
# (presumably) in a custom run_simulation.py instead of using the 'activitysim run' command
if not inject.is_injectable("preload_injectables"):
from activitysim import ( # register abm steps and other abm-specific injectables
abm,
)
# register abm steps and other abm-specific injectables
from activitysim import abm # noqa: F401

try:

for k, v in injectables.items():
inject.add_injectable(k, v)

ext = inject.get_injectable("imported_extensions", default=())
for e in ext:
basepath, extpath = os.path.split(e)
if not basepath:
basepath = "."
sys.path.insert(0, basepath)
try:
importlib.import_module(e)
except ImportError as err:
logger.exception("ImportError")
raise
finally:
del sys.path[0]

inject.add_injectable("is_sub_task", True)
inject.add_injectable("locutor", locutor)

Expand Down
21 changes: 19 additions & 2 deletions activitysim/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,15 @@ def open_pipeline(resume_after=None, mode="a"):
# open existing pipeline
logger.debug("open_pipeline - open existing pipeline")
open_pipeline_store(overwrite=False, mode=mode)
load_checkpoint(resume_after)
try:
load_checkpoint(resume_after)
except KeyError as err:
if "checkpoints" in err.args[0]:
# no checkpoints initialized, fall back to restart
_PIPELINE.last_checkpoint[CHECKPOINT_NAME] = INITIAL_CHECKPOINT_NAME
add_checkpoint(INITIAL_CHECKPOINT_NAME)
else:
raise
else:
# open new, empty pipeline
logger.debug("open_pipeline - new, empty pipeline")
Expand Down Expand Up @@ -601,7 +609,7 @@ def intermediate_checkpoint(checkpoint_name=None):
return checkpoint_name in checkpoints


def run(models, resume_after=None):
def run(models, resume_after=None, memory_sidecar_process=None):
"""
run the specified list of models, optionally loading checkpoint and resuming after specified
checkpoint.
Expand All @@ -618,6 +626,8 @@ def run(models, resume_after=None):
list of model_names
resume_after : str or None
model_name of checkpoint to load checkpoint and AFTER WHICH to resume model run
memory_sidecar_process : MemorySidecar, optional
Subprocess that monitors memory usage
returns:
nothing, but with pipeline open
Expand All @@ -640,18 +650,25 @@ def run(models, resume_after=None):

# preload any bulky injectables (e.g. skims) not in pipeline
if inject.get_injectable("preload_injectables", None):
if memory_sidecar_process:
memory_sidecar_process.set_event("preload_injectables")
t0 = print_elapsed_time("preload_injectables", t0)

mem.trace_memory_info("pipeline.run after preload_injectables")

t0 = print_elapsed_time()
for model in models:
if memory_sidecar_process:
memory_sidecar_process.set_event(model)
t1 = print_elapsed_time()
run_model(model)
mem.trace_memory_info(f"pipeline.run after {model}")

tracing.log_runtime(model_name=model, start_time=t1)

if memory_sidecar_process:
memory_sidecar_process.set_event("finalizing")

# add checkpoint with final tables even if not intermediate checkpointing
if not intermediate_checkpoint():
add_checkpoint(FINAL_CHECKPOINT_NAME)
Expand Down

0 comments on commit 58cf20e

Please sign in to comment.