Skip to content

Commit

Permalink
Support more toydata_mode (#206)
Browse files Browse the repository at this point in the history
* Support more `toydata_mode`

* Also support `only_toydata=True`

* Only move templates in `templates` folder
  • Loading branch information
dachengx authored Sep 7, 2024
1 parent 832c5d7 commit 81f4d6b
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 49 deletions.
4 changes: 2 additions & 2 deletions alea/submitters/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ pegasus-run /scratch/yuanlq/workflows/lq_b8_cevns_30/runs

To collect the final outputs, there are two ways:
- Check your folder `/scratch/$USER/workflows/<workflow_id>/outputs/`. There should be a single tarball containing all toymc files and computation results.
- A redundant way is to get files from dCache, in which you have to use `gfal` command to approach. For example ```gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/``` and to get the files, for example do ```gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/```. This contains both the final tarball and all `.h5` files before tarballing. To get them you want to do something like ```gfal-copy davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/lq_b8_cevns_30-combined_output.tar.gz . -t 7200``` Note that this command works also on Midway/DaLI.
- A redundant way is to get files from dCache, for which you have to use the `gfal` command. For example, run `gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/` and, to list the files, for example `gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/`. This contains both the final tarball and all `.h5` files before tarballing. To retrieve them, do something like `gfal-copy davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/lq_b8_cevns_30-combined_output.tar.gz . -t 7200`. Note that this command also works on Midway/DaLI.

### Example Workflow
Here we only care about the purple ones, and the rest are generated by `Pegasus`.
- Each individual `run_toymc_wrapper` job is computing `alea_run_toymc`. For details on what it does, see `run_toymc_wrapper.sh`.
- The `combine` job will just collect all outputs from the `run_toymc_wrapper` jobs, and tar them into a single tarball as final output.
<img width="1607" alt="Screen Shot 2024-05-08 at 5 24 51 PM" src="https://github.com/FaroutYLq/alea/assets/47046530/b1136330-2701-4538-b03c-8506383e4e20">
<img width="1607" alt="Screen Shot 2024-05-08 at 5 24 51 PM" src="workflow_example.png">
85 changes: 43 additions & 42 deletions alea/submitters/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ def _validate_template_path(self):
"The template path contains subdirectories. All templates files will be tarred."
)

def _tar_h5_files(self, directory, output_filename="templates.tar.gz"):
def _tar_h5_files(self, directory, template_tarball="templates.tar.gz"):
"""Tar all .h5 templates in the directory and its subdirectories into a tarball."""
# Create a tar.gz archive
with tarfile.open(output_filename, "w:gz") as tar:
with tarfile.open(template_tarball, "w:gz") as tar:
# Walk through the directory
for dirpath, dirnames, filenames in os.walk(directory):
for filename in filenames:
Expand Down Expand Up @@ -512,16 +512,6 @@ def _add_separate_job(self, combine_i):

return separate_job

def _add_limit_threshold(self):
    """Register the Neyman thresholds file (``limit_threshold``) in the replica catalog.

    Creates the Pegasus ``File`` handle under the file's basename, records a
    ``local``-site replica pointing at the absolute path via a ``file://`` URL,
    and flags the instance so the registration is not repeated.
    """
    # Pegasus identifies logical files by basename; compute it once.
    basename = os.path.basename(self.limit_threshold)
    self.f_limit_threshold = File(basename)
    self.rc.add_replica("local", basename, f"file://{self.limit_threshold}")
    # Guard flag checked by callers to avoid duplicate replica entries.
    self.added_limit_threshold = True

def _correct_paths_args_dict(self, args_dict):
"""Correct the paths in the arguments dictionary in a hardcoding way."""
args_dict["statistical_model_args"]["template_path"] = "templates/"
Expand Down Expand Up @@ -549,17 +539,6 @@ def _reorganize_script(self, script):
executable = os.path.basename(script.split()[1])
args_dict = Submitter.runner_kwargs_from_script(shlex.split(script)[2:])

# Add the limit_threshold to the replica catalog if not added
if (
not self.added_limit_threshold
and "limit_threshold" in args_dict["statistical_model_args"].keys()
):
self.limit_threshold = args_dict["statistical_model_args"]["limit_threshold"]
self._add_limit_threshold()

# Correct the paths in the arguments
args_dict = self._correct_paths_args_dict(args_dict)

return executable, args_dict

