Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate redundant code blocks in modules and stages #1123

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4a21359
Setting the tag back to v22.11.00
mdemoret-nv Jan 24, 2023
e959048
removed duplicate code in modules and stages
bsuryadevara Aug 8, 2023
c1beefe
pylint correction
bsuryadevara Aug 9, 2023
5610fc4
updated serilalizer module
bsuryadevara Aug 9, 2023
b6fb2a0
yapf format correction
bsuryadevara Aug 9, 2023
2d62ab7
yapf format correction
bsuryadevara Aug 9, 2023
5765460
fix to preserve_columns property
bsuryadevara Aug 10, 2023
2cccb91
added additional check to schema_transforms
bsuryadevara Aug 16, 2023
cacfed1
added checks to handle str type filter_source
bsuryadevara Aug 17, 2023
7c5e173
updated tests
bsuryadevara Aug 17, 2023
8edd637
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 17, 2023
e39bf53
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 23, 2023
a802734
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 23, 2023
89045cb
updated tests
bsuryadevara Aug 23, 2023
af50204
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 24, 2023
8c67be8
fixed pylint warnings
bsuryadevara Aug 25, 2023
623f4b9
updated to align with latest changes
bsuryadevara Aug 25, 2023
ea3ecee
Merge remote-tracking branch 'upstream/branch-22.11' into remove-dupl…
bsuryadevara Aug 31, 2023
d8deaae
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Aug 31, 2023
a7499d6
moved monitor controller to controllers module
bsuryadevara Aug 31, 2023
4b43315
removed unused pylint disable comment
bsuryadevara Sep 5, 2023
0d29557
Merge branch 'branch-23.11' into remove-duplicate-code
bsuryadevara Sep 5, 2023
7546da8
Update examples/ransomware_detection/common/feature_extractor.py
bsuryadevara Sep 5, 2023
2402127
minor fixes and updates to model cache
bsuryadevara Sep 6, 2023
f201aca
Merge branch 'remove-duplicate-code' of github.com:bsuryadevara/Morph…
bsuryadevara Sep 6, 2023
aaf5fc1
Merge remote-tracking branch 'upstream/branch-23.11' into remove-dupl…
bsuryadevara Sep 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from mrc.core import operators as ops
from tqdm import tqdm

from morpheus.controllers.monitor_controller import MonitorController
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_utils import register_module
from morpheus.utils.monitor_utils import MonitorController
from morpheus.utils.monitor_utils import MorpheusTqdm
from morpheus.utils.monitor_utils import SilentMorpheusTqdm

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DFPFileBatcherStage(SinglePortStage):

Parameters
----------
c : `morpheus.config.Config`
config : `morpheus.config.Config`
Pipeline configuration instance.
date_conversion_func : callable
A function that takes a file object and returns a `datetime` object representing the date of the file.
Expand All @@ -69,14 +69,14 @@ class DFPFileBatcherStage(SinglePortStage):
"""

def __init__(self,
c: Config,
config: Config,
date_conversion_func: typing.Callable[[fsspec.core.OpenFile], datetime],
period: str = "D",
sampling_rate_s: typing.Optional[int] = None,
start_time: datetime = None,
end_time: datetime = None,
sampling: typing.Union[str, float, int, None] = None):
super().__init__(c)
super().__init__(config)

self._date_conversion_func = date_conversion_func
self._period = period
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.controllers.file_to_df_controller import FileToDFController

logger = logging.getLogger(f"morpheus.{__name__}")

Expand All @@ -41,7 +41,7 @@ class DFPFileToDataFrameStage(PreallocatorMixin, SinglePortStage):

Parameters
----------
c : `morpheus.config.Config`
config : `morpheus.config.Config`
Pipeline configuration instance.
schema : `morpheus.utils.column_info.DataFrameInputSchema`
Input schema for the DataFrame.
Expand All @@ -56,21 +56,20 @@ class DFPFileToDataFrameStage(PreallocatorMixin, SinglePortStage):
"""

def __init__(self,
c: Config,
config: Config,
schema: DataFrameInputSchema,
filter_null: bool = True,
file_type: FileTypes = FileTypes.Auto,
parser_kwargs: dict = None,
cache_dir: str = "./.cache/dfp"):
super().__init__(c)
super().__init__(config)

timestamp_column_name = c.ae.timestamp_column_name
self._controller = FileToDFController(schema=schema,
filter_null=filter_null,
file_type=file_type,
parser_kwargs=parser_kwargs,
cache_dir=cache_dir,
timestamp_column_name=timestamp_column_name)
timestamp_column_name=config.ae.timestamp_column_name)

