From a4f3a2522e0ce6f2f48fa24791b277dfbf8da2b3 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Thu, 27 Apr 2023 18:08:32 -0600 Subject: [PATCH 1/2] Improve DFP period functionality to allow for better sampling and ignoring period --- .../dfp/stages/dfp_file_batcher_stage.py | 93 ++++++++++--------- .../production/morpheus/dfp_duo_pipeline.py | 6 +- 2 files changed, 54 insertions(+), 45 deletions(-) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py index a62759d67c..95d7015509 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py @@ -14,6 +14,7 @@ import logging import typing +import warnings from collections import namedtuple from datetime import datetime @@ -37,17 +38,29 @@ def __init__(self, c: Config, date_conversion_func, period="D", - sampling_rate_s=0, + sampling_rate_s: typing.Optional[int] = None, start_time: datetime = None, - end_time: datetime = None): + end_time: datetime = None, + sampling: typing.Union[str, float, int, None] = None): super().__init__(c) self._date_conversion_func = date_conversion_func - self._sampling_rate_s = sampling_rate_s self._period = period self._start_time = start_time self._end_time = end_time + if (sampling_rate_s is not None and sampling_rate_s > 0): + assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time" + + # Show the deprecation message + warnings.warn(("The `sampling_rate_s` argument has been deprecated. " + "Please use `sampling={sampling_rate_s}S` instead"), + DeprecationWarning) + + sampling = f"{sampling_rate_s}S" + + self._sampling = sampling + @property def name(self) -> str: return "dfp-file-batcher" @@ -60,6 +73,10 @@ def accepted_types(self) -> typing.Tuple: def on_data(self, file_objects: fsspec.core.OpenFiles): + timestamps = [] + full_names = [] + file_objs = [] + # Determine the date of the file, and apply the window filter if we have one ts_and_files = [] for file_object in file_objects: @@ -70,60 +87,52 @@ def on_data(self, file_objects: fsspec.core.OpenFiles): or (self._end_time is not None and ts > self._end_time)): continue - ts_and_files.append(TimestampFileObj(ts, file_object)) - - # sort the incoming data by date - ts_and_files.sort(key=lambda x: x.timestamp) + timestamps.append(ts) + full_names.append(file_object.full_name) + file_objs.append(file_object) - # Create a dataframe with the incoming metadata - if ((len(ts_and_files) > 1) and (self._sampling_rate_s > 0)): - file_sampled_list = [] + # Build the dataframe + df = pd.DataFrame(index=pd.DatetimeIndex(timestamps), data={"filename": full_names, "objects": file_objects}) - ts_last = ts_and_files[0].timestamp + # sort the incoming data by date + df.sort_index(inplace=True) - file_sampled_list.append(ts_and_files[0]) + # If sampling was provided, perform that here + if (self._sampling is not None): - for idx in range(1, len(ts_and_files)): - ts = ts_and_files[idx].timestamp + if (isinstance(self._sampling, str)): + # We have a frequency for sampling. Resample by the frequency, taking the first + df = df.resample(self._sampling).first().dropna() - if ((ts - ts_last).seconds >= self._sampling_rate_s): + elif (self._sampling < 1.0): + # Sample a fraction of the rows + df = df.sample(frac=self._sampling).sort_index() - ts_and_files.append(ts_and_files[idx]) - ts_last = ts else: - ts_and_files = file_sampled_list + # Sample a fixed amount + df = df.sample(n=self._sampling).sort_index() - df = pd.DataFrame() + # Early exit if no files were found + if (len(df) == 0): + return [] - timestamps = [] - full_names = [] - file_objs = [] - for (ts, file_object) in ts_and_files: - timestamps.append(ts) - full_names.append(file_object.full_name) - file_objs.append(file_object) + if (self._period is None): + # No period was set so group them all into one single batch + return [(fsspec.core.OpenFiles(df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs), + len(df))] - df["dfp_timestamp"] = timestamps - df["key"] = full_names - df["objects"] = file_objs + # Now group the rows by the period + resampled = df.resample(self._period) - output_batches = [] + n_groups = len(resampled) - if len(df) > 0: - # Now split by the batching settings - df_period = df["dfp_timestamp"].dt.to_period(self._period) - - period_gb = df.groupby(df_period) + output_batches = [] - n_groups = len(period_gb) - for group in period_gb.groups: - period_df = period_gb.get_group(group) + for _, period_df in resampled: - obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), - mode=file_objects.mode, - fs=file_objects.fs) + obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs) - output_batches.append((obj_list, n_groups)) + output_batches.append((obj_list, n_groups)) return output_batches diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py index 588f0a448e..86aa39ea63 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py @@ -111,7 +111,7 @@ type=int, default=0, show_envvar=True, - help="Minimum time step, in milliseconds, between object logs.") + help="Samples the input data files allowing only one file per bin defined by `sample_rate_s`.") @click.option( "--input_file", "-f", @@ -248,8 +248,8 @@ def run_pipeline(train_users, # Batch files into buckets by time. Use the default ISO date extractor from the filename pipeline.add_stage( DFPFileBatcherStage(config, - period="D", - sampling_rate_s=sample_rate_s, + period=None, + sampling=f"{sample_rate_s}S" if sample_rate_s > 0 else None, date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex), start_time=start_time, end_time=end_time)) From 033443955281ef9d8e2cae0a95cb735615cd6434 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 28 Apr 2023 13:08:52 -0600 Subject: [PATCH 2/2] Style cleanup --- .../production/morpheus/dfp/stages/dfp_file_batcher_stage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py index 95d7015509..183ed9fcda 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py @@ -78,7 +78,6 @@ def on_data(self, file_objects: fsspec.core.OpenFiles): file_objs = [] # Determine the date of the file, and apply the window filter if we have one - ts_and_files = [] for file_object in file_objects: ts = self._date_conversion_func(file_object)