Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-allocate needed columns in abp_pcap_detection example #820

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9e9222d
First 20 lines of examples/abp_pcap_detection/pcap_out.jsonlines pull…
dagardner-nv Mar 29, 2023
16de6f7
Remove flow_id and rollup_time cols
dagardner-nv Mar 29, 2023
a239f41
Test data
dagardner-nv Mar 29, 2023
59dc4f9
Fix type-o
dagardner-nv Mar 29, 2023
d7e23be
Add a get_stage_class method to StageInfo
dagardner-nv Mar 29, 2023
fb7829e
Fixture to reset the PluginManager and GlobalStageRegistry
dagardner-nv Mar 29, 2023
454eeea
Add helper method to assist pulling a stage from the plugin manager
dagardner-nv Mar 29, 2023
e3007b7
End-to-end test for AbpPcapPreprocessingStage
dagardner-nv Mar 29, 2023
c354361
Test asserting that needed columns are defined
dagardner-nv Mar 29, 2023
3fcb19a
Request pre-allocation
dagardner-nv Mar 29, 2023
dd41ddc
Fix setting get_stage_class in LazyStageInfo
dagardner-nv Mar 29, 2023
1781bd6
Merge branch 'branch-23.03' into pcap_preprocessing-pre-allocating-797
dagardner-nv Mar 29, 2023
5d4eb0a
Merge branch 'branch-23.07' into pcap_preprocessing-pre-allocating-797
dagardner-nv Apr 5, 2023
493e7fb
reload_modules fixture reloads requested modules before and after the…
dagardner-nv Apr 5, 2023
81002ac
Move test_abp_pcap_preprocessing.py to an examples subdir
dagardner-nv Apr 5, 2023
19ef9b3
Merge branch 'branch-23.07' into pcap_preprocessing-pre-allocating-797
dagardner-nv Apr 6, 2023
c2b78f9
Merge branch 'branch-23.07' of github.com:nv-morpheus/Morpheus into p…
dagardner-nv Apr 10, 2023
08521cc
Revert "Add a get_stage_class method to StageInfo" [no ci]
dagardner-nv Apr 10, 2023
5db76ac
Reverting changes [no ci]
dagardner-nv Apr 10, 2023
2bed393
Fixture to restore sys.path
dagardner-nv Apr 5, 2023
099a660
Revert changes
dagardner-nv Apr 10, 2023
149e8dd
Revert unneeded changes
dagardner-nv Apr 10, 2023
48f85f2
Fixture for importing python modules outside of the Morpheus API and …
dagardner-nv Apr 10, 2023
3d2f1d2
wip [no ci]
dagardner-nv Apr 10, 2023
aec4a79
Revert unneeded changes [no ci]
dagardner-nv Apr 10, 2023
8fb7234
Fix CR year
dagardner-nv Apr 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import InferenceMemoryFIL
Expand Down Expand Up @@ -67,6 +68,12 @@ def __init__(self, c: Config):
self.features
), f"Number of features in preprocessing {len(self.features)}, does not match configuration {self._fea_length}"

# columns required to be added to input message meta
self.req_cols = ["flow_id", "rollup_time"]

for req_col in self.req_cols:
self._needed_columns[req_col] = TypeId.STRING

@property
def name(self) -> str:
    """Return the fixed identifier string of this stage: ``"preprocess-anomaly"``."""
    return "preprocess-anomaly"
Expand All @@ -75,7 +82,8 @@ def supports_cpp_node(self):
return False

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str]) -> MultiInferenceFILMessage:
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> MultiInferenceFILMessage:
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

Expand Down Expand Up @@ -166,31 +174,27 @@ def round_time_kernel(timestamp, rollup_time, secs):
data = cp.asarray(merged_df[fea_cols].to_cupy())
count = data.shape[0]

# columns required to be added to input message meta
req_cols = ["flow_id", "rollup_time"]

for col in req_cols:
x.set_meta(col, merged_df[col])

del merged_df

seg_ids = cp.zeros((count, 3), dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1
seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seq_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seg_ids)
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

infer_message = MultiInferenceFILMessage.from_message(x, memory=memory)

return infer_message

def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]:
return partial(
AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
)
return partial(AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
req_cols=self.req_cols)

