Skip to content

Commit

Permalink
Combine several jobs into one to save computation resources (#131)
Browse files Browse the repository at this point in the history
* Combine several jobs into one to save computation resources

* Add example of potential usage of `combine_n_jobs`

* Minor update of docs
  • Loading branch information
dachengx authored Jan 19, 2024
1 parent 6e9116a commit 02aa46d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 8 deletions.
6 changes: 3 additions & 3 deletions alea/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ def store_data(
Store a list of datasets.
(each on the form of a list of one or more structured arrays or dicts)
Using inference_interface, but included here to allow over-writing.
The structure would be: [[datasets1], [datasets2], ..., [datasetsn]],
The structure would be: ``[[datasets1], [datasets2], ..., [datasetsn]]``,
where each of datasets is a list of structured arrays.
If you specify, it is set, if not it will read from self.get_likelihood_term_names.
If not defined, it will be ["0", "1", ..., "n-1"]. The metadata is optional.
If you specify, it is set, if not it will read from ``self.get_likelihood_term_names``.
If not defined, it will be ``["0", "1", ..., "n-1"]``. The metadata is optional.
Args:
file_name (str): name of the file to store the data in
Expand Down
34 changes: 33 additions & 1 deletion alea/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Submitter:

config_file_path: str
template_path: str
combine_n_jobs: int = 1
allowed_special_args: List[str] = []
logging = logging.getLogger("submitter_logger")

Expand Down Expand Up @@ -316,7 +317,7 @@ def computation_tickets_generator(self):
"""Get the submission script for the current configuration. It generates the submission
script for each combination of the computation options.
for Runner from to_zip, to_vary and in_common.
For Runner from to_zip, to_vary and in_common:
- First, generate the combined computational options directly.
- Second, update the input and output folder of the options.
- Thrid, collect the non-fittable(settable) parameters into nominal_values.
Expand Down Expand Up @@ -387,6 +388,37 @@ def already_done(self, i_args: dict) -> bool:
is_done = False
return is_done

def combined_tickets_generator(self):
"""Get the combined submission script for the current configuration. ``self.combine_n_jobs``
jobs will be combined into one submission script.
Yields:
(str, str): the combined submission script and name output_filename
Note:
User can add ``combine_n_jobs: 10`` in ``local_configurations``,
``slurm_configurations`` or ``htcondor_configurations`` to combine 10 jobs into
one submission script. User will need this feature when the number of jobs pending
for submission is too large.
"""

_script = ""
n_combined = 0
for script, last_output_filename in self.computation_tickets_generator():
if n_combined == 0:
_script += script
else:
_script += " && " + script
n_combined += 1
if n_combined == self.combine_n_jobs:
yield _script, last_output_filename
n_combined = 0
_script = ""
else:
if n_combined > 0:
yield _script, last_output_filename

@staticmethod
def update_n_batch(runner_args):
"""Update n_mc if n_batch is provided.
Expand Down
3 changes: 2 additions & 1 deletion alea/submitters/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, *args, **kwargs):
"""Initialize the SubmitterLocal class."""
self.local_configurations = kwargs.get("local_configurations", {})
self.template_path = self.local_configurations.pop("template_path", None)
self.combine_n_jobs = self.local_configurations.pop("combine_n_jobs", 1)
super().__init__(*args, **kwargs)

@staticmethod
Expand All @@ -57,7 +58,7 @@ def submit(self):
If debug is True, only return the first instance of Runner.
"""
for _, (script, _) in enumerate(self.computation_tickets_generator()):
for _, (script, _) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
return self.initialized_runner(script)
Expand Down
7 changes: 4 additions & 3 deletions alea/submitters/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, *args, **kwargs):
self.name = self.__class__.__name__
self.slurm_configurations = kwargs.get("slurm_configurations", {})
self.template_path = self.slurm_configurations.pop("template_path", None)
self.combine_n_jobs = self.slurm_configurations.pop("combine_n_jobs", 1)
self.batchq_arguments = {**BATCHQ_DEFAULT_ARGUMENTS, **self.slurm_configurations}
self._check_batchq_arguments()
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -92,7 +93,7 @@ def submit(self, **kwargs):
"""
_jobname = kwargs.pop("jobname", self.name.lower())
batchq_kwargs = {}
for job, (script, output_filename) in enumerate(self.computation_tickets_generator()):
for job, (script, last_output_filename) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
if job > 0:
Expand All @@ -101,7 +102,7 @@ def submit(self, **kwargs):
self.logging.info("Too many jobs. Sleeping for 30s.")
time.sleep(30)
batchq_kwargs["jobname"] = f"{_jobname}_{job:03d}"
if output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{output_filename}.log")
if last_output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{last_output_filename}.log")
self.logging.debug(f"Call '_submit' with job: {job} and kwargs: {batchq_kwargs}.")
self._submit(script, **batchq_kwargs)

0 comments on commit 02aa46d

Please sign in to comment.