Skip to content

Commit

Permalink
Merge pull request #11 from dagardner-nv/devin_issue_862_dg
Browse files Browse the repository at this point in the history
Misc Cleanups
  • Loading branch information
drobison00 authored Jul 10, 2023
2 parents 7044a6f + 74d9fc5 commit 1a4893d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 78 deletions.
30 changes: 17 additions & 13 deletions tests/utils/dataset_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from morpheus.io.deserializers import read_file_to_df
from morpheus.utils import compare_df
from morpheus.utils.type_aliases import DataFrameType
from utils import TEST_DIRS
from utils import assert_results

Expand All @@ -40,8 +41,7 @@ class DatasetManager:
Type of DataFrame to return unless otherwise explicitly specified.
"""

__df_cache: typing.Dict[typing.Tuple[typing.Literal['cudf', 'pandas'], str],
typing.Union[cdf.DataFrame, pd.DataFrame]] = {}
__df_cache: typing.Dict[typing.Tuple[typing.Literal['cudf', 'pandas'], str], DataFrameType] = {}

# Values in `__instances` are instances of `DatasetLoader`
__instances: typing.Dict[typing.Literal['cudf', 'pandas'], typing.Any] = {}
Expand Down Expand Up @@ -72,7 +72,7 @@ def get_df(self,
file_path: str,
df_type: typing.Literal['cudf', 'pandas'] = None,
no_cache: bool = False,
**reader_kwargs) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
**reader_kwargs) -> DataFrameType:
"""
Fetch a DataFrame specified from `file_path`. If `file_path` is not an absolute path, it is assumed to be
relative to the `test/tests_data` dir. If a DataFrame matching both `file_path` and `df_type` has already been
Expand Down Expand Up @@ -123,8 +123,8 @@ def get_df(self,
return df.copy(deep=True)

def __getitem__(
self, item: typing.Union[str, typing.Tuple[str], typing.Tuple[str, typing.Literal['cudf', 'pandas']]]
) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
self, item: typing.Union[str, typing.Tuple[str], typing.Tuple[str, typing.Literal['cudf',
'pandas']]]) -> DataFrameType:
"""Implements `__getitem__` to allow for fetching DataFrames using the `[]` operator."""
if not isinstance(item, tuple):
item = (item, )
Expand All @@ -146,10 +146,17 @@ def default_df_type(self):
"""Returns the default DataFrame type for this instance of `DatasetManager`."""
return self._default_df_type

@property
def df_class(self):
    """DataFrame class (cudf or pandas) matching this instance's default df type."""
    if self.default_df_type == 'cudf':
        return cdf.DataFrame
    return pd.DataFrame

