diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py
index 3a65e12b6c8..e0aa7093e9b 100644
--- a/modin/core/storage_formats/base/query_compiler.py
+++ b/modin/core/storage_formats/base/query_compiler.py
@@ -1962,6 +1962,23 @@ def dropna(self, **kwargs):  # noqa: PR02
         """
         return DataFrameDefault.register(pandas.DataFrame.dropna)(self, **kwargs)
 
+    @doc_utils.add_refer_to("DataFrame.duplicated")
+    def duplicated(self, **kwargs):
+        """
+        Return boolean Series denoting duplicate rows.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments to be passed in to `pandas.DataFrame.duplicated`.
+
+        Returns
+        -------
+        BaseQueryCompiler
+            New QueryCompiler containing boolean Series denoting duplicate rows.
+        """
+        return DataFrameDefault.register(pandas.DataFrame.duplicated)(self, **kwargs)
+
     @doc_utils.add_refer_to("DataFrame.nlargest")
     def nlargest(self, n=5, columns=None, keep="first"):
         """
diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py
index 74bf149df21..36796f8e78a 100644
--- a/modin/core/storage_formats/pandas/query_compiler.py
+++ b/modin/core/storage_formats/pandas/query_compiler.py
@@ -38,6 +38,7 @@
 from collections.abc import Iterable
 from typing import List, Hashable
 import warnings
+import hashlib
 
 from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
 from modin.config import Engine
@@ -2360,6 +2361,40 @@ def drop(self, index=None, columns=None, errors: str = "raise"):
 
     # END Drop/Dropna
 
+    def duplicated(self, **kwargs):
+        def _compute_hash(df):
+            return df.apply(
+                lambda s: hashlib.new("md5", str(tuple(s)).encode()).hexdigest(), axis=1
+            ).to_frame()
+
+        def _compute_duplicated(df):
+            return df.duplicated(**kwargs).to_frame()
+
+        new_index = self._modin_frame._index_cache
+        new_columns = [MODIN_UNNAMED_SERIES_LABEL]
+        if len(self.columns) > 1:
+            # if the number of columns we are checking for duplicates is larger than 1,
+            # we must hash them to generate a single value that can be compared across rows.
+            hashed_modin_frame = self._modin_frame.apply_full_axis(
+                1,
+                _compute_hash,
+                new_index=new_index,
+                new_columns=new_columns,
+                keep_partitioning=False,
+                dtypes=np.dtype("O"),
+            )
+        else:
+            hashed_modin_frame = self._modin_frame
+        new_modin_frame = hashed_modin_frame.apply_full_axis(
+            0,
+            _compute_duplicated,
+            new_index=new_index,
+            new_columns=new_columns,
+            keep_partitioning=False,
+            dtypes=np.bool_,
+        )
+        return self.__constructor__(new_modin_frame, shape_hint="column")
+
     # Insert
     # This method changes the shape of the resulting data. In Pandas, this
     # operation is always inplace, but this object is immutable, so we just
diff --git a/modin/pandas/base.py b/modin/pandas/base.py
index f9087be81e7..9e30b3ae9bf 100644
--- a/modin/pandas/base.py
+++ b/modin/pandas/base.py
@@ -1384,18 +1384,19 @@ def drop_duplicates(
         Return `BasePandasDataset` with duplicate rows removed.
         """
         inplace = validate_bool_kwarg(inplace, "inplace")
-        subset = kwargs.get("subset", None)
         ignore_index = kwargs.get("ignore_index", False)
+        subset = kwargs.get("subset", None)
         if subset is not None:
             if is_list_like(subset):
                 if not isinstance(subset, list):
                     subset = list(subset)
             else:
                 subset = [subset]
-            duplicates = self.duplicated(keep=keep, subset=subset)
+            df = self[subset]
         else:
-            duplicates = self.duplicated(keep=keep)
-        result = self[~duplicates]
+            df = self
+        duplicated = df.duplicated(keep=keep)
+        result = self[~duplicated]
         if ignore_index:
             result.index = pandas.RangeIndex(stop=len(result))
         if inplace:
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py
index f603489be10..2980be08a4d 100644
--- a/modin/pandas/dataframe.py
+++ b/modin/pandas/dataframe.py
@@ -334,19 +334,10 @@ def duplicated(self, subset=None, keep="first"):  # noqa: PR01, RT01, D200
         """
         Return boolean ``Series`` denoting duplicate rows.
         """
-        import hashlib
-
         df = self[subset] if subset is not None else self
-        # if the number of columns we are checking for duplicates is larger than 1, we must
-        # hash them to generate a single value that can be compared across rows.
-        if len(df.columns) > 1:
-            hashed = df.apply(
-                lambda s: hashlib.new("md5", str(tuple(s)).encode()).hexdigest(), axis=1
-            ).to_frame()
-        else:
-            hashed = df
-        duplicates = hashed.apply(lambda s: s.duplicated(keep=keep)).squeeze(axis=1)
-        # remove Series name which was assigned automatically by .apply
+        new_qc = df._query_compiler.duplicated(keep=keep)
+        duplicates = self._reduce_dimension(new_qc)
+        # remove Series name which was assigned automatically by .apply in QC
         duplicates.name = None
         return duplicates
 