Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Improve DFP period functionality to allow for better sampling and ignoring period #912

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import typing
import warnings
from collections import namedtuple
from datetime import datetime

Expand All @@ -37,17 +38,29 @@ def __init__(self,
c: Config,
date_conversion_func,
period="D",
sampling_rate_s=0,
sampling_rate_s: typing.Optional[int] = None,
start_time: datetime = None,
end_time: datetime = None):
end_time: datetime = None,
sampling: typing.Union[str, float, int, None] = None):
super().__init__(c)

self._date_conversion_func = date_conversion_func
self._sampling_rate_s = sampling_rate_s
self._period = period
self._start_time = start_time
self._end_time = end_time

if (sampling_rate_s is not None and sampling_rate_s > 0):
assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time"

# Show the deprecation message
warnings.warn(("The `sampling_rate_s` argument has been deprecated. "
"Please use `sampling={sampling_rate_s}S` instead"),
DeprecationWarning)

sampling = f"{sampling_rate_s}S"

self._sampling = sampling

@property
def name(self) -> str:
return "dfp-file-batcher"
Expand All @@ -60,8 +73,11 @@ def accepted_types(self) -> typing.Tuple:

def on_data(self, file_objects: fsspec.core.OpenFiles):

timestamps = []
full_names = []
file_objs = []

# Determine the date of the file, and apply the window filter if we have one
ts_and_files = []
for file_object in file_objects:
ts = self._date_conversion_func(file_object)

Expand All @@ -70,60 +86,52 @@ def on_data(self, file_objects: fsspec.core.OpenFiles):
or (self._end_time is not None and ts > self._end_time)):
continue

ts_and_files.append(TimestampFileObj(ts, file_object))

# sort the incoming data by date
ts_and_files.sort(key=lambda x: x.timestamp)
timestamps.append(ts)
full_names.append(file_object.full_name)
file_objs.append(file_object)

# Create a dataframe with the incoming metadata
if ((len(ts_and_files) > 1) and (self._sampling_rate_s > 0)):
file_sampled_list = []
# Build the dataframe
df = pd.DataFrame(index=pd.DatetimeIndex(timestamps), data={"filename": full_names, "objects": file_objects})

ts_last = ts_and_files[0].timestamp
# sort the incoming data by date
df.sort_index(inplace=True)

file_sampled_list.append(ts_and_files[0])
# If sampling was provided, perform that here
if (self._sampling is not None):

for idx in range(1, len(ts_and_files)):
ts = ts_and_files[idx].timestamp
if (isinstance(self._sampling, str)):
# We have a frequency for sampling. Resample by the frequency, taking the first
df = df.resample(self._sampling).first().dropna()

if ((ts - ts_last).seconds >= self._sampling_rate_s):
elif (self._sampling < 1.0):
# Sample a fraction of the rows
df = df.sample(frac=self._sampling).sort_index()

ts_and_files.append(ts_and_files[idx])
ts_last = ts
else:
ts_and_files = file_sampled_list
# Sample a fixed amount
df = df.sample(n=self._sampling).sort_index()

df = pd.DataFrame()
# Early exit if no files were found
if (len(df) == 0):
return []

timestamps = []
full_names = []
file_objs = []
for (ts, file_object) in ts_and_files:
timestamps.append(ts)
full_names.append(file_object.full_name)
file_objs.append(file_object)
if (self._period is None):
# No period was set so group them all into one single batch
return [(fsspec.core.OpenFiles(df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs),
len(df))]

df["dfp_timestamp"] = timestamps
df["key"] = full_names
df["objects"] = file_objs
# Now group the rows by the period
resampled = df.resample(self._period)

output_batches = []
n_groups = len(resampled)

if len(df) > 0:
# Now split by the batching settings
df_period = df["dfp_timestamp"].dt.to_period(self._period)

period_gb = df.groupby(df_period)
output_batches = []

n_groups = len(period_gb)
for group in period_gb.groups:
period_df = period_gb.get_group(group)
for _, period_df in resampled:

obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(),
mode=file_objects.mode,
fs=file_objects.fs)
obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs)

output_batches.append((obj_list, n_groups))
output_batches.append((obj_list, n_groups))

return output_batches

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
type=int,
default=0,
show_envvar=True,
help="Minimum time step, in milliseconds, between object logs.")
help="Samples the input data files allowing only one file per bin defined by `sample_rate_s`.")
@click.option(
"--input_file",
"-f",
Expand Down Expand Up @@ -248,8 +248,8 @@ def run_pipeline(train_users,
# Batch files into buckets by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
DFPFileBatcherStage(config,
period="D",
sampling_rate_s=sample_rate_s,
period=None,
sampling=f"{sample_rate_s}S" if sample_rate_s > 0 else None,
date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex),
start_time=start_time,
end_time=end_time))
Expand Down