Add tests for issue #1074 and update Categorify args (#1093)
lesnikow authored Sep 13, 2021
1 parent 267f67f commit 5dd2820
Showing 2 changed files with 131 additions and 2 deletions.
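
For orientation, here is a minimal sketch of how the new `start_index` argument is exercised through the public NVTabular API. It mirrors the unit test added in this commit; the data and the `out_path` value are illustrative, not taken from a real run:

```python
import nvtabular as nvt
from nvtabular import dispatch, ops

# Illustrative input, mirroring the new unit test below.
df = dispatch._make_df(
    {
        "Authors": [["User_A"], ["User_A", "User_E"], ["User_B", "User_C"], ["User_C"]],
        "Engaging User": ["User_B", "User_B", "User_A", "User_D"],
        "Post": [1, 2, 3, 4],
    }
)
dataset = nvt.Dataset(df)

# start_index=16 reserves 16 as the out-of-vocabulary encoding,
# so in-vocabulary categories are encoded starting at 17.
cat_features = ["Authors", "Engaging User"] >> ops.Categorify(out_path=".", start_index=16)
workflow = nvt.Workflow(cat_features + ["Post"])
workflow.fit(dataset)
df_out = workflow.transform(dataset).to_ddf().compute()
# df_out["Authors"] is now encoded as [[17], [17, 20], [19, 18], [18]]
```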
87 changes: 85 additions & 2 deletions nvtabular/ops/categorify.py
@@ -175,6 +175,15 @@ class Categorify(StatOperator):
value will be `max_size - num_buckets - 1`. Setting the max_size param means that
freq_threshold should not be given. If the num_buckets parameter is set, it must be
smaller than the max_size value.
start_index: int, default 0
The start index where Categorify will begin to translate dataframe entries
into integer values, including an initial out-of-vocabulary encoding value.
For instance, if the original translated dataframe entries appear
as [[1], [1, 4], [3, 2], [2]], with an out-of-vocabulary value of 0, then with a
start_index of 16, Categorify will reserve 16 as the out-of-vocabulary encoding value,
and the new translated dataframe entries will be [[17], [17, 20], [19, 18], [18]].
This parameter is useful for reserving an initial segment of non-negative translated
integers for special user-defined values.
"""

def __init__(
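
The shift described in the docstring above can be reproduced with plain Python. A small sketch using the values from the docstring example (not output from an actual run):

```python
# Encodings produced without an offset; 0 is the out-of-vocabulary value.
encoded = [[1], [1, 4], [3, 2], [2]]
start_index = 16

# With start_index=16, every encoding is shifted so that 16 becomes the
# out-of-vocabulary value and in-vocabulary items begin at 17.
shifted = [[value + start_index for value in row] for row in encoded]
assert shifted == [[17], [17, 20], [19, 18], [18]]
```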
@@ -192,6 +201,7 @@ def __init__(
num_buckets=None,
vocabs=None,
max_size=0,
start_index=0,
):

# We need to handle three types of encoding here:
@@ -244,6 +254,7 @@ def __init__(
self.cat_cache = cat_cache
self.encode_type = encode_type
self.search_sorted = search_sorted
self.start_index = start_index

if self.search_sorted and self.freq_threshold:
raise ValueError(
@@ -439,6 +450,7 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
cat_names=cat_names,
max_size=self.max_size,
dtype=self.dtype,
start_index=self.start_index,
)
except Exception as e:
raise RuntimeError(f"Failed to categorical encode column {name}") from e
@@ -455,7 +467,12 @@ def output_column_names(self, col_selector: ColumnSelector) -> ColumnSelector:

def get_embedding_sizes(self, columns):
return _get_embeddings_dask(
self.categories, columns, self.num_buckets, self.freq_threshold, self.max_size
self.categories,
columns,
self.num_buckets,
self.freq_threshold,
self.max_size,
self.start_index,
)

def inference_initialize(self, columns, inference_config):
@@ -542,7 +559,7 @@ def get_embedding_sizes(source, output_dtypes=None):
return single_hots, multi_hots


def _get_embeddings_dask(paths, cat_names, buckets=0, freq_limit=0, max_size=0):
def _get_embeddings_dask(paths, cat_names, buckets=0, freq_limit=0, max_size=0, start_index=0):
embeddings = {}
if isinstance(freq_limit, int):
freq_limit = {name: freq_limit for name in cat_names}
@@ -564,6 +581,7 @@ def _get_embeddings_dask(paths, cat_names, buckets=0, freq_limit=0, max_size=0):
else:
num_rows += bucket_size

num_rows += start_index
embeddings[col] = _emb_sz_rule(num_rows)
return embeddings
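
The `num_rows += start_index` addition above is what enlarges the reported embedding cardinality. A small sketch of the arithmetic, assuming the five-row vocabulary from the unit test added below:

```python
# Without an offset, the "Authors" vocabulary in the new unit test yields 5 rows:
# four unique authors plus the null / out-of-vocabulary category.
num_rows = 5
start_index = 16

# The embedding table must also cover the reserved initial segment,
# so its cardinality grows by start_index.
assert num_rows + start_index == 21  # matches embeddings["Authors"][0] in the test
```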

@@ -610,6 +628,8 @@ class FitOptions:
num_buckets:
If specified, a hashing operation will also be applied to values that would otherwise be
mapped to the unknown category (by the freq_limit or max_size parameters).
start_index: int
The index at which to start mapping output categorical values.
"""

col_groups: list
@@ -624,6 +644,7 @@ class FitOptions:
name_sep: str = "-"
max_size: Optional[Union[int, dict]] = None
num_buckets: Optional[Union[int, dict]] = None
start_index: int = 0

def __post_init__(self):
if not isinstance(self.col_groups, ColumnSelector):
@@ -861,6 +882,26 @@ def _write_gb_stats(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):

@annotate("write_uniques", color="green", domain="nvt_python")
def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):
"""Writes out a dataframe to a parquet file.
Parameters
----------
dfs : DataFrame
base_path : str
col_selector : ColumnSelector
options : FitOptions
Raises
------
ValueError
If the computed nlargest value is non-positive.
Returns
-------
path : str
the path to the output parquet file.
"""
if options.concat_groups and len(col_selector.names) > 1:
col_selector = ColumnSelector([_make_name(*col_selector.names, sep=options.name_sep)])

@@ -1054,7 +1095,48 @@ def _encode(
cat_names=None,
max_size=0,
dtype=None,
start_index=0,
):
"""The _encode method is responsible for transforming a dataframe by taking the written
out vocabulary file and looking up values to translate inputs to numeric
outputs.
Parameters
----------
name :
storage_name : dict
path : str
df : DataFrame
cat_cache :
na_sentinel : int
Sentinel for NA value. Defaults to -1.
freq_threshold : int
Categories with a count or frequency below this threshold will
be omitted from the encoding and corresponding data will be
mapped to the "Null" category. Defaults to 0.
search_sorted :
Defaults to False.
buckets :
Defaults to None.
encode_type :
Defaults to "joint".
cat_names :
Defaults to None.
max_size :
Defaults to 0.
dtype :
Defaults to None.
start_index : int
The index at which to start outputting categorical values. This is useful
for reserving an initial segment of non-negative integers for
out-of-vocabulary or other special values. Defaults to 0.
Returns
-------
labels : numpy ndarray or Pandas Series
"""
if isinstance(buckets, int):
buckets = {name: buckets for name in cat_names}
# this is to apply freq_hashing logic
@@ -1139,6 +1221,7 @@ def _encode(
)
labels[labels >= len(value[selection_r.names])] = na_sentinel

labels = labels + start_index
if list_col:
labels = dispatch._encode_list_column(df[selection_l.names[0]], labels, dtype=dtype)
elif dtype:
46 changes: 46 additions & 0 deletions tests/unit/ops/test_ops.py
@@ -505,6 +505,52 @@ def test_categorify_lists(tmpdir, freq_threshold, cpu, dtype, vocabs):
assert compare == [[1], [1, 0], [0, 2], [2]]


@pytest.mark.parametrize("cpu", _CPU)
@pytest.mark.parametrize("start_index", [1, 2, 16])
def test_categorify_lists_with_start_index(tmpdir, cpu, start_index):
df = dispatch._make_df(
{
"Authors": [["User_A"], ["User_A", "User_E"], ["User_B", "User_C"], ["User_C"]],
"Engaging User": ["User_B", "User_B", "User_A", "User_D"],
"Post": [1, 2, 3, 4],
}
)
cat_names = ["Authors", "Engaging User"]
label_name = ["Post"]
dataset = nvt.Dataset(df, cpu=cpu)
cat_features = cat_names >> ops.Categorify(out_path=str(tmpdir), start_index=start_index)
processor = nvt.Workflow(cat_features + label_name)
processor.fit(dataset)
df_out = processor.transform(dataset).to_ddf().compute()

if cpu:
compare = [list(row) for row in df_out["Authors"].tolist()]
else:
compare = df_out["Authors"].to_arrow().to_pylist()

# Note that start_index is the first value of the encoding range; it is reserved
# for the out-of-vocabulary encoding, and in-vocabulary items are encoded at the
# values that follow it.
# In the assertions below there are no out-of-vocabulary items, so the start_index
# value itself does not appear in the expected comparison object.
if start_index == 1:
assert compare == [[2], [2, 5], [4, 3], [3]]
elif start_index == 2:
assert compare == [[3], [3, 6], [5, 4], [4]]
elif start_index == 16:
assert compare == [[17], [17, 20], [19, 18], [18]]

# We expect five entries in the embedding size (one for each of the four unique
# authors plus one for the null category), plus start_index additional entries
# reserved by the offset.
embeddings = nvt.ops.get_embedding_sizes(processor)

# MH embeddings on GPU are returned as a tuple of (singlehot, multihot)
if not cpu:
embeddings = embeddings[1]

assert embeddings["Authors"][0] == (5 + start_index)


@pytest.mark.parametrize("cat_names", [[["Author", "Engaging User"]], ["Author", "Engaging User"]])
@pytest.mark.parametrize("kind", ["joint", "combo"])
@pytest.mark.parametrize("cpu", _CPU)
