Skip to content

Commit

Permalink
reimplement date range checks in catalog query (NOAA-GFDL#626)
Browse files Browse the repository at this point in the history
* reimplement date range check in query stage
move prior CropDateRangeFunction routines to MDTFPreprocessorbase
delete old CropDateRangeFunction class and methods

* remove unused parm from check_group_daterange call
  • Loading branch information
wrongkindofdoctor authored Jul 17, 2024
1 parent b57f2cc commit 196a81e
Showing 1 changed file with 104 additions and 124 deletions.
228 changes: 104 additions & 124 deletions src/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,128 +99,6 @@ def execute(self, var: varlist_util.VarlistEntry,
pass


class CropDateRangeFunction(PreprocessorFunctionBase):
"""A PreprocessorFunction which truncates the date range (time axis) of
the dataset to the user-requested analysis period.
"""

@staticmethod
def cast_to_cftime(dt: datetime.datetime, calendar):
"""Workaround to cast a python :py:class:`~datetime.datetime` object *dt*
to a
`cftime.datetime <https://unidata.github.io/cftime/api.html#cftime.datetime>`__
object with a specified *calendar*. Python's standard library has no
support for different calendars (all datetime objects use the proleptic
Gregorian calendar.)
"""
# NB "tm_mday" is not a typo
t = dt.timetuple()
tt = (getattr(t, attr_) for attr_ in
('tm_year', 'tm_mon', 'tm_mday', 'tm_hour', 'tm_min', 'tm_sec'))
return cftime.datetime(*tt, calendar=calendar)

def execute(self, var, ds, **kwargs):
"""Parse quantities related to the calendar for time-dependent data and
truncate the date range of model dataset *ds*.
In particular, the *var*\'s ``date_range`` attribute was set from the
user's input before we knew the calendar being used by the model. The
workaround here to cast those values into `cftime.datetime
<https://unidata.github.io/cftime/api.html#cftime.datetime>`__
objects so that they can be compared with the model data's time axis.
"""
tv_name = var.translation.name
t_coord = ds[tv_name].coords.get('time', None)
if t_coord is None:
var.log.debug("Exit %s for %s: time-independent.",
self.__class__.__name__, var.full_name)
return ds
# time coordinate will be a list if variable has
# multiple coordinates/coordinate attributes
cal = t_coord.encoding['calendar']
t_start = t_coord.values[0]
t_end = t_coord.values[-1]
t_size = t_coord.size
dt_range = var.T.range
# lower/upper are earliest/latest datetimes consistent with the date we
# were given, up to the precision that was specified (eg lower for "2000"
# would be Jan 1, 2000, and upper would be Dec 31).

# match date range hours to dataset hours if necessary
# this is a kluge to support the timeslice data and similar datasets that
# do not begin at hour zero
if dt_range.start.lower.hour != t_start.hour:
var.log.info("Variable %s data starts at hour %s", var.full_name, t_start.hour)
dt_start_upper_new = datetime.datetime(dt_range.start.upper.year,
dt_range.start.upper.month,
dt_range.start.upper.day,
t_start.hour,
t_start.minute,
t_start.second)
dt_start_lower_new = datetime.datetime(dt_range.start.lower.year,
dt_range.start.lower.month,
dt_range.start.lower.day,
t_start.hour,
t_start.minute,
t_start.second)
dt_start_lower = self.cast_to_cftime(dt_start_lower_new, cal)
dt_start_upper = self.cast_to_cftime(dt_start_upper_new, cal)
else:
dt_start_lower = self.cast_to_cftime(dt_range.start.lower, cal)
dt_start_upper = self.cast_to_cftime(dt_range.start.upper, cal)
if dt_range.end.lower.hour != t_end.hour:
var.log.info("Variable %s data ends at hour %s", var.full_name, t_end.hour)
dt_end_lower_new = datetime.datetime(dt_range.end.lower.year,
dt_range.end.lower.month,
dt_range.end.lower.day,
t_end.hour,
t_end.minute,
t_end.second)
dt_end_upper_new = datetime.datetime(dt_range.end.upper.year,
dt_range.end.upper.month,
dt_range.end.upper.day,
t_end.hour,
t_end.minute,
t_end.second)
dt_end_lower = self.cast_to_cftime(dt_end_lower_new, cal)
dt_end_upper = self.cast_to_cftime(dt_end_upper_new, cal)
else:
dt_end_lower = self.cast_to_cftime(dt_range.end.lower, cal)
dt_end_upper = self.cast_to_cftime(dt_range.end.upper, cal)

if t_start > dt_start_upper:
err_str = (f"Error: dataset start ({t_start}) is after "
f"requested date range start ({dt_start_upper}).")
var.log.error(err_str)
raise IndexError(err_str)
if t_end < dt_end_lower:
err_str = (f"Error: dataset end ({t_end}) is before "
f"requested date range end ({dt_end_lower}).")
var.log.error(err_str)
raise IndexError(err_str)
ds = ds.sel({t_coord.name: slice(dt_start_lower, dt_end_upper)})
new_t = ds[tv_name].coords.get('time', None)
nt_size = new_t.size
nt_values = new_t.values
if t_size == nt_size:
var.log.info(("Requested dates for %s coincide with range of dataset "
"'%s -- %s'; left unmodified."),
var.full_name,
nt_values[0].strftime('%Y-%m-%d:%H-%M-%S'),
nt_values[-1].strftime('%Y-%m-%d:%H-%M-%S'),
)
else:
var.log.info("Cropped date range of %s from '%s -- %s' to '%s -- %s'.",
var.full_name,
t_start.strftime('%Y-%m-%d:%H-%M-%S'),
t_end.strftime('%Y-%m-%d:%H-%M-%S'),
nt_values[0].strftime('%Y-%m-%d:%H-%M-%S'),
nt_values[-1].strftime('%Y-%m-%d:%H-%M-%S'),
tags=util.ObjectLogTag.NC_HISTORY
)
return ds


class PrecipRateToFluxFunction(PreprocessorFunctionBase):
"""A PreprocessorFunction which converts the dependent variable's units, for
the specific case of precipitation. Flux and precip rate differ by a factor
Expand Down Expand Up @@ -808,10 +686,105 @@ def _functions(self):
return [
AssociatedVariablesFunction,
PrecipRateToFluxFunction, ConvertUnitsFunction,
ExtractLevelFunction, RenameVariablesFunction,
ExtractLevelFunction, RenameVariablesFunction
]

def check_group_daterange(self, group_df: pd.DataFrame, log=_log) -> pd.DataFrame:
def cast_to_cftime(self, dt: datetime.datetime, calendar):
"""Workaround to cast a python :py:class:`~datetime.datetime` object *dt*
to a
`cftime.datetime <https://unidata.github.io/cftime/api.html#cftime.datetime>`__
object with a specified *calendar*. Python's standard library has no
support for different calendars (all datetime objects use the proleptic
Gregorian calendar.)
"""
# NB "tm_mday" is not a typo
t = dt.timetuple()
tt = (getattr(t, attr_) for attr_ in
('tm_year', 'tm_mon', 'tm_mday', 'tm_hour', 'tm_min', 'tm_sec'))
return cftime.datetime(*tt, calendar=calendar)

def check_time_bounds(self, ds, var: translation.TranslatedVarlistEntry, freq: str):
"""Parse quantities related to the calendar for time-dependent data and
truncate the date range of model dataset *ds*.
In particular, the *var*\'s ``date_range`` attribute was set from the
user's input before we knew the calendar being used by the model. The
workaround here to cast those values into `cftime.datetime
<https://unidata.github.io/cftime/api.html#cftime.datetime>`__
objects so that they can be compared with the model data's time axis.
"""
dt_range = var.T.range
ds_decode = xr.decode_cf(ds)
t_coord = ds_decode[var.T.name]

# time coordinate will be a list if variable has
# multiple coordinates/coordinate attributes
if hasattr(t_coord, 'calendar'):
cal = t_coord.calendar
elif 'calendar' in t_coord.encoding:
cal = t_coord.encoding['calendar']
else:
raise ValueError(f'calendar attribute not found for catalog time coord')
t_start = t_coord.values[0]
t_end = t_coord.values[-1]
# lower/upper are earliest/latest datetimes consistent with the date we
# were given, up to the precision that was specified (eg lower for "2000"
# would be Jan 1, 2000, and upper would be Dec 31).

# match date range hours to dataset hours if necessary
# to accommodate timeslice data and other datasets that
# do not begin at hour zero
if dt_range.start.lower.hour != t_start.hour:
var.log.info("Variable %s data starts at hour %s", var.full_name, t_start.hour)
dt_start_upper_new = datetime.datetime(dt_range.start.upper.year,
dt_range.start.upper.month,
dt_range.start.upper.day,
t_start.hour,
t_start.minute,
t_start.second)
dt_start_upper = self.cast_to_cftime(dt_start_upper_new, cal)
else:
dt_start_upper = self.cast_to_cftime(dt_range.start.upper, cal)
if dt_range.end.lower.hour != t_end.hour:
var.log.info("Variable %s data ends at hour %s", var.full_name, t_end.hour)
dt_end_lower_new = datetime.datetime(dt_range.end.lower.year,
dt_range.end.lower.month,
dt_range.end.lower.day,
t_end.hour,
t_end.minute,
t_end.second)
dt_end_lower = self.cast_to_cftime(dt_end_lower_new, cal)
else:
dt_end_lower = self.cast_to_cftime(dt_range.end.lower, cal)

# only check that up to monthly precision for monthly or longer data
if freq in ['mon', 'year']:
if t_start.year > dt_start_upper.year or\
t_start.year == dt_start_upper.year and t_start.month > dt_start_upper.month:
err_str = (f"Error: dataset start ({t_start}) is after "
f"requested date range start ({dt_start_upper}).")
var.log.error(err_str)
raise IndexError(err_str)
if t_end.year < dt_end_lower.year or\
t_end.year == dt_end_lower.year and t_end.month < dt_end_lower.month:
err_str = (f"Error: dataset end ({t_end}) is before "
f"requested date range end ({dt_end_lower}).")
var.log.error(err_str)
raise IndexError(err_str)
else:
if t_start > dt_start_upper:
err_str = (f"Error: dataset start ({t_start}) is after "
f"requested date range start ({dt_start_upper}).")
var.log.error(err_str)
raise IndexError(err_str)
if t_end < dt_end_lower:
err_str = (f"Error: dataset end ({t_end}) is before "
f"requested date range end ({dt_end_lower}).")
var.log.error(err_str)
raise IndexError(err_str)

def check_group_daterange(self, group_df: pd.DataFrame,
log=_log) -> pd.DataFrame:
"""Sort the files found for each experiment by date, verify that
the date ranges contained in the files are contiguous in time and that
the date range of the files spans the query date range.
Expand Down Expand Up @@ -858,6 +831,7 @@ def check_group_daterange(self, group_df: pd.DataFrame, log=_log) -> pd.DataFram
group_df['start_time'] = pd.to_datetime(start_time_vals, format=date_format)
# convert end_times to date_format for all files in query
group_df['end_time'] = pd.to_datetime(end_time_vals, format=date_format)

# method throws ValueError if ranges aren't contiguous
dates_df = group_df.loc[:, ['start_time', 'end_time']]
date_range_vals = []
Expand Down Expand Up @@ -1004,6 +978,12 @@ def query_catalog(self,
cat_dict[case_name] = var_xr
else:
cat_dict[case_name] = xr.merge([cat_dict[case_name], var_xr])
# check that start and end times include runtime startdate and enddate
try:
self.check_time_bounds(cat_dict[case_name], var.translation, freq)
except LookupError:
var.log.error(f'Data not found in catalog query for {var.translation.name} for requested date_range.')
raise SystemExit("Terminating program")
return cat_dict

def edit_request(self, v: varlist_util.VarlistEntry, **kwargs):
Expand Down

0 comments on commit 196a81e

Please sign in to comment.