def _generate_workflow(self, name="run_toymc_wrapper"):
Expand Down Expand Up @@ -596,18 +575,14 @@ def _generate_workflow(self, name="run_toymc_wrapper"):
# Reorganize the script to get the executable and arguments,
# in which the paths are corrected
executable, args_dict = self._reorganize_script(script)
if not (args_dict["toydata_mode"] in ["generate_and_store", "generate"]):
if not (
args_dict["toydata_mode"]
in ["read", "generate_and_store", "generate", "no_toydata"]
):
raise NotImplementedError(
"Only generate_and_store toydata mode is supported on OSG."
)

logger.info(f"Adding job {job_id} to the workflow")
logger.debug(f"Naked Script: {script}")
logger.debug(f"Output: {args_dict['output_filename']}")
logger.debug(f"Executable: {executable}")
logger.debug(f"Toydata: {args_dict['toydata_filename']}")
logger.debug(f"Arguments: {args_dict}")

# Create a job with base requirements
job = self._initialize_job(
name=name,
Expand All @@ -617,26 +592,43 @@ def _generate_workflow(self, name="run_toymc_wrapper"):
)
job.add_profiles(Namespace.CONDOR, "requirements", self.requirements)

# Add the limit_threshold to the replica catalog if not added
if "limit_threshold" in args_dict["statistical_model_args"]:
limit_threshold = args_dict["statistical_model_args"]["limit_threshold"]
self.rc.add_replica(
"local",
os.path.basename(limit_threshold),
f"file://{limit_threshold}",
)
job.add_inputs(File(os.path.basename(limit_threshold)))

# Add the inputs and outputs
job.add_inputs(
self.f_template_tarball,
self.f_running_configuration,
self.f_statistical_model_config,
self.f_run_toymc_wrapper,
self.f_alea_run_toymc,
self.f_combine,
self.f_separate,
)
if self.added_limit_threshold:
job.add_inputs(self.f_limit_threshold)

job.add_outputs(File(args_dict["output_filename"]), stage_out=False)
combine_job.add_inputs(File(args_dict["output_filename"]))

# Only add the toydata file if instructed to do so
if args_dict["toydata_mode"] == "generate_and_store":
job.add_outputs(File(args_dict["toydata_filename"]), stage_out=False)
combine_job.add_inputs(File(args_dict["toydata_filename"]))
if not args_dict["only_toydata"]:
output_filename = args_dict["output_filename"]
job.add_outputs(File(os.path.basename(output_filename)), stage_out=False)
combine_job.add_inputs(File(os.path.basename(output_filename)))

toydata_filename = args_dict["toydata_filename"]
if args_dict["toydata_mode"] == "read":
# Add toydata as input if needed
self.rc.add_replica(
"local",
os.path.basename(toydata_filename),
f"file://{toydata_filename}",
)
job.add_inputs(File(os.path.basename(toydata_filename)))
elif args_dict["toydata_mode"] == "generate_and_store":
# Only add the toydata file if instructed to do so
job.add_outputs(File(os.path.basename(toydata_filename)), stage_out=False)
combine_job.add_inputs(File(os.path.basename(toydata_filename)))

# Add the arguments into the job
# Using escaped argument to avoid the shell syntax error
Expand All @@ -646,6 +638,8 @@ def _extract_all_to_tuple(d):
for key in d.keys()
)

# Correct the paths in the arguments
args_dict = self._correct_paths_args_dict(args_dict)
args_tuple = _extract_all_to_tuple(args_dict)
job.add_args(*args_tuple)

Expand All @@ -659,6 +653,13 @@ def _extract_all_to_tuple(d):
else:
new_to_combine = False

logger.info(f"Adding job {job_id} to the workflow")
logger.debug(f"Naked Script: {script}")
logger.debug(f"Executable: {executable}")
logger.debug(f"Output: {args_dict['output_filename']}")
logger.debug(f"Toydata: {args_dict['toydata_filename']}")
logger.debug(f"Arguments: {args_dict}")

# Finalize the workflow
self.wf.add_replica_catalog(self.rc)
self.wf.add_transformation_catalog(self.tc)
Expand Down
7 changes: 2 additions & 5 deletions alea/submitters/run_toymc_wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,16 @@ SEED=$(echo "$seed" | sed "s/'/\"/g")
METADATA=$(echo "$metadata" | sed "s/'/\"/g")

# Extract tarballs input
mkdir -p templates
START=$(date +%s)
for TAR in `ls *.tar.gz`; do
tar -xzf $TAR
tar -xzf $TAR -C templates
done
rm *.tar.gz
END=$(date +%s)
DIFF=$(( $END - $START ))
echo "Untarring took $DIFF seconds."

# Move all untarred files to templates/
mkdir -p templates
mv *.h5 templates/

# Source the environment
. /opt/XENONnT/setup.sh
chmod +x alea_run_toymc
Expand Down
Binary file added alea/submitters/workflow_example.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 81f4d6b

Please sign in to comment.