Skip to content

Commit

Permalink
PERF-modin-project#5554: Implement drop_duplicates via new `duplicated`
Browse files Browse the repository at this point in the history

Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
  • Loading branch information
YarShev committed Jan 27, 2023
1 parent e56a80c commit 550a7db
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 16 deletions.
17 changes: 17 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,23 @@ def dropna(self, **kwargs): # noqa: PR02
"""
return DataFrameDefault.register(pandas.DataFrame.dropna)(self, **kwargs)

@doc_utils.add_refer_to("DataFrame.duplicated")
def duplicated(self, **kwargs):
    """
    Return boolean Series denoting duplicate rows.

    Parameters
    ----------
    **kwargs : dict
        Additional keyword arguments to be passed in to `pandas.DataFrame.duplicated`.

    Returns
    -------
    BaseQueryCompiler
        New QueryCompiler containing boolean Series denoting duplicate rows.
    """
    # Wrap the pandas implementation once, then invoke the wrapper on this
    # query compiler with the caller's keyword arguments untouched.
    pandas_duplicated = DataFrameDefault.register(pandas.DataFrame.duplicated)
    return pandas_duplicated(self, **kwargs)

@doc_utils.add_refer_to("DataFrame.nlargest")
def nlargest(self, n=5, columns=None, keep="first"):
"""
Expand Down
42 changes: 42 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from collections.abc import Iterable
from typing import List, Hashable
import warnings
import hashlib

from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
from modin.config import Engine
Expand Down Expand Up @@ -2363,6 +2364,47 @@ def drop(self, index=None, columns=None, errors: str = "raise"):

# END Drop/Dropna

def duplicated(self, **kwargs):
    """
    Return boolean Series denoting duplicate rows.

    When the frame has more than one column, every row is first collapsed
    into a single MD5 hex digest so that duplicates can be detected on one
    column; otherwise the single column is checked directly.

    Parameters
    ----------
    **kwargs : dict
        Keyword arguments forwarded to ``duplicated`` of the pandas frame
        that is checked for duplicates.

    Returns
    -------
    object
        Result of ``self.__constructor__`` called on the new Modin frame
        with ``shape_hint="column"``.
    """

    def _row_digest(row):
        # One string per row: the MD5 hex digest of the row's tuple repr.
        return hashlib.new("md5", str(tuple(row)).encode()).hexdigest()

    def _hash_rows(df):
        return df.apply(_row_digest, axis=1).to_frame()

    def _mark_duplicates(df):
        return df.duplicated(**kwargs).to_frame()

    # Arguments shared by every ``apply_full_axis`` call below.
    common_args = dict(
        new_index=self._modin_frame._index_cache,
        new_columns=[MODIN_UNNAMED_SERIES_LABEL],
        keep_partitioning=False,
    )

    frame_to_check = self._modin_frame
    if len(self.columns) > 1:
        # if the number of columns we are checking for duplicates is larger than 1,
        # we must hash them to generate a single value that can be compared across rows.
        frame_to_check = frame_to_check.apply_full_axis(
            1,
            _hash_rows,
            dtypes=np.dtype("O"),
            **common_args,
        )

    result_frame = frame_to_check.apply_full_axis(
        0,
        _mark_duplicates,
        dtypes=np.bool_,
        **common_args,
    )
    return self.__constructor__(result_frame, shape_hint="column")

# Insert
# This method changes the shape of the resulting data. In Pandas, this
# operation is always inplace, but this object is immutable, so we just
Expand Down
9 changes: 5 additions & 4 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,18 +1384,19 @@ def drop_duplicates(
Return `BasePandasDataset` with duplicate rows removed.
"""
inplace = validate_bool_kwarg(inplace, "inplace")
subset = kwargs.get("subset", None)
ignore_index = kwargs.get("ignore_index", False)
subset = kwargs.get("subset", None)
if subset is not None:
if is_list_like(subset):
if not isinstance(subset, list):
subset = list(subset)
else:
subset = [subset]
duplicates = self.duplicated(keep=keep, subset=subset)
df = self[subset]
else:
duplicates = self.duplicated(keep=keep)
result = self[~duplicates]
df = self
duplicated = df.duplicated(keep=keep)
result = self[~duplicated]
if ignore_index:
result.index = pandas.RangeIndex(stop=len(result))
if inplace:
Expand Down
15 changes: 3 additions & 12 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,10 @@ def duplicated(self, subset=None, keep="first"): # noqa: PR01, RT01, D200
"""
Return boolean ``Series`` denoting duplicate rows.
"""
import hashlib

df = self[subset] if subset is not None else self
# if the number of columns we are checking for duplicates is larger than 1, we must
# hash them to generate a single value that can be compared across rows.
if len(df.columns) > 1:
hashed = df.apply(
lambda s: hashlib.new("md5", str(tuple(s)).encode()).hexdigest(), axis=1
).to_frame()
else:
hashed = df
duplicates = hashed.apply(lambda s: s.duplicated(keep=keep)).squeeze(axis=1)
# remove Series name which was assigned automatically by .apply
new_qc = df._query_compiler.duplicated(keep=keep)
duplicates = self._reduce_dimension(new_qc)
# remove Series name which was assigned automatically by .apply in QC
duplicates.name = None
return duplicates

Expand Down

0 comments on commit 550a7db

Please sign in to comment.