diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 589bcfae..f383af28 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,6 +10,7 @@ Unreleased - ipynb files from docs/examples are now also used as (optional) tests (PR `#263 `_) - ``yapf`` for code formatting (see developers guide) (Fix #248, PR `#263 `_) - validation framework option to force dataset combinations that include reference dataset updated (PR `#265 `_) +- Added `TimestampAdapter` to the validation framework to deal with datasets that have different date/time specification fields (PR `#268 `_) Version 0.13.4, 2022-01-12 ========================== diff --git a/src/pytesmo/validation_framework/adapters.py b/src/pytesmo/validation_framework/adapters.py index db9024e0..cca2ef30 100644 --- a/src/pytesmo/validation_framework/adapters.py +++ b/src/pytesmo/validation_framework/adapters.py @@ -24,18 +24,19 @@ # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - """ Module containing adapters that can be used together with the validation framework. """ import operator + +import pandas as pd from pytesmo.time_series.anomaly import calc_anomaly from pytesmo.time_series.anomaly import calc_climatology from pytesmo.utils import deprecated from pandas import DataFrame +import numpy as np import warnings _op_lookup = { @@ -89,11 +90,9 @@ def __init__(self, cls, data_property_name="data", read_name=None): setattr(self, read_name, self._adapt_custom) def __get_dataframe(self, data): - if ( - (not isinstance(data, DataFrame)) - and (hasattr(data, self.data_property_name)) - and (isinstance(getattr(data, self.data_property_name), DataFrame)) - ): + if ((not isinstance(data, DataFrame)) and + (hasattr(data, self.data_property_name)) and + (isinstance(getattr(data, self.data_property_name), DataFrame))): data = getattr(data, self.data_property_name) return data @@ -101,15 +100,14 @@ def __drop_tz_info(self, data): if hasattr(data.index, "tz") and (data.index.tz is not None): warnings.warn( f"Dropping timezone information ({data.index.tz})" - f" for data from reader {self.cls.__class__.__name__}" - ) + f" for data from reader {self.cls.__class__.__name__}") data.index = data.index.tz_convert(None) return data def _adapt(self, df: DataFrame) -> DataFrame: # drop time zone info and extract df from ASCAT TS object - return self.__drop_tz_info(self.__get_dataframe(df) - if df is not None else DataFrame()) + return self.__drop_tz_info( + self.__get_dataframe(df) if df is not None else DataFrame()) def _adapt_custom(self, *args, **kwargs): # modifies data from whatever function was set as `read_name`. @@ -133,10 +131,8 @@ def grid(self): return self.cls.grid -@deprecated( - "`MaskingAdapter` is deprecated, use `SelfMaskingAdapter` " - "or `AdvancedMaskingAdapter` instead." 
-)
+@deprecated("`MaskingAdapter` is deprecated, use `SelfMaskingAdapter` "
+            "or `AdvancedMaskingAdapter` instead.")
 class MaskingAdapter(BasicAdapter):
     """
     Transform the given class to return a boolean dataset given the operator
@@ -370,8 +366,7 @@ def _adapt(self, data):
         ite = self.columns
         for column in ite:
             data[column] = calc_anomaly(
-                data[column], window_size=self.window_size
-            )
+                data[column], window_size=self.window_size)
 
         return data
 
@@ -502,3 +497,176 @@ def _adapt(self, data: DataFrame) -> DataFrame:
         new_col = data[columns].apply(self.func, **self.func_kwargs)
         data[self.new_name] = new_col
         return data
+
+
+class TimestampAdapter(BasicAdapter):
+    """
+    Class that combines two or more timestamp fields into a single exact
+    measurement time. The fields of interest specify:
+
+    1. A basic observation time (e.g. days at midnight) which can
+       be expressed as a timestamp (YYYY-mm-dd) or with respect to a
+       reference time (days since YYYY-mm-dd)
+    2. One or more (minute, s, µs) offset times to be added cumulatively
+
+    -------------------
+    Example input:
+
+                variable  base_time [w.r.t. 2005-02-01]  offset [min]  offset [sec]
+    100         0.889751                           100.0          38.0         999.0
+    101         0.108279                           101.0          40.0        1000.0
+    102        -1.201708                           102.0          39.0         999.0
+
+    Example output (first row: 2005-02-01 + 100 days = 2005-05-12;
+    offset 38 min + 999 s = 00:54:39):
+
+                         variable
+    2005-05-12 00:54:39  0.889751
+    2005-05-13 00:56:40  0.108279
+    2005-05-14 00:55:39 -1.201708
+
+    Parameters
+    ----------
+    cls: object
+        Reader object, has to have a `read_ts` or `read` method or a method
+        name must be specified in the `read_name` kwarg. The same method will
+        be available for the adapted version of the reader.
+    time_offset_fields: str, list or None
+        Name or list of names of the fields that provide information on the
+        time offset. If a list is given, all values contribute to the offset,
+        each field refining the previous one. For instance:
+        offset = minutes + seconds in the minute + µs in the second
+        NOTE: np.nan values are counted as 0 offset
+        NOTE: if None, no offset is considered
+    time_units: str or list
+        Time units that the time_offset_fields are specified in. If a list is
+        given, it should have the same size as the 'time_offset_fields'
+        parameter. Can be any of the np.datetime64 units:
+        https://numpy.org/doc/stable/reference/arrays.datetime.html
+    base_time_field: str, optional. Default is None.
+        If a name is provided, the generic time field will be searched for
+        in the columns; otherwise, it is assumed to be the index
+        NOTE: np.nan values in this field are dropped
+    base_time_reference: str, optional. Default is None.
+        String of format 'YYYY-mm-dd' that can be specified to transform the
+        'base_time_field' from [units since base_time_reference] to
+        np.datetime64. If not provided, it is assumed that the
+        base_time_field already holds np.datetime64 values
+    base_time_units: str, optional. Default is "D".
+        Units that the base_time_field is specified in. Only applicable
+        together with 'base_time_reference'
+    replace_index: bool, optional. Default is True.
+        If True, the exact timestamp is used as index. Else, it will be added
+        to the dataframe in the column 'output_field'
+    output_field: str, optional. Default is None.
+        If a name is specified, an additional column is generated under this
+        name, containing the exact timestamp. Only used when 'replace_index'
+        is False
+    drop_original: bool, optional. Default is True.
+        Whether the base_time_field and time_offset_fields should be dropped
+        in the final DataFrame
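+
+    Examples
+    --------
+    A minimal usage sketch, assuming ``reader`` is any reader whose
+    DataFrame provides the (hypothetical) field names below:
+
+        adapted = TimestampAdapter(
+            reader,
+            time_offset_fields=["offset_min", "offset_sec"],
+            time_units=["m", "s"],
+            base_time_field="base_time",
+            base_time_reference="2005-02-01")
+        df = adapted.read()  # indexed by the exact measurement time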
+    """
+
+    def __init__(self,
+                 cls: object,
+                 time_offset_fields: str or list,
+                 time_units: str or list = "s",
+                 base_time_field: str = None,
+                 base_time_reference: str = None,
+                 base_time_units: str = "D",
+                 replace_index: bool = True,
+                 output_field: str = None,
+                 drop_original: bool = True,
+                 **kwargs):
+        super().__init__(cls, **kwargs)
+
+        # Normalize to a list, but preserve None ("no offset")
+        if time_offset_fields is not None and not isinstance(
+                time_offset_fields, list):
+            time_offset_fields = [time_offset_fields]
+        self.time_offset_fields = time_offset_fields
+        self.time_units = time_units if isinstance(time_units,
+                                                   list) else [time_units]
+
+        self.base_time_field = base_time_field
+        self.base_time_reference = np.datetime64(
+            base_time_reference) if base_time_reference is not None else None
+        self.base_time_units = base_time_units
+
+        self.replace_index = replace_index
+        if not replace_index and output_field is None:
+            raise ValueError(
+                "'output_field' should be specified in case the new timestamp"
+                " should not be used as index. Alternatively, set"
+                " 'replace_index' to True")
+        elif replace_index and output_field is not None:
+            warnings.warn(
+                "Ignoring the 'output_field' value. Set 'replace_index' to"
+                " False to avoid this behavior")
+            self.output_field = None
+        else:
+            self.output_field = output_field
+
+        self.drop_original = drop_original
+
+    def convert_generic(self, time_arr: np.array) -> np.array:
+        """Convert the generic time field to np.datetime64 dtype"""
+        # interpret values as 'base_time_units' since 'base_time_reference'
+        time_delta = time_arr.astype(int).astype(
+            f"timedelta64[{self.base_time_units}]")
+        time_date = np.full(time_delta.shape,
+                            self.base_time_reference) + time_delta
+
+        return time_date
+
+    def add_offset_cumulative(self, data: DataFrame) -> np.array:
+        """Return an array of timedeltas calculated from all the
+        time_offset_fields"""
+        total_offset = np.full(data.index.shape, 0, dtype='timedelta64[s]')
+        for field, unit in zip(self.time_offset_fields, self.time_units):
+            total_offset += data[field].map(
+                lambda x: np.timedelta64(int(x), unit)
+                if not np.isnan(x) else np.timedelta64(0, unit)).values
+
+        return total_offset
+
+    def _adapt(self, data: DataFrame) -> DataFrame:
+        """
+        Adapt the timestamps in the original data with the specified offset
+        NOTE: assumes the index dtype is 'datetime64[ns]'
+        """
+        data = super()._adapt(data)
+        original = data.copy()
+
+        # Get the generic time array
+        if self.base_time_field is not None:
+            base_time = data[self.base_time_field]
+        else:
+            base_time = data.index
+
+        # Take only the valid dates
+        data = data[base_time.notna()]
+        base_time_values = base_time.dropna().values
+
+        # Make sure the dataframe still contains values after dropna()
+        if data.empty:
+            warnings.warn(
+                "The input DataFrame is either empty or has all NaT values"
+                " in the specified `base_time_field`, therefore the original"
+                " non-adapted data is returned")
+
+            return original
+
+        if self.base_time_reference is not None:
+            base_time_values = self.convert_generic(base_time_values)
+
+        # If no offset is specified
+        if self.time_offset_fields is None:
+            exact_time = base_time_values
+        else:
+            # Add time offset to the generic time
+            offset = self.add_offset_cumulative(data)
+            exact_time = base_time_values + offset
+
+        # generate the final frame
+        if not self.replace_index:
+            data[self.output_field] = exact_time
+        else:
+            data.index = exact_time
+
+        if self.drop_original:
+            if self.time_offset_fields is not None:
+                data.drop(columns=self.time_offset_fields, inplace=True)
+            if self.base_time_field in data.columns:
+                data.drop(columns=[self.base_time_field], inplace=True)
+
+        return data
diff --git a/tests/test_validation_framework/test_adapters.py b/tests/test_validation_framework/test_adapters.py
index 4adb26c8..4b1c94f8 100644
--- a/tests/test_validation_framework/test_adapters.py
+++ b/tests/test_validation_framework/test_adapters.py
@@ -1,5 +1,6 @@
 import pytest
+from pytesmo.validation_framework.adapters import TimestampAdapter
 
 """
 Test for the adapters.
 """
@@ -11,11 +12,9 @@
 import pandas as pd
 import warnings
 
-from pytesmo.validation_framework.adapters import (
-    MaskingAdapter,
-    AdvancedMaskingAdapter,
-    ColumnCombineAdapter
-)
+from pytesmo.validation_framework.adapters import (MaskingAdapter,
+                                                   AdvancedMaskingAdapter,
+                                                   ColumnCombineAdapter)
 from pytesmo.validation_framework.adapters import SelfMaskingAdapter
 from pytesmo.validation_framework.adapters import AnomalyAdapter
 from pytesmo.validation_framework.adapters import AnomalyClimAdapter
@@ -36,23 +35,21 @@ def test_masking_adapter():
         nptest.assert_almost_equal(
             data_masked["x"].values,
             np.concatenate(
-                [np.ones((10), dtype=bool), np.zeros((10), dtype=bool)]
-            ),
+                [np.ones((10), dtype=bool),
+                 np.zeros((10), dtype=bool)]),
         )
         nptest.assert_almost_equal(
             data_masked2["x"].values,
             np.concatenate(
-                [np.ones((10), dtype=bool), np.zeros((10), dtype=bool)]
-            ),
+                [np.ones((10), dtype=bool),
+                 np.zeros((10), dtype=bool)]),
         )
         if col is None:
-            nptest.assert_almost_equal(
-                data_masked["y"].values, np.ones((20), dtype=bool)
-            )
-            nptest.assert_almost_equal(
-                data_masked2["y"].values, np.ones((20), dtype=bool)
-            )
+            nptest.assert_almost_equal(data_masked["y"].values,
+                                       np.ones((20), dtype=bool))
+            nptest.assert_almost_equal(data_masked2["y"].values,
+                                       np.ones((20), dtype=bool))
 
 
 def test_self_masking_adapter():
@@ -145,14 +142,15 @@ def test_anomaly_clim_adapter_one_column():
 
 
 def test_adapters_custom_fct_name():
+
     def assert_all_read_fcts(reader):
-        assert(np.all(reader.read() == reader.read_ts()))
-        assert(np.all(reader.read() == reader.alias_read()))
+        assert (np.all(reader.read() == reader.read_ts()))
+        assert (np.all(reader.read() == reader.alias_read()))
 
     base = TestDataset("", n=20)
     assert_all_read_fcts(base)
-    sma = SelfMaskingAdapter(base, '>=', 5, column_name='y',
-                             read_name='alias_read')
+    sma = SelfMaskingAdapter(
+        base, '>=', 5, column_name='y', read_name='alias_read')
     assert_all_read_fcts(sma)
     smanom = AnomalyAdapter(sma, read_name='alias_read')
     assert_all_read_fcts(smanom)
@@ -221,23 +219,21 @@ def test_adapters_with_ascat():
 
 
 class TestTimezoneReader(object):
+
     def read(self, *args, **kwargs):
         data = np.arange(5.0)
         data[3] = np.nan
         return pd.DataFrame(
             {"data": data},
             index=pd.date_range(
-                datetime(2007, 1, 1, 0), "2007-01-05", freq="D", tz="UTC"
-            ),
+                datetime(2007, 1, 1, 0), "2007-01-05", freq="D", tz="UTC"),
         )
 
     def read_ts(self, *args, **kwargs):
         return self.read(*args, **kwargs)
 
 
-@pytest.mark.filterwarnings(
-    "ignore:Dropping timezone information:UserWarning"
-)
+@pytest.mark.filterwarnings("ignore:Dropping timezone information:UserWarning")
 def test_timezone_removal():
     tz_reader = TestTimezoneReader()
 
@@ -258,8 +254,11 @@ def test_column_comb_adapter():
     ds = TestDataset("", n=20)
     orig = ds.read()
     ds_adapted = ColumnCombineAdapter(
-        ds, func=pd.DataFrame.mean, columns=["x", "y"],
-        func_kwargs={'skipna': True}, new_name='xy_mean')
+        ds,
+        func=pd.DataFrame.mean,
+        columns=["x", "y"],
+        func_kwargs={'skipna': True},
+        new_name='xy_mean')
 
     ds_mean1 = ds_adapted.read_ts()
     ds_mean2 = ds_adapted.read()
@@ -268,3 +267,172 @@
     nptest.assert_equal(ds_mean["y"].values,
                         orig["y"].values)
     nptest.assert_equal(ds_mean["xy_mean"].values,
                         (ds_mean["x"] + ds_mean["y"]) / 2.)
+
+
+def test_timestamp_adapter():
+    ds = TestDataset("", n=20)
+
+    # Simple case
+    # ================
+
+    index = np.arange('2005-02', '2005-03', dtype='datetime64[D]')
+    sm_var = np.random.randn(*index.shape)
+    time_offset_field = np.random.normal(
+        loc=1000.0, scale=1.0, size=index.shape).astype(int)
+
+    def _read():
+        return pd.DataFrame(
+            data=np.array([sm_var, time_offset_field]).transpose(),
+            columns=["sm", "offset"],
+            index=index)
+
+    setattr(ds, "read", _read)
+    origin = ds.read().drop(columns="offset")
+
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s")
+    adapted = adapted_ds.read()
+
+    # Date should be unchanged as we are using a ~1000 sec offset
+    assert (origin.index.date == adapted.index.date).all()
+    # The offset is expressed in seconds
+    assert origin.index[0] + np.timedelta64(time_offset_field[0],
+                                            "s") == adapted.index[0]
+    # The dataframe is otherwise intact
+    assert (origin.columns == adapted.columns).all()
+
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="m")
+    adapted = adapted_ds.read()
+
+    # The offset is expressed in minutes
+    assert origin.index[0] + np.timedelta64(time_offset_field[0],
+                                            "m") == adapted.index[0]
+
+    # This time we do not drop the columns
+    origin = ds.read()
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s", drop_original=False)
+    adapted = adapted_ds.read()
+
+    # The dataframe is otherwise intact
+    assert (origin.columns == adapted.columns).all()
+
+    # test use of new column
+    # -----------------------
+    adapted_ds = TimestampAdapter(
+        ds,
+        time_offset_fields="offset",
+        time_units="s",
+        replace_index=False,
+        output_field="exact_timestamp")
+    adapted = adapted_ds.read()
+    assert (adapted.columns == ["sm", "exact_timestamp"]).all()
+
+    # test NaNs in offset and generic time
+    # -----------------------
+    index = np.arange('2005-02', '2005-03', dtype='datetime64[D]')
+    index[4] = np.datetime64("NaT")
+    sm_var = np.random.randn(*index.shape)
+    time_offset_field = np.random.normal(
+        loc=1000.0, scale=1.0, size=index.shape)
+    time_offset_field[7] = np.nan
+
+    def _read_nans():
+        return pd.DataFrame(
+            data=np.array([sm_var, time_offset_field]).transpose(),
+            columns=["sm", "offset"],
+            index=index)
+
+    setattr(ds, "read", _read_nans)
+    origin = ds.read()
+
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s")
+    adapted = adapted_ds.read()
+
+    # One index (NaT) value should be dropped
+    assert len(adapted.index) == len(origin.index) - 1
+    # The NaN offset should be interpreted as 0
+    assert adapted.index[6] == datetime(2005, 2, 8, 0, 0)
+
+    # test all NaNs in dataframe
+    # -----------------------
+    index = np.arange('2005-02', '2005-03', dtype='datetime64[D]')
+    index[:] = np.datetime64("NaT")
+    sm_var = np.random.randn(*index.shape)
+    time_offset_field = np.random.normal(
+        loc=1000.0, scale=1.0, size=index.shape)
+    time_offset_field[7] = np.nan
+
+    def _read_all_nans():
+        return pd.DataFrame(
+            data=np.array([sm_var, time_offset_field]).transpose(),
+            columns=["sm", "offset"],
+            index=index)
+
+    setattr(ds, "read", _read_all_nans)
+    origin = ds.read()
+
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s")
+    adapted = adapted_ds.read()
+
+    # The original should be returned
+    pd.testing.assert_frame_equal(origin, adapted)
+
+    # test empty Dataframe
+    # -----------------------
+
+    def _read_empty():
+        return pd.DataFrame(columns=["sm", "offset"],)
+
+    setattr(ds, "read", _read_empty)
+    origin = ds.read()
+
+    adapted_ds = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s")
+    adapted = adapted_ds.read()
+
+    # The original (empty) frame should be returned
+    assert adapted.empty
+    assert (adapted.columns == ["sm", "offset"]).all()
+
+    # Complex case
+    # ================
+    base_time = np.arange(100, 200)
+    index = base_time.copy()
+    sm_var = np.random.randn(*index.shape)
+    time_offset_field_min = np.random.normal(
+        loc=40.0, scale=1.0, size=index.shape).astype(int)
+    time_offset_field_sec = np.random.normal(
+        loc=1000.0, scale=1.0, size=index.shape).astype(int)
+
+    def _read_complex():
+        return pd.DataFrame(
+            data=np.array([
+                sm_var, base_time, time_offset_field_min, time_offset_field_sec
+            ]).transpose(),
+            columns=["sm", "base_time", "offset_min", "offset_sec"],
+            index=index)
+
+    setattr(ds, "read", _read_complex)
+    origin = ds.read()
+
+    adapted_ds = TimestampAdapter(
+        ds,
+        time_offset_fields=["offset_min", "offset_sec"],
+        time_units=["m", "s"],
+        base_time_field="base_time",
+        base_time_reference="2005-02-01")
+
+    adapted = adapted_ds.read()
+
+    should_be = origin.apply(
+        lambda row: np.datetime64("2005-02-01") + np.timedelta64(
+            int(row["base_time"]), "D") + np.timedelta64(
+                int(row["offset_min"]), "m") + np.timedelta64(
+                    int(row["offset_sec"]), "s"),
+        axis=1).values
+
+    assert (adapted.index.values == should_be).all()
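+
+
+def test_timestamp_adapter_chaining():
+    # Illustrative sketch (beyond the cases above): since TimestampAdapter
+    # subclasses BasicAdapter, it composes with the other adapters, e.g.
+    # SelfMaskingAdapter can wrap an already timestamp-adapted reader.
+    ds = TestDataset("", n=20)
+
+    index = np.arange('2005-02', '2005-03', dtype='datetime64[D]')
+    sm_var = np.random.randn(*index.shape)
+    time_offset_field = np.full(index.shape, 60)
+
+    def _read():
+        return pd.DataFrame(
+            data=np.array([sm_var, time_offset_field]).transpose(),
+            columns=["sm", "offset"],
+            index=index)
+
+    setattr(ds, "read", _read)
+
+    ts_adapted = TimestampAdapter(
+        ds, time_offset_fields="offset", time_units="s")
+    masked = SelfMaskingAdapter(ts_adapted, '>=', 0, column_name='sm')
+    data = masked.read()
+
+    # only rows that fulfil the mask condition survive ...
+    assert (data["sm"] >= 0).all()
+    # ... and the exact timestamp (midnight + 60 s) is preserved
+    assert (data.index == data.index.normalize() + np.timedelta64(60,
+                                                                  "s")).all()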