Skip to content

Commit

Permalink
Support for running subproblems in parallel (#802)
Browse files Browse the repository at this point in the history
The get_inputs and run_scenario E2E steps for scenarios with multiple 
subproblems can now be parallelized.
  • Loading branch information
anamileva authored Jun 10, 2021
1 parent 1a0d930 commit f0db9a2
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 126 deletions.
22 changes: 22 additions & 0 deletions gridpath/common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ def get_db_parser():
return parser


def get_parallel_get_inputs_parser():
    """
    Create an ArgumentParser with the option controlling how many
    subproblems have their inputs written in parallel.

    The parser is created with ``add_help=False`` so that it can be listed
    in the ``parents`` of another ArgumentParser.

    :return: ArgumentParser with the ``--n_parallel_get_inputs`` argument
        (integer, defaults to 1)
    """

    parser = ArgumentParser(add_help=False)
    # type=int makes a value given on the command line an integer, like the
    # default; callers that still wrap the value in int() are unaffected
    parser.add_argument("--n_parallel_get_inputs", default=1, type=int,
                        help="Get inputs for n subproblems in parallel.")

    return parser


def get_parallel_solve_parser():
    """
    Create an ArgumentParser with the option controlling how many
    subproblems are solved in parallel.

    The parser is created with ``add_help=False`` so that it can be listed
    in the ``parents`` of another ArgumentParser.

    :return: ArgumentParser with the ``--n_parallel_solve`` argument
        (integer, defaults to 1)
    """

    parser = ArgumentParser(add_help=False)
    # type=int makes a value given on the command line an integer, like the
    # default; callers that still wrap the value in int() are unaffected
    parser.add_argument("--n_parallel_solve", default=1, type=int,
                        help="Solve n subproblems in parallel.")

    return parser


def get_solve_parser():
"""
Create ArgumentParser object which has the common set of arguments for
Expand Down
258 changes: 174 additions & 84 deletions gridpath/get_scenario_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,116 +23,203 @@

from argparse import ArgumentParser
import csv
from multiprocessing import Pool
import os.path
import pandas as pd
import sys
import warnings

from db.common_functions import connect_to_database
from gridpath.auxiliary.db_interface import get_scenario_id_and_name
from gridpath.common_functions import determine_scenario_directory, \
create_directory_if_not_exists, get_db_parser, get_required_e2e_arguments_parser
create_directory_if_not_exists, get_db_parser, \
get_required_e2e_arguments_parser, get_parallel_get_inputs_parser
from gridpath.auxiliary.module_list import determine_modules, load_modules
from gridpath.auxiliary.scenario_chars import OptionalFeatures, SubScenarios, \
get_subproblem_structure_from_db, SolverOptions


def write_model_inputs(
    scenario_directory, subproblem_structure, modules_to_use, scenario_id,
    subscenarios, db_path, n_parallel_subproblems
):
    """
    For each subproblem, load the inputs from the database and write them
    out into .tab files, which will be used to construct the optimization
    problem. Subproblems are processed one after the other or, if
    requested, in a pool of worker processes.

    :param scenario_directory: local scenario directory
    :param subproblem_structure: SubProblems object with info on the
        subproblem/stage structure
    :param modules_to_use: the modules to use; passed on to load_modules by
        get_inputs_for_subproblem, so that each worker loads the modules
        itself
    :param scenario_id: integer
    :param subscenarios: SubScenarios object with all subscenario info
    :param db_path: path to the database; each subproblem opens its own
        connection via connect_to_database
    :param n_parallel_subproblems: int; number of subproblems to get inputs
        for in parallel (values below 1 are treated as 1)
    :return:
    """
    # TODO: should probably delete the whole scenario directory first,
    #  as previous scenario with the same name could have had a different
    #  subproblem-stage structure
    # Delete auxiliary files that may have existed before to avoid phantom
    # files
    delete_prior_aux_files(scenario_directory=scenario_directory)

    subproblems = list(subproblem_structure.SUBPROBLEM_STAGES.keys())

    # Subproblem directories are skipped only when there is exactly one
    # subproblem
    make_subproblem_directories = len(subproblems) != 1

    # Do a few checks on parallelization request
    if n_parallel_subproblems < 1:
        warnings.warn("n_parallel_subproblems must be at least 1. Getting "
                      "inputs without parallelization.")
        n_parallel_subproblems = 1

    if len(subproblems) == 1 and n_parallel_subproblems > 1:
        warnings.warn("Only one subproblem in scenario. Parallelization "
                      "not possible.")
        n_parallel_subproblems = 1

    # If no parallelization requested, loop through the subproblems
    if n_parallel_subproblems == 1:
        for subproblem in subproblems:
            get_inputs_for_subproblem(
                scenario_directory=scenario_directory,
                subproblem_structure=subproblem_structure,
                subproblem=subproblem,
                make_subproblem_directories=make_subproblem_directories,
                modules_to_use=modules_to_use,
                scenario_id=scenario_id,
                subscenarios=subscenarios,
                db_path=db_path
            )
    else:
        # One entry per subproblem; the order of the items must match the
        # unpacking in get_inputs_for_subproblem_pool
        pool_data = [
            (scenario_directory, subproblem_structure, subproblem,
             make_subproblem_directories, modules_to_use, scenario_id,
             subscenarios, db_path)
            for subproblem in subproblems
        ]

        # The context manager shuts the workers down even if one of the
        # subproblems raises; map() only returns once all are done
        with Pool(n_parallel_subproblems) as pool:
            pool.map(get_inputs_for_subproblem_pool, pool_data)


def get_inputs_for_subproblem(
    scenario_directory, subproblem_structure, subproblem,
    make_subproblem_directories, modules_to_use, scenario_id,
    subscenarios, db_path
):
    """
    Write the model input .tab files for all stages of a single subproblem.

    The modules are loaded and the database connection is opened inside
    this function, which is what allows it to be run for several
    subproblems at the same time in separate processes.

    :param scenario_directory: local scenario directory
    :param subproblem_structure: SubProblems object with info on the
        subproblem/stage structure
    :param subproblem: key of the subproblem in
        subproblem_structure.SUBPROBLEM_STAGES
    :param make_subproblem_directories: boolean; if True, the inputs are
        nested in a directory named after the subproblem
    :param modules_to_use: the modules to use; passed to load_modules
    :param scenario_id: integer
    :param subscenarios: SubScenarios object with all subscenario info
    :param db_path: path to the database; passed to connect_to_database
    :return:
    """
    loaded_modules = load_modules(modules_to_use=modules_to_use)

    # If there are subproblems/stages, input directory will be nested
    subproblem_str = str(subproblem) if make_subproblem_directories else ""

    stages = subproblem_structure.SUBPROBLEM_STAGES[subproblem]
    # Stage directories are skipped only when there is exactly one stage
    make_stage_directories = len(stages) != 1

    for stage in stages:
        stage_str = str(stage) if make_stage_directories else ""

        # First make inputs directory if needed; exist_ok avoids failing
        # if another process creates a shared parent at the same time
        inputs_directory = os.path.join(
            scenario_directory, subproblem_str, stage_str, "inputs"
        )
        os.makedirs(inputs_directory, exist_ok=True)

        # Delete input files that may have existed before to avoid
        # phantom inputs
        delete_prior_inputs(inputs_directory=inputs_directory)

        # Write model input .tab files for each of the loaded_modules if
        # appropriate. Note that all input files are saved in the
        # input_directory, even the non-temporal inputs that are not
        # dependent on the subproblem or stage. This simplifies the file
        # structure at the expense of unnecessarily duplicating
        # non-temporal input files such as projects.tab.
        conn = connect_to_database(db_path=db_path)
        try:
            for m in loaded_modules:
                if hasattr(m, "write_model_inputs"):
                    m.write_model_inputs(
                        scenario_directory=scenario_directory,
                        scenario_id=scenario_id,
                        subscenarios=subscenarios,
                        subproblem=subproblem_str,
                        stage=stage_str,
                        conn=conn
                    )
        finally:
            # Close the connection even if a module fails
            conn.close()

    # If there are stages in the subproblem, we also need a pass-through
    # directory and to write headers of the pass-through input file
    # TODO: this should probably be moved to the module responsible for
    #  writing to this file
    # TODO: how to deal with pass-through inputs
    # TODO: we probably don't need a directory for the
    #  pass-through inputs, as it's only one file
    if len(stages) > 1:
        # Create the commitment pass-through file (also deletes any
        # prior results)
        # TODO: need better handling of deleting prior results?
        # Use the same subproblem directory string as for the inputs, so
        # that the pass-through directory is not nested under a subproblem
        # directory when no subproblem directories are made
        # NOTE(review): confirm this is where the solve step reads it from
        pass_through_directory = os.path.join(
            scenario_directory, subproblem_str, "pass_through_inputs"
        )
        os.makedirs(pass_through_directory, exist_ok=True)

        fixed_commitment_path = os.path.join(
            pass_through_directory, "fixed_commitment.tab"
        )
        with open(fixed_commitment_path, "w", newline="") \
                as fixed_commitment_file:
            fixed_commitment_writer = csv.writer(
                fixed_commitment_file,
                delimiter="\t", lineterminator="\n"
            )
            fixed_commitment_writer.writerow(
                ["project", "timepoint", "stage",
                 "final_commitment_stage", "commitment"])


def get_inputs_for_subproblem_pool(pool_datum):
    """
    Adapter for pool.map, which hands each worker a single argument: unpack
    the eight-item sequence and forward its items to
    get_inputs_for_subproblem as keyword arguments.

    :param pool_datum: sequence of scenario_directory,
        subproblem_structure, subproblem, make_subproblem_directories,
        modules_to_use, scenario_id, subscenarios, db_path (in that order)
    :return:
    """
    (
        scenario_dir, structure, subproblem_key, use_subproblem_dirs,
        module_names, scen_id, subscen, database_path,
    ) = pool_datum

    keyword_arguments = {
        "scenario_directory": scenario_dir,
        "subproblem_structure": structure,
        "subproblem": subproblem_key,
        "make_subproblem_directories": use_subproblem_dirs,
        "modules_to_use": module_names,
        "scenario_id": scen_id,
        "subscenarios": subscen,
        "db_path": database_path,
    }
    get_inputs_for_subproblem(**keyword_arguments)


def delete_prior_aux_files(scenario_directory):
Expand Down Expand Up @@ -177,8 +264,10 @@ def parse_arguments(args):
"""
parser = ArgumentParser(
add_help=True,
parents=[get_db_parser(), get_required_e2e_arguments_parser()]
parents=[get_db_parser(), get_required_e2e_arguments_parser(),
get_parallel_get_inputs_parser()]
)

parsed_arguments = parser.parse_known_args(args=args)[0]

return parsed_arguments
Expand Down Expand Up @@ -365,16 +454,17 @@ def main(args=None):
# Figure out which modules to use and load the modules
modules_to_use = determine_modules(features=feature_list,
multi_stage=stages_flag)
loaded_modules = load_modules(modules_to_use=modules_to_use)

# Get appropriate inputs from database and write the .tab file model inputs
write_model_inputs(
scenario_directory=scenario_directory,
subproblem_structure=subproblem_structure,
loaded_modules=loaded_modules,
modules_to_use=modules_to_use,
scenario_id=scenario_id,
subscenarios=subscenarios,
conn=conn)
db_path=db_path,
n_parallel_subproblems=int(parsed_arguments.n_parallel_get_inputs)
)

# Save the list of optional features to a file (will be used to determine
# modules without database connection)
Expand Down
6 changes: 4 additions & 2 deletions gridpath/run_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
# GridPath modules
from db.common_functions import connect_to_database, spin_on_database_lock
from gridpath.common_functions import get_db_parser, get_solve_parser, \
get_required_e2e_arguments_parser, create_logs_directory_if_not_exists,\
get_required_e2e_arguments_parser, get_parallel_get_inputs_parser, \
get_parallel_solve_parser, create_logs_directory_if_not_exists,\
Logging, determine_scenario_directory
from gridpath import get_scenario_inputs, run_scenario, \
import_scenario_results, process_results
Expand All @@ -52,7 +53,8 @@ def parse_arguments(args):
parser = ArgumentParser(
add_help=True,
parents=[get_db_parser(), get_required_e2e_arguments_parser(),
get_solve_parser()]
get_solve_parser(), get_parallel_get_inputs_parser(),
get_parallel_solve_parser()]
)

# Arguments to skip an E2E step
Expand Down
Loading

0 comments on commit f0db9a2

Please sign in to comment.