diff --git a/pipelines/epinow2/generate_config.py b/pipelines/epinow2/generate_config.py index b28e1a3..b86831b 100644 --- a/pipelines/epinow2/generate_config.py +++ b/pipelines/epinow2/generate_config.py @@ -1,5 +1,6 @@ import os -from datetime import date, datetime +from datetime import date, datetime, timezone + from utils.epinow2.functions import ( generate_job_id, @@ -20,7 +21,7 @@ # Pull run parameters from environment state = os.environ.get("state", "all") - pathogen = os.environ.get("pathogen", "all") + disease = os.environ.get("disease", "all") report_date = os.environ.get("report_date", date.today()) reference_dates = os.environ.get("reference_date", [report_date]) data_source = os.environ.get("data_source", "nhsn") @@ -28,16 +29,14 @@ # Validate and sanitize args sanitized_args = validate_args( state=state, - pathogen=pathogen, + disease=disease, report_date=report_date, reference_dates=reference_dates, data_source=data_source, ) - # Generate job-specific parameters - as_of_date = int(datetime.timestamp(datetime.now())) + as_of_date = int(datetime.timestamp(datetime.now(timezone.utc))) job_id = generate_job_id() - # Generate task-specific configs task_configs = generate_task_configs( **sanitized_args, as_of_date=as_of_date, job_id=job_id diff --git a/utils/epinow2/constants.py b/utils/epinow2/constants.py index fdfa84e..706143a 100644 --- a/utils/epinow2/constants.py +++ b/utils/epinow2/constants.py @@ -14,7 +14,7 @@ }, } -states = [ +all_states = [ "AK", "AL", "AR", @@ -71,5 +71,5 @@ ] nssp_states_omit = ["AS", "FM", "MH", "NP", "PR", "PW", "VI", "MO", "GU"] -pathogens = ["COVID-19", "Influenza"] +all_diseases = ["COVID-19", "Influenza"] data_sources = ["nhsn", "nssp"] diff --git a/utils/epinow2/functions.py b/utils/epinow2/functions.py index 1da66c3..0a59920 100644 --- a/utils/epinow2/functions.py +++ b/utils/epinow2/functions.py @@ -3,9 +3,9 @@ from utils.epinow2.constants import ( nssp_states_omit, - pathogens, + all_diseases, shared_params, - 
states, + all_states, ) @@ -19,7 +19,7 @@ def generate_job_id() -> UUID: def validate_args( state: str | None = None, - pathogen: str | None = None, + disease: str | None = None, report_date: date | None = None, reference_dates: list[date] | None = None, data_source: str | None = None, @@ -28,7 +28,7 @@ def validate_args( in a standardized format for downstream use. Parameters: state: geography to run model - pathogen: pathogen to run + disease: disease to run report_date: date of model run reference_dates: array of reference (event) dates data_source: source of input data @@ -38,27 +38,30 @@ def validate_args( args_dict = {} if state == "all": if data_source == "nssp": - args_dict["state"] = list(set(states) - set(nssp_states_omit)) + args_dict["state"] = list(set(all_states) - set(nssp_states_omit)) elif data_source == "nhsn": - args_dict["state"] = states + args_dict["state"] = all_states else: - raise ValueError(f"Data source {data_source} not recognized.") - elif state not in states: + raise ValueError( + f"Data source {data_source} not recognized. Valid options are 'nssp' or 'nhsn'." + ) + elif state not in all_states: raise ValueError(f"State {state} not recognized.") else: args_dict["state"] = [state] - if pathogen == "all": - args_dict["pathogen"] = pathogens - elif pathogen not in pathogens: - raise ValueError(f"Pathogen {pathogen} not recognized.") + if disease == "all": + args_dict["disease"] = all_diseases + elif disease not in all_diseases: + raise ValueError( + f"Disease {disease} not recognized. Valid options are 'COVID-19' or 'Influenza'." 
+ ) else: - args_dict["pathogen"] = [pathogen] + args_dict["disease"] = [disease] # Standardize reference_dates reference_dates = [ - date.fromisoformat(x) if isinstance(x, str) else x - for x in reference_dates + date.fromisoformat(x) if isinstance(x, str) else x for x in reference_dates ] # Check valid reference_date @@ -74,21 +77,21 @@ def validate_args( def generate_task_id( job_id: UUID | None = None, state: str | None = None, - pathogen: str | None = None, + disease: str | None = None, ) -> str: """Generates a task_id which consists of the hex code of the job_id and information on the state and pathogen. Parameters: job_id: UUID of job state: state being run - pathogen: pathogen being run + disease: disease being run """ - return f"{job_id.hex}_{state}_{pathogen}" + return f"{job_id.hex}_{state}_{disease}" def generate_task_configs( state: list | None = None, - pathogen: list | None = None, + disease: list | None = None, report_date: date | None = None, reference_dates: list[date] | None = None, as_of_date: int | None = None, @@ -99,7 +102,7 @@ def generate_task_configs( supplied parameters. Parameters: state: geography to run model - pathogen: pathogen to run + disease: disease to run report_date: date of model run reference_dates: array of reference (event) dates as_of_date: timestamp of model run @@ -110,14 +113,12 @@ def generate_task_configs( configs = [] # Create tasks for each state-pathogen combination for s in state: - for p in pathogen: + for d in disease: task_config = { "job_id": str(job_id), - "task_id": generate_task_id( - job_id=job_id, state=s, pathogen=p - ), + "task_id": generate_task_id(job_id=job_id, state=s, disease=d), "as_of_date": as_of_date, - "disease": p, + "disease": d, "geo_value": [s], "geo_type": "state" if s != "US" else "country", "parameters": shared_params["parameters"],