@staticmethod
def repeat(df: typing.Union[cdf.DataFrame, pd.DataFrame],
repeat_count: int = 2,
reset_index: bool = True) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
def repeat(df: DataFrameType, repeat_count: int = 2, reset_index: bool = True) -> DataFrameType:
"""Returns a DF consisting of `repeat_count` copies of the original."""
if isinstance(df, pd.DataFrame):
concat_fn = pd.concat
Expand All @@ -164,15 +171,12 @@ def repeat(df: typing.Union[cdf.DataFrame, pd.DataFrame],
return repeated_df

@staticmethod
def replace_index(df: DataFrameType, replace_ids: typing.Dict[int, int]) -> DataFrameType:
    """Return a new DataFrame with the index values in `replace_ids` remapped to their new ids."""
    renamed = df.rename(index=replace_ids)
    return renamed

@classmethod
def dup_index(cls,
df: typing.Union[cdf.DataFrame, pd.DataFrame],
count: int = 1) -> typing.Union[cdf.DataFrame, pd.DataFrame]:
def dup_index(cls, df: DataFrameType, count: int = 1) -> DataFrameType:
"""Randomly duplicate `count` entries in a DataFrame's index"""
assert count * 2 <= len(df), "Count must be less than half the number of rows."

Expand Down
23 changes: 12 additions & 11 deletions tests/utils/nvt/integration/test_mutate_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import typing

import pandas as pd
import pytest
from merlin.dag import ColumnSelector

import cudf
Expand All @@ -21,25 +24,25 @@
from morpheus.utils.nvt.transforms import json_flatten


def setUp():
json_data = [
@pytest.fixture(name="json_data")
def json_data_fixture():
    """Yield three JSON strings, each with one flat key and one nested object."""
    records = [
        '{"key1": "value1", "key2": {"subkey1": "subvalue1", "subkey2": "subvalue2"}}',
        '{"key1": "value2", "key2": {"subkey1": "subvalue3", "subkey2": "subvalue4"}}',
        '{"key1": "value3", "key2": {"subkey1": "subvalue5", "subkey2": "subvalue6"}}',
    ]
    yield records

expected_pdf = pd.DataFrame({

@pytest.fixture(name="expected_pdf")
def expected_pdf_fixture():
    """Yield the DataFrame expected after flattening the nested JSON records."""
    expected = {
        'col1.key1': ['value1', 'value2', 'value3'],
        'col1.key2.subkey1': ['subvalue1', 'subvalue3', 'subvalue5'],
        'col1.key2.subkey2': ['subvalue2', 'subvalue4', 'subvalue6'],
    }
    yield pd.DataFrame(expected)

return json_data, expected_pdf


def test_integration_pandas():
json_data, expected_pdf = setUp()

def test_integration_pandas(json_data: typing.List[str], expected_pdf: pd.DataFrame):
pdf = pd.DataFrame({'col1': json_data})
col_selector = ColumnSelector(['col1'])

Expand All @@ -50,9 +53,7 @@ def test_integration_pandas():
assert result_pdf.equals(expected_pdf), "Integration test with pandas DataFrame failed"


def test_integration_cudf():
json_data, expected_pdf = setUp()

def test_integration_cudf(json_data: typing.List[str], expected_pdf: pd.DataFrame):
cdf = cudf.DataFrame({'col1': json_data})
col_selector = ColumnSelector(['col1'])

Expand Down
17 changes: 8 additions & 9 deletions tests/utils/nvt/test_json_flatten_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@
# limitations under the License.

import pandas as pd
import pytest
from nvtabular.ops.operator import ColumnSelector

import cudf

from morpheus.utils.nvt.transforms import json_flatten


def test_json_flatten_pandas():
data = {
@pytest.fixture(name="data")
def data_fixture():
    """Yield two records: an id column plus an 'info' column of JSON strings."""
    info_rows = [
        '{"name": "John", "age": 30, "city": "New York"}',
        '{"name": "Jane", "age": 28, "city": "San Francisco"}',
    ]
    yield {"id": [1, 2], "info": info_rows}


def test_json_flatten_pandas(data: dict):
df = pd.DataFrame(data)
col_selector = ColumnSelector(["info"])
result = json_flatten(col_selector, df)
Expand All @@ -37,13 +42,7 @@ def test_json_flatten_pandas():
pd.testing.assert_frame_equal(result, expected_df)


def test_json_flatten_cudf():
data = {
"id": [1, 2],
"info": [
'{"name": "John", "age": 30, "city": "New York"}', '{"name": "Jane", "age": 28, "city": "San Francisco"}'
]
}
def test_json_flatten_cudf(data: dict):
df = cudf.DataFrame(data)
col_selector = ColumnSelector(["info"])
result = json_flatten(col_selector, df)
Expand Down
29 changes: 12 additions & 17 deletions tests/utils/nvt/test_mutate_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import numpy as np
import pandas as pd
import pytest
from merlin.core.dispatch import DataFrameType
from merlin.schema import ColumnSchema
from merlin.schema import Schema
Expand All @@ -22,20 +23,19 @@
from morpheus.utils.nvt import MutateOp


def setUp():
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})
@pytest.fixture(name="df")
def df_fixture():
    """Yield a fresh 3x3 integer DataFrame with columns A, B and C."""
    columns = {'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]}
    yield pd.DataFrame(columns)

def example_transform(col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
selected_columns = col_selector.names
for col in selected_columns:
df[col + '_new'] = df[col] * 2
return df

return df, example_transform
def example_transform(col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
    """For each selected column, append a doubled copy under '<col>_new' and return the frame."""
    for name in col_selector.names:
        df[f"{name}_new"] = df[name] * 2
    return df


def test_transform():
df, example_transform = setUp()
def test_transform(df: DataFrameType):
op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))])
col_selector = ColumnSelector(['A', 'B'])
transformed_df = op.transform(col_selector, df)
Expand All @@ -52,9 +52,7 @@ def test_transform():


# Test for lambda function transformation
def test_transform_lambda():
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})

