Skip to content

Commit

Permalink
Merge pull request #13 from dagardner-nv/devin_issue_862_dg_lint
Browse files Browse the repository at this point in the history
WIP: Lint fixes
  • Loading branch information
drobison00 authored Jul 10, 2023
2 parents 4e6c9b1 + 463cbd9 commit b7191ea
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 31 deletions.
23 changes: 12 additions & 11 deletions morpheus/utils/column_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import cudf

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")

DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'


# TODO(Devin): Proxying this for backwards compatibility. Had to move the primary definition to avoid circular imports.
def process_dataframe(df_in: typing.Union[pd.DataFrame, cudf.DataFrame], input_schema) -> pd.DataFrame:
import morpheus.utils.schema_transforms as schema_transforms
from morpheus.utils import schema_transforms
return schema_transforms.process_dataframe(df_in, input_schema)


Expand All @@ -37,7 +39,6 @@ def create_increment_col(df, column_name: str, groupby_column="username", timest
timestamp values in `timestamp_column` and then grouping by `groupby_column` returning incrementing values starting
at `1`.
"""
DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'

# Ensure we are pandas for this
if (isinstance(df, cudf.DataFrame)):
Expand All @@ -63,8 +64,8 @@ def column_listjoin(df, col_name: str) -> pd.Series:
"""
if col_name in df:
return df[col_name].transform(lambda x: ",".join(x)).astype('string')
else:
return pd.Series(None, dtype='string')

return pd.Series(None, dtype='string')


@dataclasses.dataclass
Expand All @@ -78,11 +79,11 @@ def get_pandas_dtype(self) -> str:
if ((isinstance(self.dtype, str) and self.dtype.startswith("datetime"))
or (isinstance(self.dtype, type) and issubclass(self.dtype, datetime))):
return "datetime64[ns]"
else:
if (isinstance(self.dtype, str)):
return self.dtype
else:
return self.dtype.__name__

if (isinstance(self.dtype, str)):
return self.dtype

return self.dtype.__name__

def _process_column(self, df: pd.DataFrame) -> pd.Series:
"""
Expand Down Expand Up @@ -226,7 +227,7 @@ def __post_init__(self):

# Compile the regex
if (input_preserve_columns is not None and len(input_preserve_columns) > 0):
input_preserve_columns = re.compile("({})".format("|".join(input_preserve_columns)))
input_preserve_columns = re.compile(f"({'|'.join(input_preserve_columns)})")
else:
input_preserve_columns = None

Expand Down
29 changes: 15 additions & 14 deletions morpheus/utils/nvt/mutate.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,26 @@ def label(self):
name = self._func.__name__.split(".")[-1]
if name != "<lambda>":
return f"MutateOp: {name}"
else:
try:
# otherwise get the lambda source code from the inspect module if possible
source = getsourcelines(self.f)[0][0]
lambdas = [op.strip() for op in source.split(">>") if "lambda " in op]
if len(lambdas) == 1 and lambdas[0].count("lambda") == 1:
return lambdas[0]
except Exception: # pylint: disable=broad-except
# we can fail to load the source in distributed environments. Since the
# label is mainly used for diagnostics, don't worry about the error here and
# fallback to the default labelling
pass

try:
# otherwise get the lambda source code from the inspect module if possible
source = getsourcelines(self.f)[0][0]
lambdas = [op.strip() for op in source.split(">>") if "lambda " in op]
if len(lambdas) == 1 and lambdas[0].count("lambda") == 1:
return lambdas[0]
except Exception: # pylint: disable=broad-except
# we can fail to load the source in distributed environments. Since the
# label is mainly used for diagnostics, don't worry about the error here and
# fallback to the default labelling
pass

# Failed to figure out the source
return "MutateOp"

# pylint: disable=arguments-renamed
@annotate("MutateOp", color="darkgreen", domain="nvt_python")
def transform(self, column_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
return self._func(column_selector, df)
def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
return self._func(col_selector, df)

def column_mapping(
self,
Expand Down
13 changes: 7 additions & 6 deletions tests/examples/digital_fingerprinting/test_dfp_file_to_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import os
from unittest import mock

import cudf
import fsspec
import pandas as pd
import pytest

import cudf

from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
Expand All @@ -47,19 +48,19 @@ def single_file_obj():
def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile):
from dfp.stages.dfp_file_to_df import _single_object_to_dataframe

schema = DataFrameInputSchema(
column_info=[CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0])])
schema = DataFrameInputSchema(column_info=[
CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0])
])
df = _single_object_to_dataframe(single_file_obj, schema, FileTypes.Auto, False, {})

assert df.columns == ['data']
with open(single_file_obj.path, encoding='UTF-8') as fh:
d = json.load(fh)
expected_data = d['data']

aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays

assert(aslist == expected_data)
aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays

assert (aslist == expected_data)


def test_single_object_to_dataframe_timeout():
Expand Down

0 comments on commit b7191ea

Please sign in to comment.