From b06b05986c0f987b64f00d6831af67c62cc3ba2c Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:08:26 -0700 Subject: [PATCH 01/11] Initial commit to parallelize the reprojection step across many small workers. --- src/kbmod_wf/parallel_repro_single_chip_wf.py | 190 ++++++++++++++++++ .../resource_configs/klone_configuration.py | 20 ++ ...oject_single_chip_single_night_wu_shard.py | 94 +++++++++ 3 files changed, 304 insertions(+) create mode 100644 src/kbmod_wf/parallel_repro_single_chip_wf.py create mode 100644 src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py new file mode 100644 index 0000000..85b9489 --- /dev/null +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -0,0 +1,190 @@ +import argparse +import os +from pathlib import Path + +import toml +import parsl +from parsl import join_app, python_app, File +import parsl.executors + +from kbmod_wf.utilities import ( + apply_runtime_updates, + get_resource_config, + get_executors, + get_configured_logger, +) + +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search + + +# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu +# that should be refactored. +# The only difference is the import of reproject_single_chip_single_night_wu here. +@join_app( + cache=True, + executors=get_executors(["local_dev_testing", "sharded_reproject"]), + ignore_for_cache=["logging_file"], +) +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + + logger = get_configured_logger("task.reproject_wu", logging_file.filepath) + + logger.info("Starting reproject_ic") + with ErrorLogger(logger): + future = sharded_reproject( + original_wu_filepath=inputs[0].filepath, + reprojected_wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + logger.info("Completed reproject_ic") + return future + + +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "sharded_reproject"]), + ignore_for_cache=["logging_file"], +) +def sharded_reproject(inputs=(), outputs=(), runtime_config={}, logging_file=None): + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + + logger = get_configured_logger("task.sharded_reproject", logging_file.filepath) + + from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_wu_shard + + logger.info("Starting reproject_ic") + with ErrorLogger(logger): + reproject_wu_shard( + original_wu_filepath=inputs[0].filepath, + reprojected_wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + logger.info("Completed reproject_ic") + return outputs[0] + + +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. 
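+
+    Each staged .collection file is converted into a WorkUnit, reprojected to
+    a common WCS, and then searched with KBMOD; the reprojection step is
+    fanned out across many small workers.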
+ + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ + resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) + + app_configs = runtime_config.get("apps", {}) + + dfk = parsl.load(resource_config) + if dfk: + logging_file = File(os.path.join(dfk.run_dir, "kbmod.log")) + logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath) + + if runtime_config is not None: + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") + + logger.info("Starting workflow") + + # gather all the *.collection files that are staged for processing + create_manifest_config = app_configs.get("create_manifest", {}) + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) + create_manifest_future = create_manifest( + inputs=[], + outputs=[manifest_file], + runtime_config=app_configs.get("create_manifest", {}), + logging_file=logging_file, + ) + + with open(create_manifest_future.result(), "r") as f: + # process each .collection file in the manifest into a .wu file + original_work_unit_futures = [] + for line in f: + # Create path object for the line in the manifest + input_file = Path(line.strip()) + + # Create a directory for the sharded work unit files + sharded_directory = Path(input_file.parent, input_file.stem) + sharded_directory.mkdir(exist_ok=True) + + # Create the work unit filepath + output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu") + + # Create the work unit future + original_work_unit_futures.append( + ic_to_wu( + inputs=[input_file], + outputs=[File(output_workunit_filepath)], + runtime_config=app_configs.get("ic_to_wu", {}), + logging_file=logging_file, + ) + ) + + # reproject each WorkUnit + # For chip-by-chip, this isn't really necessary, so hardcoding to 0. + reproject_futures = [] + for f in original_work_unit_futures: + distance = 0 + + unique_obstimes, unique_obstimes_indices = work_unit.get_unique_obstimes_and_indices() + + reproject_futures.append( + reproject_wu( + inputs=[f.result()], + outputs=[File(f.result().filepath + f".{distance}.repro")], + runtime_config=app_configs.get("reproject_wu", {}), + logging_file=logging_file, + ) + ) + + # run kbmod search on each reprojected WorkUnit + search_futures = [] + for f in reproject_futures: + search_futures.append( + kbmod_search( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".search.ecsv")], + runtime_config=app_configs.get("kbmod_search", {}), + logging_file=logging_file, + ) + ) + + [f.result() for f in search_futures] + + logger.info("Workflow complete") + + parsl.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", + type=str, + choices=["dev", "klone"], + help="The environment to run the workflow in.", + ) + + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + + args = parser.parse_args() + + # if a runtime_config file was provided and exists, load the toml as a dict. 
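    # The file parses into a plain nested dict; with the layout of
    # example_runtime_config.toml, for instance, the staging directory lands at
    # runtime_config["apps"]["create_manifest"]["staging_directory"].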
runtime_config = {}
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)

    workflow_runner(env=args.env, runtime_config=runtime_config)
diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py
index 953622b..f76a978 100644
--- a/src/kbmod_wf/resource_configs/klone_configuration.py
+++ b/src/kbmod_wf/resource_configs/klone_configuration.py
@@ -9,6 +9,7 @@
     "compute_bigmem": "01:00:00",
     "large_mem": "04:00:00",
     "sharded_reproject": "04:00:00",
+    "parallel_reproject": "00:30:00",
    "gpu_max": "08:00:00",
 }

@@ -80,6 +81,25 @@ def klone_resource_config():
                 worker_init="",
             ),
         ),
+        HighThroughputExecutor(
+            label="parallel_reproject",
+            max_workers=1,
+            provider=SlurmProvider(
+                partition="ckpt-g2",
+                account="astro",
+                min_blocks=0,
+                max_blocks=2,
+                init_blocks=0,
+                parallelism=1,
+                nodes_per_block=1,
+                cores_per_node=1,
+                mem_per_node=2,  # ~2-4 GB per core
+                exclusive=False,
+                walltime=walltimes["parallel_reproject"],
+                # Command to run before starting worker - i.e. conda activate
+                worker_init="",
+            ),
+        ),
         HighThroughputExecutor(
             label="gpu",
             max_workers=1,
diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
new file mode 100644
index 0000000..ce089c4
--- /dev/null
+++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
@@ -0,0 +1,94 @@
+import kbmod
+from kbmod.work_unit import WorkUnit
+
+import kbmod.reprojection as reprojection
+
+from reproject.mosaicking import find_optimal_celestial_wcs
+import os
+import time
+from logging import Logger
+
+
+def reproject_wu_shard(
+    original_wu_shard_filepath: str = None,
+    reprojected_wu_shard_filepath: str = None,
+    runtime_config: dict = {},
+    logger: Logger = None,
+):
+    """This task will reproject a WorkUnit to a common WCS.
+
+    Parameters
+    ----------
+    original_wu_shard_filepath : str, optional
+        The fully resolved filepath to the input WorkUnit file, by default None
+    reprojected_wu_shard_filepath : str, optional
+        The fully resolved filepath to the resulting WorkUnit file after
+        reprojection, by default None
+    runtime_config : dict, optional
+        Additional configuration parameters to be used at runtime, by default {}
+    logger : Logger, optional
+        Primary logger for the workflow, by default None
+
+    Returns
+    -------
+    str
+        The fully resolved filepath of the resulting WorkUnit file after reflex
+        and reprojection.
+    """
+    wu_shard_reprojector = WUShardReprojector(
+        original_wu_filepath=original_wu_shard_filepath,
+        reprojected_wu_filepath=reprojected_wu_shard_filepath,
+        runtime_config=runtime_config,
+        logger=logger,
+    )
+
+    return wu_shard_reprojector.reproject_workunit()
+
+
+class WUShardReprojector:
+    def __init__(
+        self,
+        original_wu_filepath: str = None,
+        reprojected_wu_filepath: str = None,
+        runtime_config: dict = {},
+        logger: Logger = None,
+    ):
+        self.original_wu_filepath = original_wu_filepath
+        self.reprojected_wu_filepath = reprojected_wu_filepath
+        self.runtime_config = runtime_config
+        self.logger = logger
+
+        # Default to 8 workers if not in the config. Value must be 0 < n_workers.

Date: Wed, 4 Sep 2024 21:09:30 -0700
Subject: [PATCH 02/11] Adding a temporary testing workflow to prove out that the approach to Futures works correctly.
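
The shape being proven out here: one python_app that returns a list of work
items, a second app fanned out over that list, and a consumer that blocks on
every item future before running. A minimal sketch of the same pattern (the
names are illustrative stand-ins, not the kbmod_wf tasks; assumes only a
local ThreadPoolExecutor):

    import parsl
    from parsl import python_app
    from parsl.config import Config
    from parsl.executors import ThreadPoolExecutor

    @python_app
    def make_shards(n):
        # stand-in for ic_to_wu: one task emits many work items
        return list(range(n))

    @python_app
    def process_shard(shard):
        # stand-in for reproject_wu: one task per work item
        return shard * 2

    @python_app
    def gather(inputs=()):
        # stand-in for kbmod_search: consumes every processed item
        return sum(inputs)

    parsl.load(Config(executors=[ThreadPoolExecutor(label="local", max_threads=3)]))

    shards = make_shards(4).result()                      # fan out
    futures = [process_shard(s) for s in shards]
    total = gather(inputs=[f.result() for f in futures])  # fan in
    assert total.result() == 12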
--- example_runtime_config.toml | 2 +- .../resource_configs/dev_configuration.py | 11 +- src/kbmod_wf/test_workflow.py | 257 ++++++++++++++++++ 3 files changed, 259 insertions(+), 11 deletions(-) create mode 100644 src/kbmod_wf/test_workflow.py diff --git a/example_runtime_config.toml b/example_runtime_config.toml index a33daf1..2956ffb 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -12,7 +12,7 @@ checkpoint_mode = 'task_exit' # The path to the staging directory # e.g. "/gscratch/dirac/kbmod/workflow/staging" staging_directory = "/home/drew/code/kbmod-wf/dev_staging" -output_directory = "/home/drew/code/kbmod-wf/dev_staging/single_chip_workflow" +output_directory = "/home/drew/code/kbmod-wf/dev_staging/processing" file_pattern = "*.collection" diff --git a/src/kbmod_wf/resource_configs/dev_configuration.py b/src/kbmod_wf/resource_configs/dev_configuration.py index b003b62..b677f8a 100644 --- a/src/kbmod_wf/resource_configs/dev_configuration.py +++ b/src/kbmod_wf/resource_configs/dev_configuration.py @@ -13,14 +13,5 @@ def dev_resource_config(): return Config( # put the log files in in the top level folder, "run_logs". run_dir=os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()), - app_cache=True, - checkpoint_mode="task_exit", - checkpoint_files=get_all_checkpoints( - os.path.join(project_dir, "run_logs", datetime.date.today().isoformat()) - ), - executors=[ - ThreadPoolExecutor( - label="local_dev_testing", - ) - ], + executors=[ThreadPoolExecutor(label="local_dev_testing", max_threads=3)], ) diff --git a/src/kbmod_wf/test_workflow.py b/src/kbmod_wf/test_workflow.py new file mode 100644 index 0000000..038e0a4 --- /dev/null +++ b/src/kbmod_wf/test_workflow.py @@ -0,0 +1,257 @@ +import argparse +import os +from pathlib import Path + +import toml +import parsl +from parsl import python_app, File +import parsl.executors + +from kbmod_wf.utilities import ( + apply_runtime_updates, + get_resource_config, + get_executors, + get_configured_logger, +) + + +@python_app(executors=get_executors(["local_dev_testing", "local_thread"])) +def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): + import glob + import os + import shutil + + from kbmod_wf.utilities.logger_utilities import get_configured_logger + + logger = get_configured_logger("task.create_manifest", logging_file.filepath) + + directory_path = runtime_config.get("staging_directory") + output_path = runtime_config.get("output_directory") + + if directory_path is None: + logger.error(f"No staging_directory provided in the configuration.") + raise ValueError("No staging_directory provided in the configuration.") + + if output_path is None: + logger.info( + f"No output_directory provided in the configuration. 
Using staging directory: {directory_path}" + ) + output_path = directory_path + + if not os.path.exists(output_path): + logger.info(f"Creating output directory: {output_path}") + os.makedirs(output_path) + + logger.info(f"Looking for staged files in {directory_path}") + + # Gather all the *.collection entries in the directory + file_pattern = runtime_config.get("file_pattern", "*.collection") + pattern = os.path.join(directory_path, file_pattern) + entries = glob.glob(pattern) + + # Filter out directories, keep only files + # Copy files to the output directory, and adds them to the list of files + files = [] + for f in entries: + if os.path.isfile(os.path.join(directory_path, f)): + files.append(shutil.copy2(f, output_path)) + + logger.info(f"Found {len(files)} files in {directory_path}") + + # Write the filenames to the manifest file + logger.info(f"Writing manifest file: {outputs[0].filepath}") + with open(outputs[0].filepath, "w") as manifest_file: + for file in files: + manifest_file.write(file + "\n") + + return outputs[0] + + +@python_app(executors=get_executors(["local_dev_testing"])) +def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + + logger = get_configured_logger("task.ic_to_wu", logging_file) + + logger.info("Starting ic_to_wu") + with ErrorLogger(logger): + # open the file + input_file = Path(inputs[0]) + output_files = [] + with open(input_file, "r") as input_f: + # read the number in the file + data = input_f.read() + + # create a directory for the output with the same name as the input file + output_directory = input_file.parent / input_file.stem + output_directory.mkdir(exist_ok=True) + + # create indexed output files + for i in range(0, int(data)): + output_file = output_directory / f"{i}_{input_file.stem}.shard" + with open(output_file, "w") as output_f: + output_f.write(f"Work Unit shard: {i}") + output_files.append(output_file) + + logger.info("Completed ic_to_wu") + + return output_files + + +@python_app(executors=get_executors(["local_dev_testing"])) +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + from time import sleep + import random + + logger = get_configured_logger("task.ic_to_wu", logging_file) + + logger.info("Starting reproject_wu") + sleep(random.uniform(1, 5)) + with open(outputs[0].filepath, "w") as f: + f.write(f"Reprojected: {inputs[0]}") + + logger.info(f"Reprojected: {inputs[0]}") + + return (inputs[0].parent, outputs[0]) + + +@python_app(executors=get_executors(["local_dev_testing"])) +def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + + logger = get_configured_logger("task.kbmod_search", logging_file) + + logger.info("Starting kbmod_search") + logger.info("Pretending to run kbmod search") + output_directory = Path(inputs[0][0]) + logger.info(f"Output directory: {output_directory}") + search_file = output_directory / (output_directory.stem + "_search.ecsv") + with open(search_file, "w") as f: + f.write("Pretending to run kbmod search") + logger.info("Completed kbmod_search") + + return search_file + + +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. 
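+
+    This test version wires stand-ins for create_manifest, ic_to_wu,
+    reproject_wu (one task per shard), and kbmod_search together, passing
+    lists of shard futures between stages to exercise how Parsl resolves
+    nested futures.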
+ + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ + resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) + + app_configs = runtime_config.get("apps", {}) + + dfk = parsl.load(resource_config) + if dfk: + logging_file = File(os.path.join(dfk.run_dir, "kbmod.log")) + logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath) + + if runtime_config is not None: + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") + + logger.info("Starting workflow") + + # gather all the *.collection files that are staged for processing + create_manifest_config = app_configs.get("create_manifest", {}) + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) + create_manifest_future = create_manifest( + inputs=[], + outputs=[manifest_file], + runtime_config=create_manifest_config, + logging_file=logging_file, + ) + + with open(create_manifest_future.result(), "r") as f: + # process each .collection file in the manifest into a .wu file + original_work_unit_futures = [] + for line in f: + # Create path object for the line in the manifest + input_file = Path(line.strip()) + + # Create a directory for the sharded work unit files + sharded_directory = Path(input_file.parent, input_file.stem) + sharded_directory.mkdir(exist_ok=True) + + # Create the work unit filepath + output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu") + + # Create the work unit future + original_work_unit_futures.append( + ic_to_wu( + inputs=[input_file], + outputs=[File(output_workunit_filepath)], + runtime_config=app_configs.get("ic_to_wu", {}), + logging_file=logging_file, + ) + ) + + reprojected_wu_futures = [] + for f in original_work_unit_futures: + shard_futures = [] + for i in f.result(): + shard_future = reproject_wu( + inputs=[i], + outputs=[File(i.parent / (i.stem + ".repro"))], + runtime_config=app_configs.get("reproject_wu", {}), + logging_file=logging_file, + ) + shard_futures.append(shard_future) + reprojected_wu_futures.append(shard_futures) + + # run kbmod search on each reprojected WorkUnit + search_futures = [] + for f in reprojected_wu_futures: + search_futures.append( + kbmod_search( + inputs=[i.result() for i in f], #! This, surprisingly, seems to work!!! + outputs=[], + runtime_config=app_configs.get("kbmod_search", {}), + logging_file=logging_file, + ) + ) + + [f.result() for f in search_futures] + + logger.info("Workflow complete") + + parsl.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", + type=str, + choices=["dev", "klone"], + help="The environment to run the workflow in.", + ) + + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + + args = parser.parse_args() + + # if a runtime_config file was provided and exists, load the toml as a dict. + runtime_config = {} + + #! Don't forget to remove this hardcoded path!!! 
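    #! It unconditionally overwrites whatever --runtime-config was passed on
    #! the command line, so it only makes sense for local testing.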
+ args.runtime_config = "/home/drew/code/kbmod-wf/example_runtime_config.toml" + if args.runtime_config is not None and os.path.exists(args.runtime_config): + with open(args.runtime_config, "r") as toml_runtime_config: + runtime_config = toml.load(toml_runtime_config) + + workflow_runner(env=args.env, runtime_config=runtime_config) From fb2b9a4b7a7ad89cffe7419aa94c1676d1639104 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:12:03 -0700 Subject: [PATCH 03/11] WIP - Expect several bugs, but need to test on Klone now. --- example_runtime_config.toml | 4 +- src/kbmod_wf/parallel_repro_single_chip_wf.py | 79 ++++------ .../resource_configs/klone_configuration.py | 10 +- src/kbmod_wf/task_impls/ic_to_wu.py | 4 +- ...oject_single_chip_single_night_wu_shard.py | 140 ++++++++++-------- src/kbmod_wf/test_workflow.py | 2 +- src/kbmod_wf/workflow_tasks/__init__.py | 1 + .../workflow_tasks/ic_to_wu_return_shards.py | 53 +++++++ 8 files changed, 169 insertions(+), 124 deletions(-) create mode 100644 src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py diff --git a/example_runtime_config.toml b/example_runtime_config.toml index 2956ffb..671a93a 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -11,8 +11,8 @@ checkpoint_mode = 'task_exit' [apps.create_manifest] # The path to the staging directory # e.g. "/gscratch/dirac/kbmod/workflow/staging" -staging_directory = "/home/drew/code/kbmod-wf/dev_staging" -output_directory = "/home/drew/code/kbmod-wf/dev_staging/processing" +staging_directory = "/Users/drew/code/kbmod-wf/dev_staging" +output_directory = "/Users/drew/code/kbmod-wf/dev_staging/processing" file_pattern = "*.collection" diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 85b9489..3edd27f 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -4,7 +4,7 @@ import toml import parsl -from parsl import join_app, python_app, File +from parsl import python_app, File import parsl.executors from kbmod_wf.utilities import ( @@ -14,50 +14,26 @@ get_configured_logger, ) -from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search - - -# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu -# that should be refactored. -# The only difference is the import of reproject_single_chip_single_night_wu here. 
-@join_app( - cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), - ignore_for_cache=["logging_file"], -) -def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger - - logger = get_configured_logger("task.reproject_wu", logging_file.filepath) - - logger.info("Starting reproject_ic") - with ErrorLogger(logger): - future = sharded_reproject( - original_wu_filepath=inputs[0].filepath, - reprojected_wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - logger.info("Completed reproject_ic") - return future +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu_return_shards, kbmod_search @python_app( cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), + executors=get_executors(["local_dev_testing", "reproject_single_shard"]), ignore_for_cache=["logging_file"], ) -def sharded_reproject(inputs=(), outputs=(), runtime_config={}, logging_file=None): +def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger logger = get_configured_logger("task.sharded_reproject", logging_file.filepath) - from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_wu_shard + from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_shard logger.info("Starting reproject_ic") with ErrorLogger(logger): - reproject_wu_shard( + reproject_shard( original_wu_filepath=inputs[0].filepath, + original_wcs=inputs[1], reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, @@ -94,33 +70,32 @@ def workflow_runner(env=None, runtime_config={}): # gather all the *.collection files that are staged for processing create_manifest_config = app_configs.get("create_manifest", {}) - manifest_file = File( - os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") - ) + manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + create_manifest_future = create_manifest( inputs=[], - outputs=[manifest_file], + outputs=[File(manifest_file_path)], runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) - with open(create_manifest_future.result(), "r") as f: - # process each .collection file in the manifest into a .wu file + with open(create_manifest_future.result(), "r") as manifest: + # process each .collection file in the manifest original_work_unit_futures = [] - for line in f: + for line in manifest: # Create path object for the line in the manifest input_file = Path(line.strip()) - # Create a directory for the sharded work unit files + # Create a directory to contain each work unit's shards sharded_directory = Path(input_file.parent, input_file.stem) sharded_directory.mkdir(exist_ok=True) - # Create the work unit filepath + # Construct the work unit filepath output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu") # Create the work unit future original_work_unit_futures.append( - ic_to_wu( + ic_to_wu_return_shards( inputs=[input_file], outputs=[File(output_workunit_filepath)], runtime_config=app_configs.get("ic_to_wu", {}), @@ -128,30 +103,28 @@ def workflow_runner(env=None, runtime_config={}): ) ) - # reproject each WorkUnit + # reproject each WorkUnit shard individually # For chip-by-chip, this isn't really 
necessary, so hardcoding to 0. reproject_futures = [] for f in original_work_unit_futures: - distance = 0 - - unique_obstimes, unique_obstimes_indices = work_unit.get_unique_obstimes_and_indices() - - reproject_futures.append( - reproject_wu( - inputs=[f.result()], - outputs=[File(f.result().filepath + f".{distance}.repro")], + shard_futures = [] + for i in f.result(): + shard_future = reproject_shard( + inputs=[i], + outputs=[File(i.parent / (i.stem + ".repro"))], runtime_config=app_configs.get("reproject_wu", {}), logging_file=logging_file, ) - ) + shard_futures.append(shard_future) + reproject_futures.append(shard_futures) # run kbmod search on each reprojected WorkUnit search_futures = [] for f in reproject_futures: search_futures.append( kbmod_search( - inputs=[f.result()], - outputs=[File(f.result().filepath + ".search.ecsv")], + inputs=[i.result() for i in f], + outputs=[], runtime_config=app_configs.get("kbmod_search", {}), logging_file=logging_file, ) diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index f76a978..256f12d 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -9,7 +9,7 @@ "compute_bigmem": "01:00:00", "large_mem": "04:00:00", "sharded_reproject": "04:00:00", - "parallel_reproject": "00:30:00", + "reproject_single_shard": "00:30:00", "gpu_max": "08:00:00", } @@ -82,20 +82,20 @@ def klone_resource_config(): ), ), HighThroughputExecutor( - label="parallel_reproject", + label="reproject_single_shard", max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", min_blocks=0, - max_blocks=2, + max_blocks=256, init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=1, - mem_per_node=2, # ~2-4 GB per core + mem_per_node=1, # only working on 1 image, so <1 GB should be required exclusive=False, - walltime=walltimes["parallel_reproject"], + walltime=walltimes["reproject_single_shard"], # Command to run before starting worker - i.e. 
conda activate
                worker_init="",
            ),
        ),
diff --git a/src/kbmod_wf/task_impls/ic_to_wu.py b/src/kbmod_wf/task_impls/ic_to_wu.py
index 66559da..5ef9b06 100644
--- a/src/kbmod_wf/task_impls/ic_to_wu.py
+++ b/src/kbmod_wf/task_impls/ic_to_wu.py
@@ -83,4 +83,6 @@ def create_work_unit(self):
         elapsed = round(time.time() - last_time, 1)
         self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

-    return self.wu_filepath
+    wcs = list(orig_wu._per_image_wcs)
+
+    return self.wu_filepath, wcs
diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
index ce089c4..8f371b0 100644
--- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
+++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
@@ -1,16 +1,21 @@
+import os
+import time
+from logging import Logger
+
+import numpy as np
+import astropy.io.fits as fitsio
+
 import kbmod
 from kbmod.work_unit import WorkUnit
-
 import kbmod.reprojection as reprojection
+
+from reoproject import reproject_adaptive
 from reproject.mosaicking import find_optimal_celestial_wcs


-def reproject_wu_shard(
+def reproject_shard(
     original_wu_shard_filepath: str = None,
+    original_wcs=None,
     reprojected_wu_shard_filepath: str = None,
     runtime_config: dict = {},
     logger: Logger = None,
@@ -35,60 +40,71 @@ def reproject_shard(
         The fully resolved filepath of the resulting WorkUnit file after reflex
         and reprojection.
     """
-    wu_shard_reprojector = WUShardReprojector(
-        original_wu_filepath=original_wu_shard_filepath,
-        reprojected_wu_filepath=reprojected_wu_shard_filepath,
-        runtime_config=runtime_config,
-        logger=logger,
-    )
-
-    return wu_shard_reprojector.reproject_workunit()
-
-
-class WUShardReprojector:
-    def __init__(
-        self,
-        original_wu_filepath: str = None,
-        reprojected_wu_filepath: str = None,
-        runtime_config: dict = {},
-        logger: Logger = None,
-    ):
-        self.original_wu_filepath = original_wu_filepath
-        self.reprojected_wu_filepath = reprojected_wu_filepath
-        self.runtime_config = runtime_config
-        self.logger = logger
-
-        # Default to 8 workers if not in the config. Value must be 0 < n_workers.

Date: Thu, 5 Sep 2024 15:40:08 -0700
Subject: [PATCH 04/11] Updates to workflow to support older version of parsl.
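
Two accommodations for the older parsl release on Klone, sketched below: File
objects are constructed from plain strings rather than pathlib.Path objects,
and the per-image WCS list travels as an explicit keyword argument rather
than as a second, non-File entry in `inputs`. The path here is illustrative:

    from pathlib import Path
    from parsl import File

    shard_path = Path("/gscratch/dirac/kbmod/staging/0_chip.wu")  # hypothetical
    repro_path = shard_path.parent / (shard_path.stem + ".repro")

    # Older parsl expects plain string paths/URLs here, so coerce explicitly.
    in_file = File(str(shard_path))
    out_file = File(str(repro_path))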
--- src/kbmod_wf/parallel_repro_single_chip_wf.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 3edd27f..230d18b 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -22,7 +22,7 @@ executors=get_executors(["local_dev_testing", "reproject_single_shard"]), ignore_for_cache=["logging_file"], ) -def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None): +def reproject_shard(inputs=(), outputs=(), wcs=None, runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger logger = get_configured_logger("task.sharded_reproject", logging_file.filepath) @@ -33,7 +33,7 @@ def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None) with ErrorLogger(logger): reproject_shard( original_wu_filepath=inputs[0].filepath, - original_wcs=inputs[1], + original_wcs=wcs, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, @@ -70,11 +70,13 @@ def workflow_runner(env=None, runtime_config={}): # gather all the *.collection files that are staged for processing create_manifest_config = app_configs.get("create_manifest", {}) - manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) create_manifest_future = create_manifest( inputs=[], - outputs=[File(manifest_file_path)], + outputs=[manifest_file], runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) @@ -96,8 +98,8 @@ def workflow_runner(env=None, runtime_config={}): # Create the work unit future original_work_unit_futures.append( ic_to_wu_return_shards( - inputs=[input_file], - outputs=[File(output_workunit_filepath)], + inputs=[File(str(input_file))], + outputs=[File(str(output_workunit_filepath))], runtime_config=app_configs.get("ic_to_wu", {}), logging_file=logging_file, ) @@ -108,10 +110,13 @@ def workflow_runner(env=None, runtime_config={}): reproject_futures = [] for f in original_work_unit_futures: shard_futures = [] - for i in f.result(): + shard_files, wcs = f.result() + for i in shard_files: + shard_file = Path(i) shard_future = reproject_shard( - inputs=[i], - outputs=[File(i.parent / (i.stem + ".repro"))], + inputs=[File(str(shard_file))], + outputs=[File(str(shard_file.parent / (shard_file.stem + ".repro")))], + wcs=wcs, runtime_config=app_configs.get("reproject_wu", {}), logging_file=logging_file, ) From f8f2b5f906902ae38b96a0d9c2751decf43bbfd1 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 5 Sep 2024 16:00:28 -0700 Subject: [PATCH 05/11] Fixes to get workflow running on Klone. 
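
The substantive fix: per-image WCSes now travel between tasks as the JSON
strings already stored in the ImageCollection's `wcs` column, and are only
rebuilt into astropy objects inside the reprojection task. A sketch of that
round-trip (header values are illustrative):

    import json
    from astropy.wcs import WCS

    wcs_in = WCS(naxis=2)
    wcs_in.wcs.ctype = ["RA---TAN", "DEC--TAN"]
    wcs_in.wcs.crval = [200.0, -15.0]
    wcs_in.wcs.crpix = [1024.5, 1024.5]
    wcs_in.wcs.cdelt = [-7.3e-5, 7.3e-5]

    wcs_str = json.dumps(dict(wcs_in.to_header()))  # one entry of the column
    wcs_out = WCS(json.loads(wcs_str))              # as done in reproject_shard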
--- src/kbmod_wf/parallel_repro_single_chip_wf.py | 6 +++--- src/kbmod_wf/task_impls/ic_to_wu.py | 8 ++++++-- .../reproject_single_chip_single_night_wu_shard.py | 14 +++++++++----- .../workflow_tasks/ic_to_wu_return_shards.py | 4 ++-- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 230d18b..7815fd5 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -22,7 +22,7 @@ executors=get_executors(["local_dev_testing", "reproject_single_shard"]), ignore_for_cache=["logging_file"], ) -def reproject_shard(inputs=(), outputs=(), wcs=None, runtime_config={}, logging_file=None): +def reproject_shard(inputs=(), outputs=(), wcses=None, runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger logger = get_configured_logger("task.sharded_reproject", logging_file.filepath) @@ -33,7 +33,7 @@ def reproject_shard(inputs=(), outputs=(), wcs=None, runtime_config={}, logging_ with ErrorLogger(logger): reproject_shard( original_wu_filepath=inputs[0].filepath, - original_wcs=wcs, + original_wcs=wcses, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, @@ -116,7 +116,7 @@ def workflow_runner(env=None, runtime_config={}): shard_future = reproject_shard( inputs=[File(str(shard_file))], outputs=[File(str(shard_file.parent / (shard_file.stem + ".repro")))], - wcs=wcs, + wcses=wcses, runtime_config=app_configs.get("reproject_wu", {}), logging_file=logging_file, ) diff --git a/src/kbmod_wf/task_impls/ic_to_wu.py b/src/kbmod_wf/task_impls/ic_to_wu.py index 5ef9b06..2494e1f 100644 --- a/src/kbmod_wf/task_impls/ic_to_wu.py +++ b/src/kbmod_wf/task_impls/ic_to_wu.py @@ -83,6 +83,10 @@ def create_work_unit(self): elapsed = round(time.time() - last_time, 1) self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}") - wcs = list(orig_wu._per_image_wcs) + # All of the WCS information is maintained in the header of the ImageCollection + # as a Astropy table Column. Here we unwrap the column to create a list of strings + # Each string can then be converted into a WCS object when needed (for reprojection) + # using: `wcs_objects = [WCS(json.loads(i)) for i in wcses]` + wcses = [i for i in ic.data['wcs']] - return self.wu_filepath, wcs + return self.wu_filepath, wcses diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py index 8f371b0..ab7b56a 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py @@ -1,15 +1,17 @@ +import json import os import time from logging import Logger import numpy as np import astropy.io.fits as fitsio +from astropy.wcs import WCS -import kbmod -from kbmod.work_unit import WorkUnit -import kbmod.reprojection as reprojection +# import kbmod +# from kbmod.work_unit import WorkUnit +# import kbmod.reprojection as reprojection -from reoproject import reproject_adaptive +from reproject import reproject_adaptive from reproject.mosaicking import find_optimal_celestial_wcs @@ -41,7 +43,9 @@ def reproject_shard( and reprojection. 
""" - opt_wcs, shape = find_optimal_celestial_wcs(original_wcs) + wcs_list = [WCS(json.loads(wcs)) for wcs in original_wcs] + + opt_wcs, shape = find_optimal_celestial_wcs(wcs_list) opt_wcs.array_shape = shape shard = fitsio.open(original_wu_shard_filepath) diff --git a/src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py b/src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py index 157d59b..3cea5ed 100644 --- a/src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py +++ b/src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py @@ -45,9 +45,9 @@ def ic_to_wu_return_shards(inputs=(), outputs=(), runtime_config={}, logging_fil logger.info("Completed ic_to_wu") # get parent directory of outputs[0] and fine all .wu files in that directory - shard_files = [s for s in Path(outputs[0]).parent.glob("*.wu")] + shard_files = [str(s) for s in Path(outputs[0]).parent.glob("*.wu")] # remove the original .wu file from the shard_files list - shard_files = [f for f in shard_files if f != outputs[0]] + shard_files = [f for f in shard_files if f != outputs[0].filepath] return shard_files, wcs From 5084621431b736003d3dc05a6a3979b9cacec890 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 5 Sep 2024 16:02:37 -0700 Subject: [PATCH 06/11] Add relax=True --- .../task_impls/reproject_single_chip_single_night_wu_shard.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py index ab7b56a..385e879 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py @@ -43,7 +43,7 @@ def reproject_shard( and reprojection. """ - wcs_list = [WCS(json.loads(wcs)) for wcs in original_wcs] + wcs_list = [WCS(json.loads(wcs), relax=True) for wcs in original_wcs] opt_wcs, shape = find_optimal_celestial_wcs(wcs_list) opt_wcs.array_shape = shape From bed7b54688338857677c8528749cbe435df099a8 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 5 Sep 2024 16:26:15 -0700 Subject: [PATCH 07/11] Fix variable name typo. --- src/kbmod_wf/parallel_repro_single_chip_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 7815fd5..f489ac4 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -110,7 +110,7 @@ def workflow_runner(env=None, runtime_config={}): reproject_futures = [] for f in original_work_unit_futures: shard_futures = [] - shard_files, wcs = f.result() + shard_files, wcses = f.result() for i in shard_files: shard_file = Path(i) shard_future = reproject_shard( From d703dff5c1710e638d892142b07054de12340914 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 5 Sep 2024 20:22:34 -0700 Subject: [PATCH 08/11] Fix keyword typo. 
--- src/kbmod_wf/parallel_repro_single_chip_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 7815fd5..ba76c3b 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -32,7 +32,7 @@ def reproject_shard(inputs=(), outputs=(), wcses=None, runtime_config={}, loggin logger.info("Starting reproject_ic") with ErrorLogger(logger): reproject_shard( - original_wu_filepath=inputs[0].filepath, + original_wu_shard_filepath=inputs[0].filepath, original_wcs=wcses, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, From e73e75599fe7372fd4a7e4523ff0b4c008a601c2 Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Thu, 5 Sep 2024 22:13:32 -0700 Subject: [PATCH 09/11] Fix another keyword typo. --- src/kbmod_wf/parallel_repro_single_chip_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index e916487..2ff01fc 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -34,7 +34,7 @@ def reproject_shard(inputs=(), outputs=(), wcses=None, runtime_config={}, loggin reproject_shard( original_wu_shard_filepath=inputs[0].filepath, original_wcs=wcses, - reprojected_wu_filepath=outputs[0].filepath, + reprojected_wu_shard_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, ) From 9b16a9ad3927a771b5da049fcfed054b20cd735b Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Fri, 6 Sep 2024 11:37:43 -0700 Subject: [PATCH 10/11] Include more parameters to align with what kbmod reproject.py is doing. --- ...oject_single_chip_single_night_wu_shard.py | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py index 385e879..f8afb83 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py @@ -49,9 +49,32 @@ def reproject_shard( opt_wcs.array_shape = shape shard = fitsio.open(original_wu_shard_filepath) - sci = reproject_adaptive(shard, opt_wcs, hdu_in=0) - var = reproject_adaptive(shard, opt_wcs, hdu_in=1) - mask = reproject_adaptive(shard, opt_wcs, hdu_in=2) + sci = reproject_adaptive( + shard, + opt_wcs, + hdu_in=0, + shape_out=opt_wcs.array_shape, + bad_value_mode="ignore", + roundtrip_coords=False + ) + + var = reproject_adaptive( + shard, + opt_wcs, + hdu_in=1, + shape_out=opt_wcs.array_shape, + bad_value_mode="ignore", + roundtrip_coords=False + ) + + mask = reproject_adaptive( + shard, + opt_wcs, + hdu_in=2, + shape_out=opt_wcs.array_shape, + bad_value_mode="ignore", + roundtrip_coords=False + ) shard[0].data = sci.astype(np.float32) shard[1].data = var.astype(np.float32) From c9efb141e5dc36e7a4ab6d81248856da60732267 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:00:45 -0700 Subject: [PATCH 11/11] Copying the WCS header info to the other layers in the fits shard. 
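
reproject reads the input WCS from each HDU's own header, and the variance
and mask layers of a shard don't carry their own, so reprojecting with
hdu_in=1 or hdu_in=2 needs the science WCS copied into those headers first.
The change below does this in place; the same step in isolation (filename
illustrative):

    import astropy.io.fits as fitsio
    from astropy.wcs import WCS

    shard = fitsio.open("0_chip.wu")  # hypothetical shard file
    shard_wcs = WCS(shard[0].header)
    for hdu in shard[1:3]:            # variance and mask layers
        hdu.header.update(shard_wcs.to_header())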
--- .../reproject_single_chip_single_night_wu_shard.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py index f8afb83..9427723 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py @@ -49,13 +49,17 @@ def reproject_shard( opt_wcs.array_shape = shape shard = fitsio.open(original_wu_shard_filepath) + shard_wcs = WCS(shard[0].header) + shard[1].header.update(shard_wcs.to_header()) + shard[2].header.update(shard_wcs.to_header()) + sci = reproject_adaptive( shard, opt_wcs, hdu_in=0, shape_out=opt_wcs.array_shape, bad_value_mode="ignore", - roundtrip_coords=False + roundtrip_coords=False, ) var = reproject_adaptive( @@ -64,7 +68,7 @@ def reproject_shard( hdu_in=1, shape_out=opt_wcs.array_shape, bad_value_mode="ignore", - roundtrip_coords=False + roundtrip_coords=False, ) mask = reproject_adaptive( @@ -73,7 +77,7 @@ def reproject_shard( hdu_in=2, shape_out=opt_wcs.array_shape, bad_value_mode="ignore", - roundtrip_coords=False + roundtrip_coords=False, ) shard[0].data = sci.astype(np.float32)
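
Note on the reproject_adaptive calls above: reproject returns an
(array, footprint) tuple unless return_footprint=False is passed, so for the
.astype(np.float32) casts to see a bare array each call presumably needs that
flag (or an explicit unpacking), along these lines:

    sci = reproject_adaptive(
        shard,
        opt_wcs,
        hdu_in=0,
        shape_out=opt_wcs.array_shape,
        bad_value_mode="ignore",
        roundtrip_coords=False,
        return_footprint=False,  # return only the reprojected array
    )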