Update TimeSeries stage to also work with Production DFP #1121

Merged · 17 commits · Aug 30, 2023
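In brief: the timestamp column used by `TimeSeriesStage`, previously hard-coded to `"event_dt"`, is now taken from `Config.ae.timestamp_column_name`, and the stage accepts the more general `MultiMessage` rather than `MultiResponseAEMessage`, so it works with the production DFP pipeline as well as the starter one. A minimal usage sketch (hedged; the parameter values are illustrative, echoing the benchmark removed further down, not prescriptive):

```python
from morpheus.config import Config, ConfigAutoEncoder
from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage

config = Config()
config.ae = ConfigAutoEncoder()
config.ae.timestamp_column_name = "event_dt"  # TimeSeriesStage now reads the column name from here

# Values mirror the benchmark configuration that this PR removes below.
stage = TimeSeriesStage(config,
                        resolution="1m",
                        min_window="12 h",
                        hot_start=True,
                        cold_end=False,
                        filter_percent=90.0,
                        zscore_threshold=8.0)
```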
85 changes: 46 additions & 39 deletions morpheus/stages/postprocess/timeseries_stage.py
@@ -28,8 +28,8 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MultiResponseAEMessage
from morpheus.messages import MultiResponseMessage
from morpheus.messages.multi_ae_message import MultiMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

@@ -174,6 +174,7 @@ class _UserTimeSeries:

def __init__(self,
user_id: str,
timestamp_col: str,
resolution: str,
min_window: str,
hot_start: bool,
@@ -183,6 +184,7 @@ def __init__(self,
super().__init__()

self._user_id = user_id
self._timestamp_col = timestamp_col

# Size of bins
self._resolution_sec = int(round(pd.Timedelta(resolution).total_seconds()))
@@ -206,7 +208,8 @@ def __init__(self,

# Stateful members
self._pending_messages: deque[MultiResponseMessage] = deque() # Holds the existing messages pending
self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=["event_dt"]) # Holds all available timeseries data
self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=[self._timestamp_col
]) # Holds all available timeseries data

self._t0_epoch: float = None

@@ -268,22 +271,22 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct
x: MultiResponseMessage = self._pending_messages[0]

# Get the first message timestamp
message_start = calc_bin(x.get_meta("event_dt").iloc[0], self._t0_epoch, self._resolution_sec)
message_end = calc_bin(x.get_meta("event_dt").iloc[-1], self._t0_epoch, self._resolution_sec)
message_start = calc_bin(x.get_meta(self._timestamp_col).iloc[0], self._t0_epoch, self._resolution_sec)
message_end = calc_bin(x.get_meta(self._timestamp_col).iloc[-1], self._t0_epoch, self._resolution_sec)

window_start = message_start - self._half_window_bins
window_end = message_end + self._half_window_bins

# Check left buffer
if (timeseries_start > window_start):
# logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base,
# timeseries_start - window_start)
logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end,
timeseries_start - window_start)

# Not shutting down and we arent warm, send through
if (not self._is_warm and not is_complete):
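These buffer checks compare integer bin indices, not raw timestamps: `message_start` and `message_end` are the bins of the first and last events in the oldest pending message, and the stage waits for `half_window_bins` of data on each side before acting. A minimal sketch of the binning assumed by `calc_bin` (illustrative only; `calc_bin`'s actual body is outside this diff):

```python
import pandas as pd


def calc_bin(ts: pd.Timestamp, t0: pd.Timestamp, resolution_sec: int) -> int:
    # Bin index of `ts` relative to the epoch `t0`; bins are `resolution_sec` wide.
    return int((ts - t0).total_seconds()) // resolution_sec


t0 = pd.Timestamp("2023-08-30")
assert calc_bin(pd.Timestamp("2023-08-30 00:03:30"), t0, 60) == 3  # 210 s into minute-wide bins
```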
@@ -293,34 +296,35 @@

# Check the right buffer
if (timeseries_end < window_end):
# logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base,
# window_end - timeseries_end)

if (not is_complete):
# Not shutting down, so hold message
logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end,
window_end - timeseries_end)

if (not is_complete and len(self._pending_messages) == 1):
# Last message, so stop processing
logger.debug("not is_complete, no pending")
return None

if (is_complete and self._cold_end):
# Shutting down and we have a cold ending, just empty the message
logger.debug("is_complete and self._cold_end")
return _TimeSeriesAction(send_message=True, message=self._pending_messages.popleft())

# Shutting down and hot end
# logger.debug("Hot End. Processing. TS: %s", timeseries_start._repr_base)

# By this point we have both a front and back buffer. So get ready for a calculation
# logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base)
logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end)

# First, remove elements in the front queue that are too old
self._timeseries_data.drop(self._timeseries_data[self._timeseries_data["event_bin"] < window_start].index,
@@ -347,7 +351,7 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):
# Save this message in the pending queue
self._pending_messages.append(x)

new_timedata = x.get_meta(["event_dt"])
new_timedata = x.get_meta([self._timestamp_col])

# Save this message event times in the event list. Ensure the values are always sorted
self._timeseries_data = pd.concat([self._timeseries_data, new_timedata]).sort_index()
@@ -363,13 +367,13 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):

# If this is our first time data, set the t0 time
if (self._t0_epoch is None):
self._t0_epoch = self._timeseries_data["event_dt"].iloc[0]
self._t0_epoch = self._timeseries_data[self._timestamp_col].iloc[0]

# TODO(MDD): Floor to the day to unsure all buckets are always aligned with val data
self._t0_epoch = self._t0_epoch.floor(freq="D")

# Calc the bins for the timeseries data
self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data["event_dt"])
self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data[self._timestamp_col])

# At this point there are 3 things that can happen
# 1. We are warming up to build a front buffer. Save the current message times and send the message on
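Flooring `_t0_epoch` to the day pins every user's bin boundaries to midnight, keeping buckets aligned with the validation data regardless of when the first event arrives. A hedged sketch of the vectorized binning this sets up (`_calc_bin_series` itself is outside this diff; column and variable names mirror the code above):

```python
import pandas as pd

df = pd.DataFrame({"event_dt": pd.to_datetime(["2023-08-30 00:00:30", "2023-08-30 00:02:10"])})
resolution_sec = 60  # e.g. resolution="1m"

t0_epoch = df["event_dt"].iloc[0].floor(freq="D")  # day-aligned epoch, per the TODO above
df["event_bin"] = ((df["event_dt"] - t0_epoch).dt.total_seconds() // resolution_sec).astype(int)
print(df["event_bin"].tolist())  # [0, 2]
```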
@@ -441,6 +445,8 @@ def __init__(self,
zscore_threshold: float = 8.0):
super().__init__(c)

self._timestamp_col = c.ae.timestamp_column_name

self._feature_length = c.feature_length

self._resolution = resolution
@@ -470,15 +476,16 @@ def accepted_types(self) -> typing.Tuple:
Accepted input types.

"""
return (MultiResponseMessage, )
return (MultiMessage, )

def supports_cpp_node(self):
return False

def _call_timeseries_user(self, x: MultiResponseAEMessage):
def _call_timeseries_user(self, x: MultiMessage):

if (x.user_id not in self._timeseries_per_user):
self._timeseries_per_user[x.user_id] = _UserTimeSeries(user_id=x.user_id,
timestamp_col=self._timestamp_col,
resolution=self._resolution,
min_window=self._min_window,
hot_start=self._hot_start,
@@ -493,7 +500,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
stream = input_stream[0]
out_type = input_stream[1]

def on_next(x: MultiResponseAEMessage):
def on_next(x: MultiMessage):

message_list: typing.List[MultiResponseMessage] = self._call_timeseries_user(x)

@@ -503,8 +510,8 @@ def on_completed():

to_send = []

for ts in self._timeseries_per_user.values():
message_list: typing.List[MultiResponseMessage] = ts._calc_timeseries(None, True)
for timestamp in self._timeseries_per_user.values():
message_list: typing.List[MultiResponseMessage] = timestamp._calc_timeseries(None, True)

to_send = to_send + message_list

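Note on the type relaxation above: `accepted_types()` now returns `(MultiMessage, )`, and if `MultiResponseMessage` derives from `MultiMessage` in the Morpheus class hierarchy (an assumption, not confirmed by this diff), existing starter-DFP pipelines keep passing the stage's input filter while production-DFP message types are accepted too. A quick sanity check under that assumption:

```python
from morpheus.messages import MultiMessage, MultiResponseMessage

# Assumes MultiResponseMessage subclasses MultiMessage; if so, the relaxed
# accepted_types() continues to admit the messages older pipelines produce.
assert issubclass(MultiResponseMessage, MultiMessage)
```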
9 changes: 0 additions & 9 deletions tests/benchmarks/test_bench_e2e_pipelines.py
@@ -35,7 +35,6 @@
from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage
from morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage
@@ -112,14 +111,6 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
pipeline.add_stage(PreprocessAEStage(config))
pipeline.add_stage(AutoEncoderInferenceStage(config))
pipeline.add_stage(AddScoresStage(config))
pipeline.add_stage(
TimeSeriesStage(config,
resolution="1m",
min_window=" 12 h",
hot_start=True,
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
pipeline.add_stage(MonitorStage(config))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
8 changes: 6 additions & 2 deletions tests/test_dfp.py
@@ -75,6 +75,7 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "role-g"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -113,7 +114,8 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
results_file_name=results_file_name,
index_col="_index_",
exclude=("event_dt", "zscore"),
rel_tol=0.15))
rel_tol=0.1))

pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))

@@ -157,6 +159,7 @@ def test_dfp_user123(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -238,6 +241,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -284,7 +288,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
rel_tol=0.1))
pipe.add_segment_boundary(MultiResponseMessage) # Boundary 7
pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_segment_boundary(MessageMeta) # Boundary 9
pipe.add_segment_boundary(MessageMeta) # Boundary 8
pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))

pipe.run()
2 changes: 2 additions & 0 deletions tests/test_dfp_kafka.py
@@ -89,6 +89,7 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "role-g"
config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
config.ae.timestamp_column_name = "event_dt"

input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
@@ -189,6 +190,7 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
config.ae.timestamp_column_name = "event_dt"

input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")