def test_transform_lambda(df: DataFrameType):
op = MutateOp(lambda col_selector,
df: df.assign(**{f"{col}_new": df[col] * 2
for col in col_selector.names}),
Expand All @@ -69,8 +67,7 @@ def test_transform_lambda():
assert transformed_df.equals(expected_df), "Test transform with lambda failed"


def test_transform_additional_columns():
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})
def test_transform_additional_columns(df: DataFrameType):

def additional_transform(col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
selected_columns = col_selector.names
Expand All @@ -93,7 +90,6 @@ def additional_transform(col_selector: ColumnSelector, df: DataFrameType) -> Dat


def test_column_mapping():
_, example_transform = setUp()
op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))])
col_selector = ColumnSelector(['A', 'B'])
column_mapping = op.column_mapping(col_selector)
Expand All @@ -104,7 +100,6 @@ def test_column_mapping():


def test_compute_output_schema():
_, example_transform = setUp()
op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))])
col_selector = ColumnSelector(['A', 'B'])

Expand Down
40 changes: 12 additions & 28 deletions tests/utils/nvt/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,35 @@
# limitations under the License.

import pandas as pd
import pytest
from nvtabular.ops.operator import ColumnSelector

import cudf

from morpheus.utils.nvt.transforms import json_flatten
from utils.dataset_manager import DatasetManager


def test_json_flatten_pandas():
data = {
@pytest.fixture(name="data")
def data_fixture():
    """Yield an id column plus an 'info' column holding JSON strings to flatten."""
    yield {
        "id": [1, 2],
        "info": [
            '{"name": "John", "age": 30, "city": "New York"}',
            '{"name": "Jane", "age": 28, "city": "San Francisco"}',
        ],
    }
df = pd.DataFrame(data)
col_selector = ColumnSelector(["info"])
result = json_flatten(col_selector, df)

expected_data = {"info.name": ["John", "Jane"], "info.age": [30, 28], "info.city": ["New York", "San Francisco"]}
expected_df = pd.DataFrame(expected_data)

pd.testing.assert_frame_equal(result, expected_df)
@pytest.fixture(name="df")
def df_fixture(dataset: DatasetManager, data: dict):
    """Yield the 'data' dict materialized via the dataset's default DataFrame class."""
    frame_cls = dataset.df_class
    yield frame_cls(data)


def test_json_flatten_cudf():
data = {
"id": [1, 2],
"info": [
'{"name": "John", "age": 30, "city": "New York"}', '{"name": "Jane", "age": 28, "city": "San Francisco"}'
]
}
df = cudf.DataFrame(data)
def test_json_flatten(df: pd.DataFrame):
    """json_flatten should expand the 'info' JSON column into dotted sub-columns."""
    selector = ColumnSelector(["info"])
    result = json_flatten(selector, df)

    expected_df = pd.DataFrame({
        "info.name": ["John", "Jane"],
        "info.age": [30, 28],
        "info.city": ["New York", "San Francisco"],
    })

    DatasetManager.assert_df_equal(result, expected_df)

0 comments on commit 1a4893d

Please sign in to comment.