@property
def name(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.controllers.mlflow_model_writer_controller import MLFlowModelWriterController
from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.controllers.mlflow_model_writer_controller import MLFlowModelWriterController

# Setup conda environment
conda_env = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,10 @@ def load_model_cache(self, client: MlflowClient, reg_model_name: str, timeout: f
logger.error("Deadlock when trying to acquire model cache lock")
raise RuntimeError("Deadlock when trying to acquire model cache lock") from e

# pylint: disable=dangerous-default-value
def load_user_model_cache(self,
user_id: str,
timeout: float,
fallback_user_ids: typing.List[str] = []) -> UserModelMap:
fallback_user_ids: typing.List[str] = None) -> UserModelMap:
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
if (fallback_user_ids is None):
fallback_user_ids = []

Expand Down
3 changes: 1 addition & 2 deletions examples/ransomware_detection/common/feature_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ def _extract_protections(self, x: pd.DataFrame, vadinfo_df_size: int, vadsinfo_s
"""
page_execute_writecopy_count = 0

# pylint: disable=consider-iterating-dictionary
for protection in fc.PROTECTIONS.keys():
for protection, _ in fc.PROTECTIONS.items():
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved

p_data = self._get_protection_data(x, protection, vadinfo_df_size, vadsinfo_size, vadinfo_size)

Expand Down
13 changes: 13 additions & 0 deletions morpheus/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
235 changes: 235 additions & 0 deletions morpheus/controllers/monitor_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing
from functools import reduce

import fsspec
from tqdm import tqdm

import cudf

from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.messages import MultiMessage
from morpheus.utils.logger import LogLevels
from morpheus.utils.monitor_utils import MorpheusTqdm

logger = logging.getLogger(__name__)


class MonitorController:
    """
    Controls and displays throughput numbers at a specific point in the pipeline.

    The controller lazily creates a tqdm-style progress bar, works out how to count the items in
    the messages it receives, and advances the bar by that count for every message.

    Parameters
    ----------
    position : int
        Specifies the monitor's position on the console.
    description : str
        Name to show for this monitor in the console window.
    smoothing : float
        Smoothing parameter to determine how much the throughput should be averaged. 0 = Instantaneous, 1 =
        Average.
    unit : str
        Units to show in the rate value. A leading space is added when the value does not already start with one.
    delayed_start : bool
        When delayed_start is enabled, the progress bar will not be shown until the first message is received.
        Otherwise, the progress bar is shown on pipeline startup and will begin timing immediately. In large pipelines,
        this option may be desired to give a more accurate timing.
    determine_count_fn : typing.Callable[[typing.Any], int]
        Custom function for determining the count in a message. Gets called for each message. Allows for
        correct counting of batched and sliced messages. When `None`, a counting function is inferred from the
        first non-empty message via `auto_count_fn`.
    log_level : `morpheus.utils.logger.LogLevels`
        The monitor is considered enabled when this module's logger is enabled for `log_level`.
    tqdm_class : `tqdm` subclass, default = None
        Custom implementation of tqdm if required. Falls back to `MorpheusTqdm` when not supplied.
    """

    # Number of live controllers. This class only decrements it (in `sink_on_completed`);
    # NOTE(review): presumably the owning stage increments it on start -- confirm against callers.
    controller_count: int = 0

    def __init__(self,
                 position: int,
                 description: str,
                 smoothing: float,
                 unit: str,
                 delayed_start: bool,
                 determine_count_fn: typing.Callable[[typing.Any], int],
                 log_level: LogLevels,
                 tqdm_class: typing.Type[tqdm] = None):

        # The progress bar is created lazily by `ensure_progress_bar`
        self._progress: typing.Optional[tqdm] = None
        self._position = position
        self._description = description
        self._smoothing = smoothing
        self._unit = unit
        self._delayed_start = delayed_start
        self._determine_count_fn = determine_count_fn
        self._tqdm_class = tqdm_class if tqdm_class else MorpheusTqdm

        # Accept either the enum or a raw logging level; store the raw value
        if isinstance(log_level, LogLevels):  # pylint: disable=isinstance-second-argument-not-valid-type
            log_level = log_level.value

        self._log_level = log_level
        self._enabled = None  # defined on first call to is_enabled

    @property
    def delayed_start(self) -> bool:
        """
        Whether creation of the progress bar is deferred until the first message arrives.
        """
        return self._delayed_start

    @property
    def progress(self) -> typing.Optional[tqdm]:
        """
        The underlying progress bar, or `None` if it has not been created yet.
        """
        return self._progress

    def is_enabled(self) -> bool:
        """
        Returns a boolean indicating whether or not the logger is enabled for the configured log level.

        The result is computed on the first call and cached for subsequent calls.
        """

        if self._enabled is None:
            self._enabled = logger.isEnabledFor(self._log_level)

        return self._enabled

    def ensure_progress_bar(self):
        """
        Ensures that the progress bar is initialized and ready for display.

        The bar is created and reset only on the first call; later calls are no-ops.
        """

        if (self._progress is None):
            self._progress = self._tqdm_class(desc=self._description,
                                              smoothing=self._smoothing,
                                              dynamic_ncols=True,
                                              unit=(self._unit if self._unit.startswith(" ") else f" {self._unit}"),
                                              mininterval=0.25,
                                              maxinterval=1.0,
                                              miniters=1,
                                              position=self._position)

            self._progress.reset()

    def refresh_progress(self, _):
        """
        Refreshes the progress bar display.

        Does nothing when the progress bar has not been created yet. The single positional argument is ignored.
        """
        if (self._progress is not None):
            self._progress.refresh()

    def progress_sink(self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]):
        """
        Receives a message and determines the count of the message.
        The progress bar is displayed and the progress is updated.

        Parameters
        ----------
        x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]
            Message that determines the count of the message

        Returns
        -------
        x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]
            The same message, passed through unchanged.

        """

        # Make sure the progress bar is shown
        self.ensure_progress_bar()

        if (self._determine_count_fn is None):
            self._determine_count_fn = self.auto_count_fn(x)

        # Skip in case we have empty objects; a counting function will be inferred from a later message
        if (self._determine_count_fn is None):
            return x

        # Do our best to determine the count
        count = self._determine_count_fn(x)

        self._progress.update(n=count)

        return x

    def auto_count_fn(
        self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]
    ) -> typing.Optional[typing.Callable[[typing.Any], int]]:
        """
        This is a helper function that is used to determine the count of messages received by the
        monitor.

        Parameters
        ----------
        x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]
            Message that determines the count of the message

        Returns
        -------
        typing.Optional[typing.Callable[[typing.Any], int]]
            A function that returns the item count for messages of the same kind as `x`, or `None` when `x` is
            `None`, an empty list, or a list whose first element does not yet allow a counter to be inferred.

        Raises
        ------
        NotImplementedError
            If `x` is of a type that is not recognized and has no `__len__`.

        """

        # pylint: disable=too-many-return-statements

        if (x is None):
            return None

        # Wait for a list that's not empty
        if (isinstance(x, list) and len(x) == 0):
            return None

        if (isinstance(x, cudf.DataFrame)):
            return lambda y: len(y.index)

        if (isinstance(x, MultiMessage)):
            return lambda y: y.mess_count

        if (isinstance(x, MessageMeta)):
            return lambda y: y.count

        if isinstance(x, ControlMessage):

            def check_df(y):
                df = y.payload().df
                if df is not None:
                    return len(df)

                return 0

            return check_df

        if (isinstance(x, list)):
            # The per-item counter is inferred from the first element only
            item_count_fn = self.auto_count_fn(x[0])

            # The first element may itself be None/empty; wait for a more informative message instead of
            # returning a reducer that would try to call `None`
            if (item_count_fn is None):
                return None

            return lambda y: reduce(lambda total, z, item_count_fn=item_count_fn: total + item_count_fn(z), y, 0)

        if (isinstance(x, (str, fsspec.core.OpenFile))):
            return lambda y: 1

        if (hasattr(x, "__len__")):
            return len  # Return len directly (same as `lambda y: len(y)`)

        raise NotImplementedError(f"Unsupported type: {type(x)}")

    def sink_on_completed(self):
        """
        Stops the progress bar and prevents the monitors from writing over each other when the last
        stage completes.
        """

        # The bar may never have been created (e.g. delayed start with no messages received)
        if (self._progress is not None):
            # Set the name to complete. This refreshes the display
            self._progress.set_description_str(self._progress.desc + "[Complete]")

            self._progress.stop()

        # To prevent the monitors from writing over each other, stop the monitor when the last stage completes
        MonitorController.controller_count -= 1

        if (MonitorController.controller_count <= 0 and self._tqdm_class.monitor is not None):
            self._tqdm_class.monitor.exit()
            self._tqdm_class.monitor = None
2 changes: 1 addition & 1 deletion morpheus/loaders/file_to_df_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import cudf

from morpheus.cli.utils import str_to_file_type
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.messages import ControlMessage
from morpheus.messages.message_meta import MessageMeta
from morpheus.utils.controllers.file_to_df_controller import FileToDFController
from morpheus.utils.loader_ids import FILE_TO_DF_LOADER
from morpheus.utils.loader_utils import register_loader

Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/file_to_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from mrc.core import operators as ops

from morpheus.cli.utils import str_to_file_type
from morpheus.utils.controllers.file_to_df_controller import FileToDFController
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.utils.module_ids import FILE_TO_DF
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_utils import register_module
Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/filter_detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import morpheus._lib.stages as _stages
from morpheus.common import FilterSource
from morpheus.utils.controllers.filter_detections_controller import FilterDetectionsController
from morpheus.controllers.filter_detections_controller import FilterDetectionsController
from morpheus.utils.module_ids import FILTER_DETECTIONS
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_utils import register_module
Expand Down
Loading
Loading