From ecb53ddf1e3eb54e052bc519d6070af7cd3ca44c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 10 Jul 2023 11:11:11 -0700 Subject: [PATCH 1/4] Refactor setup into fixtures --- tests/utils/nvt/integration/test_mutate_op.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/utils/nvt/integration/test_mutate_op.py b/tests/utils/nvt/integration/test_mutate_op.py index 70172f9fcd..028c143eda 100644 --- a/tests/utils/nvt/integration/test_mutate_op.py +++ b/tests/utils/nvt/integration/test_mutate_op.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import pandas as pd +import pytest from merlin.dag import ColumnSelector import cudf @@ -21,25 +24,25 @@ from morpheus.utils.nvt.transforms import json_flatten -def setUp(): - json_data = [ +@pytest.fixture(name="json_data") +def json_data_fixture(): + yield [ '{"key1": "value1", "key2": {"subkey1": "subvalue1", "subkey2": "subvalue2"}}', '{"key1": "value2", "key2": {"subkey1": "subvalue3", "subkey2": "subvalue4"}}', '{"key1": "value3", "key2": {"subkey1": "subvalue5", "subkey2": "subvalue6"}}' ] - expected_pdf = pd.DataFrame({ + +@pytest.fixture(name="expected_pdf") +def expected_pdf_fixture(): + yield pd.DataFrame({ 'col1.key1': ['value1', 'value2', 'value3'], 'col1.key2.subkey1': ['subvalue1', 'subvalue3', 'subvalue5'], 'col1.key2.subkey2': ['subvalue2', 'subvalue4', 'subvalue6'] }) - return json_data, expected_pdf - - -def test_integration_pandas(): - json_data, expected_pdf = setUp() +def test_integration_pandas(json_data: typing.List[str], expected_pdf: pd.DataFrame): pdf = pd.DataFrame({'col1': json_data}) col_selector = ColumnSelector(['col1']) @@ -50,9 +53,7 @@ def test_integration_pandas(): assert result_pdf.equals(expected_pdf), "Integration test with pandas DataFrame failed" -def test_integration_cudf(): - json_data, expected_pdf = setUp() - +def test_integration_cudf(json_data: typing.List[str], expected_pdf: pd.DataFrame): cdf = cudf.DataFrame({'col1': json_data}) col_selector = ColumnSelector(['col1']) From 64f24bc3b1e67b072b62f113653aba4f7d87dadc Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 10 Jul 2023 12:38:30 -0700 Subject: [PATCH 2/4] Fix import sorting --- .../digital_fingerprinting/test_dfp_file_to_df.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index 7ee7cbdb5d..54031fac8a 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -18,11 +18,12 @@ import os from unittest import mock -import cudf import fsspec import pandas as pd import pytest +import cudf + from morpheus.common import FileTypes from morpheus.config import Config from morpheus.pipeline.preallocator_mixin import PreallocatorMixin @@ -47,8 +48,9 @@ def single_file_obj(): def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): from dfp.stages.dfp_file_to_df import _single_object_to_dataframe - schema = DataFrameInputSchema( - column_info=[CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0])]) + schema = DataFrameInputSchema(column_info=[ + CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0]) + ]) df = _single_object_to_dataframe(single_file_obj, schema, FileTypes.Auto, False, {}) assert df.columns == ['data'] @@ -56,10 +58,9 @@ def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): d = json.load(fh) expected_data = d['data'] - aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays - - assert(aslist == expected_data) + aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays + assert (aslist == expected_data) def test_single_object_to_dataframe_timeout(): From 2411732d7e5e4856143ace18db7f42b6901e6764 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 10 Jul 2023 12:49:40 -0700 Subject: [PATCH 3/4] pylint fixes for morpheus/utils/column_info.py --- morpheus/utils/column_info.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/morpheus/utils/column_info.py b/morpheus/utils/column_info.py index 8cc369e704..1182932b46 100644 --- a/morpheus/utils/column_info.py +++ b/morpheus/utils/column_info.py @@ -22,12 +22,14 @@ import cudf -logger = logging.getLogger("morpheus.{}".format(__name__)) +logger = logging.getLogger(f"morpheus.{__name__}") + +DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' # TODO(Devin): Proxying this for backwards compatibility. Had to move the primary definition to avoid circular imports. def process_dataframe(df_in: typing.Union[pd.DataFrame, cudf.DataFrame], input_schema) -> pd.DataFrame: - import morpheus.utils.schema_transforms as schema_transforms + from morpheus.utils import schema_transforms return schema_transforms.process_dataframe(df_in, input_schema) @@ -37,7 +39,6 @@ def create_increment_col(df, column_name: str, groupby_column="username", timest timestamp values in `timestamp_column` and then grouping by `groupby_column` returning incrementing values starting at `1`. """ - DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' # Ensure we are pandas for this if (isinstance(df, cudf.DataFrame)): @@ -63,8 +64,8 @@ def column_listjoin(df, col_name: str) -> pd.Series: """ if col_name in df: return df[col_name].transform(lambda x: ",".join(x)).astype('string') - else: - return pd.Series(None, dtype='string') + + return pd.Series(None, dtype='string') @dataclasses.dataclass @@ -78,11 +79,11 @@ def get_pandas_dtype(self) -> str: if ((isinstance(self.dtype, str) and self.dtype.startswith("datetime")) or (isinstance(self.dtype, type) and issubclass(self.dtype, datetime))): return "datetime64[ns]" - else: - if (isinstance(self.dtype, str)): - return self.dtype - else: - return self.dtype.__name__ + + if (isinstance(self.dtype, str)): + return self.dtype + + return self.dtype.__name__ def _process_column(self, df: pd.DataFrame) -> pd.Series: """ @@ -226,7 +227,7 @@ def __post_init__(self): # Compile the regex if (input_preserve_columns is not None and len(input_preserve_columns) > 0): - input_preserve_columns = re.compile("({})".format("|".join(input_preserve_columns))) + input_preserve_columns = re.compile(f"({'|'.join(input_preserve_columns)})") else: input_preserve_columns = None From 463cbd9e964d0a5a4b929207c9b0a0f0b15cb59b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 10 Jul 2023 13:03:32 -0700 Subject: [PATCH 4/4] pylint fixes for morpheus/utils/nvt/mutate.py --- morpheus/utils/nvt/mutate.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/morpheus/utils/nvt/mutate.py b/morpheus/utils/nvt/mutate.py index b36f806fea..bf3e96ab2e 100644 --- a/morpheus/utils/nvt/mutate.py +++ b/morpheus/utils/nvt/mutate.py @@ -48,25 +48,26 @@ def label(self): name = self._func.__name__.split(".")[-1] if name != "": return f"MutateOp: {name}" - else: - try: - # otherwise get the lambda source code from the inspect module if possible - source = getsourcelines(self.f)[0][0] - lambdas = [op.strip() for op in source.split(">>") if "lambda " in op] - if len(lambdas) == 1 and lambdas[0].count("lambda") == 1: - return lambdas[0] - except Exception: # pylint: disable=broad-except - # we can fail to load the source in distributed environments. Since the - # label is mainly used for diagnostics, don't worry about the error here and - # fallback to the default labelling - pass + + try: + # otherwise get the lambda source code from the inspect module if possible + source = getsourcelines(self.f)[0][0] + lambdas = [op.strip() for op in source.split(">>") if "lambda " in op] + if len(lambdas) == 1 and lambdas[0].count("lambda") == 1: + return lambdas[0] + except Exception: # pylint: disable=broad-except + # we can fail to load the source in distributed environments. Since the + # label is mainly used for diagnostics, don't worry about the error here and + # fallback to the default labelling + pass # Failed to figure out the source return "MutateOp" + # pylint: disable=arguments-renamed @annotate("MutateOp", color="darkgreen", domain="nvt_python") - def transform(self, column_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - return self._func(column_selector, df) + def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: + return self._func(col_selector, df) def column_mapping( self,