From 29ddaed341b00329146ee3998066211a4442a773 Mon Sep 17 00:00:00 2001 From: Karthik Velayutham Date: Fri, 10 Mar 2023 09:25:22 -0600 Subject: [PATCH] FEAT: Implement df.applymap and Series.apply [DO NOT UPSTREAM] (#9) Co-authored-by: Mahesh Vashishtha --------- Co-authored-by: Mahesh Vashishtha --- modin/pandas/base.py | 11 ++++++++++- modin/pandas/dataframe.py | 20 +++++++++++++++++++- modin/pandas/series.py | 34 ++++++++++++++++++++++++---------- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 7281a2be3bf..99335d2a62e 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -875,6 +875,15 @@ def apply( Apply a function along an axis of the `BasePandasDataset`. """ import cloudpickle + import pickle + import sys + + if sys.version_info.major == 3 and sys.version_info.minor != 8: + version = ".".join(map(str, sys.version_info[:3])) + warnings.warn( + f"current Python version is {version}, but expected 3.8. User defined" + + " functions may not work as expected due to compatibility issues." + ) def error_raiser(msg, exception): """Convert passed exception to the same type as pandas do and raise it.""" @@ -911,7 +920,7 @@ def error_raiser(msg, exception): **kwds, ) query_compiler = self._query_compiler.apply( - cloudpickle.dumps(func), + cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL), axis, args=args, raw=raw, diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 5166b1fb907..b55e444e8de 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -371,11 +371,29 @@ def add_suffix(self, suffix): # noqa: PR01, RT01, D200 ) def applymap(self, func, na_action: Optional[str] = None, **kwargs): + import cloudpickle + import pickle + import sys + + if sys.version_info.major == 3 and sys.version_info.minor != 8: + version = ".".join(map(str, sys.version_info[:3])) + warnings.warn( + f"current Python version is {version}, but expected 3.8. User defined" + + " functions may not work as expected due to compatibility issues." + ) + if not callable(func): raise ValueError("'{0}' object is not callable".format(type(func))) + if na_action not in ("ignore", None): + raise ValueError(f"na_action must be 'ignore' or None. Got {na_action}") + + output_meta = self._to_pandas().applymap(func, na_action=na_action, **kwargs) return self.__constructor__( query_compiler=self._query_compiler.applymap( - func, na_action=na_action, **kwargs + cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL), + na_action=na_action, + output_meta=output_meta, + **kwargs, ) ) diff --git a/modin/pandas/series.py b/modin/pandas/series.py index 63520bf08ac..0e08f38152c 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -631,6 +631,15 @@ def apply( """ Invoke function on values of Series. """ + import sys + + if sys.version_info.major == 3 and sys.version_info.minor != 8: + version = ".".join(map(str, sys.version_info[:3])) + warnings.warn( + f"current Python version is {version}, but expected 3.8. User defined" + + " functions may not work as expected due to compatibility issues." + ) + self._validate_function(func) # apply and aggregate have slightly different behaviors, so we have to use # each one separately to determine the correct return type. In the case of @@ -1232,13 +1241,16 @@ def map(self, arg, na_action=None): # noqa: PR01, RT01, D200 """ Map values of Series according to input correspondence. """ - if isinstance(arg, type(self)): - # HACK: if we don't cast to pandas, then the execution engine will try to - # propagate the distributed Series to workers and most likely would have - # some performance problems. - # TODO: A better way of doing so could be passing this `arg` as a query compiler - # and broadcast accordingly. - arg = arg._to_pandas() + import cloudpickle + import pickle + import sys + + if sys.version_info.major == 3 and sys.version_info.minor != 8: + version = ".".join(map(str, sys.version_info[:3])) + warnings.warn( + f"current Python version is {version}, but expected 3.8. User defined" + + " functions may not work as expected due to compatibility issues." + ) if not callable(arg) and hasattr(arg, "get"): mapper = arg @@ -1246,11 +1258,13 @@ def map(self, arg, na_action=None): # noqa: PR01, RT01, D200 def arg(s): return mapper.get(s, np.nan) + output_meta = self._to_pandas().map(arg, na_action) + + func = lambda s: s if pandas.isnull(s) and na_action else arg(s) return self.__constructor__( query_compiler=self._query_compiler.applymap( - lambda s: arg(s) - if pandas.isnull(s) is not True or na_action is None - else s + cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL), + output_meta=output_meta, ) )