def _get_preprocess_node(self, builder: mrc.Builder):
    """
    Construct the preprocessing node from `morpheus._lib.stages` (imported as `_stages`).

    The node is created with the supplied `builder` and this stage's `unique_name`.

    NOTE(review): `supports_cpp_node` in this class returns False, so presumably this method is not reached
    in practice -- confirm against the base class.
    """
    return _stages.AbpPcapPreprocessingStage(builder, self.unique_name)
47 changes: 43 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import signal
import subprocess
import sys
import time
import typing
import warnings
Expand Down Expand Up @@ -359,18 +360,56 @@ def restore_environ():
del (os.environ[key])


@pytest.fixture(scope="function")
def restore_sys_path():
    """
    Snapshot `sys.path` before the test runs and put the snapshot back afterwards.

    Yields the live `sys.path` list, so the test (or a dependent fixture) may modify it freely.
    """
    saved_path = list(sys.path)
    yield sys.path
    # Rebind rather than mutate: whatever the test appended is dropped together with the old list.
    sys.path = saved_path


@pytest.fixture(scope="function")
def import_mod(request: pytest.FixtureRequest, restore_sys_path):
    """
    Import the python files named by the test's `import_mod` marker and yield them as a list of modules.

    The marker's first argument is either a single path or a list of paths. For each path, its directory is
    appended to `sys.path` and the file is imported by its base name (extension stripped). The modules are
    yielded in the same order as the paths.

    Raises `ValueError` when the requesting test carries no `import_mod` marker.
    """
    # `restore_sys_path` is requested only for its teardown, which undoes the `sys.path` additions made here.
    marker = request.node.get_closest_marker("import_mod")
    if marker is None:
        raise ValueError("import_mod fixture requires setting paths in markers: "
                         "`@pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'log_parsing/messages.py')])`")

    requested = marker.args[0]
    paths = requested if isinstance(requested, list) else [requested]

    loaded = []
    for path in paths:
        directory = os.path.dirname(path)
        module_name = os.path.splitext(os.path.basename(path))[0]

        sys.path.append(directory)
        module = importlib.import_module(module_name)

        # Guard against a different module of the same name having been picked up instead
        assert module.__file__ == path

        loaded.append(module)

    yield loaded


def _reload_modules(modules: typing.List[typing.Any]):
    """Call `importlib.reload` on each entry of `modules`, in the order given."""
    for module in modules:
        importlib.reload(module)


@pytest.fixture(scope="function")
def reload_modules(request: pytest.FixtureRequest):
marker = request.node.get_closest_marker("reload_modules")
yield

modules = []
if marker is not None:
modules = marker.args[0]
if not isinstance(modules, list):
modules = [modules]

for mod in modules:
importlib.reload(mod)
_reload_modules(modules)
yield
_reload_modules(modules)


@pytest.fixture(scope="function")
Expand Down
122 changes: 122 additions & 0 deletions tests/examples/test_abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import typing

import cupy as cp
import numpy as np
import pytest

import cudf

from morpheus.common import TypeId
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiMessage
from utils import TEST_DIRS


def check_inf_message(msg: MultiInferenceFILMessage,
                      expected_meta: MessageMeta,
                      expected_mess_offset: int,
                      expected_mess_count: int,
                      expected_offset: int,
                      expected_count: int,
                      expected_feature_length: int,
                      expected_flow_ids: cudf.Series,
                      expected_rollup_time: str,
                      expected_input__0: cp.ndarray):
    """
    Assert that `msg` matches the expected inference message.

    Checks, in order:
      * `msg` is a `MultiInferenceFILMessage` wrapping the very same `expected_meta` object, with the expected
        `mess_offset`, `mess_count`, `offset` and `count`.
      * The meta dataframe has `flow_id` and `rollup_time` columns, equal to `expected_flow_ids` and (for every
        row) `expected_rollup_time` respectively.
      * The memory holds an `input__0` tensor of shape `(expected_count, expected_feature_length)` equal to
        `expected_input__0`.
      * The memory holds a `seq_ids` tensor of shape `(expected_count, 3)` whose columns are: consecutive row
        ids starting at `expected_mess_offset`, all zeros, and `expected_feature_length - 1`.

    Any mismatch raises `AssertionError`.
    """
    assert isinstance(msg, MultiInferenceFILMessage)
    assert msg.meta is expected_meta
    assert msg.mess_offset == expected_mess_offset
    assert msg.mess_count == expected_mess_count
    assert msg.offset == expected_offset
    assert msg.count == expected_count

    df = msg.get_meta()
    assert 'flow_id' in df
    assert 'rollup_time' in df

    assert (df.flow_id == expected_flow_ids).all()
    assert (df.rollup_time == expected_rollup_time).all()

    assert msg.memory.has_tensor('input__0')
    assert msg.memory.has_tensor('seq_ids')

    input__0 = msg.memory.get_tensor('input__0')
    assert input__0.shape == (expected_count, expected_feature_length)
    assert (input__0 == expected_input__0).all()

    seq_ids = msg.memory.get_tensor('seq_ids')
    assert seq_ids.shape == (expected_count, 3)

    # seq_ids has one row per inference row (`expected_count`, asserted just above), so the reference range
    # must have that length too. Sizing it by `expected_mess_count` would make the comparison fail on shape
    # whenever the two counts differ.
    expected_row_ids = cp.arange(expected_mess_offset, expected_mess_offset + expected_count, dtype=cp.uint32)
    assert (seq_ids[:, 0] == expected_row_ids).all()
    assert (seq_ids[:, 1] == 0).all()
    assert (seq_ids[:, 2] == expected_feature_length - 1).all()


@pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'abp_pcap_detection/abp_pcap_preprocessing.py')])
def test_abp_pcap_preprocessing(config, import_mod: typing.List[typing.Any]):
    """
    Run `AbpPcapPreprocessingStage.pre_process_batch` over two 10-row slices of a 20-row input and verify both
    the declared needed columns and each resulting inference message.
    """
    # Setup the config
    config.mode = PipelineModes.FIL
    config.feature_length = 13

    # The example module is not part of the Morpheus package, it is supplied by the `import_mod` fixture
    abp_pcap_preprocessing = import_mod[0]

    # Load the input data, which is expected to hold 20 rows (asserted below)
    tests_data_dir = TEST_DIRS.tests_data_dir
    input_df = read_file_to_df(os.path.join(tests_data_dir, 'abp_pcap.jsonlines'), df_type='cudf', filter_nulls=False)

    expected_flow_ids = input_df.src_ip + ":" + input_df.src_port + "=" + input_df.dest_ip + ":" + input_df.dest_port
    expected_csv = os.path.join(tests_data_dir, 'abp_pcap_expected_input_0.csv')
    expected_input__0 = cp.asarray(np.loadtxt(expected_csv, delimiter=",", skiprows=0))

    assert len(input_df) == 20

    meta = MessageMeta(input_df)

    # (mess_offset, rows of the expected data covered by that message)
    batches = [(0, slice(0, 10)), (10, slice(10, None))]
    multi_messages = [MultiMessage(meta=meta, mess_offset=mess_offset, mess_count=10) for (mess_offset, _) in batches]

    stage = abp_pcap_preprocessing.AbpPcapPreprocessingStage(config)
    assert stage.get_needed_columns() == {'flow_id': TypeId.STRING, 'rollup_time': TypeId.STRING}

    for (multi_message, (mess_offset, rows)) in zip(multi_messages, batches):
        inf_message = stage.pre_process_batch(multi_message, config.feature_length, stage.features, stage.req_cols)
        check_inf_message(inf_message,
                          expected_meta=meta,
                          expected_mess_offset=mess_offset,
                          expected_mess_count=10,
                          expected_offset=0,
                          expected_count=10,
                          expected_feature_length=config.feature_length,
                          expected_flow_ids=expected_flow_ids[rows],
                          expected_rollup_time='2021-04-07 15:55',
                          expected_input__0=expected_input__0[rows])
3 changes: 3 additions & 0 deletions tests/tests_data/abp_pcap.jsonlines
Git LFS file not shown
3 changes: 3 additions & 0 deletions tests/tests_data/abp_pcap_expected_input_0.csv
Git LFS file not shown
1 change: 1 addition & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, cur_file=__file__) -> None:
self.tests_dir = os.path.dirname(cur_file)
self.morpheus_root = os.environ.get('MORPHEUS_ROOT', os.path.dirname(self.tests_dir))
self.data_dir = morpheus.DATA_DIR
self.examples_dir = os.path.join(self.morpheus_root, 'examples')
self.models_dir = os.path.join(self.morpheus_root, 'models')
self.datasets_dir = os.path.join(self.models_dir, 'datasets')
self.training_data_dir = os.path.join(self.datasets_dir, 'training-data')
Expand Down