Skip to content

Commit

Permalink
FEAT: Implement df.applymap and Series.apply [DO NOT UPSTREAM] (modin…
Browse files Browse the repository at this point in the history
…-project#9)

Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
---------

Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
  • Loading branch information
Karthik Velayutham and mvashishtha committed Mar 10, 2023
1 parent 1d10d97 commit 13b4cca
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
11 changes: 10 additions & 1 deletion modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,15 @@ def apply(
Apply a function along an axis of the `BasePandasDataset`.
"""
import cloudpickle
import pickle
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

def error_raiser(msg, exception):
"""Convert passed exception to the same type as pandas do and raise it."""
Expand Down Expand Up @@ -909,7 +918,7 @@ def error_raiser(msg, exception):
**kwds,
)
query_compiler = self._query_compiler.apply(
cloudpickle.dumps(func),
cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL),
axis,
args=args,
raw=raw,
Expand Down
20 changes: 19 additions & 1 deletion modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,29 @@ def add_suffix(self, suffix): # noqa: PR01, RT01, D200
return DataFrame(query_compiler=self._query_compiler.add_suffix(suffix))

def applymap(self, func, na_action: Optional[str] = None, **kwargs):
    """
    Apply a function to every element of the ``DataFrame``.

    Parameters
    ----------
    func : callable
        Function applied element-wise. It is serialized with ``cloudpickle``
        before being handed to the query compiler.
    na_action : {None, "ignore"}, default: None
        Forwarded unchanged to pandas ``applymap`` and to the query compiler.
    **kwargs : dict
        Extra keyword arguments forwarded together with `func`.

    Returns
    -------
    DataFrame
        New frame wrapping the query compiler returned by ``applymap``.

    Raises
    ------
    ValueError
        If `func` is not callable, or `na_action` is neither ``"ignore"``
        nor ``None``.
    """
    # Validate first so bad arguments fail before any import, warning or
    # computation is triggered.
    if not callable(func):
        raise ValueError("'{0}' object is not callable".format(type(func)))
    if na_action not in ("ignore", None):
        raise ValueError(f"na_action must be 'ignore' or None. Got {na_action}")

    import cloudpickle
    import pickle
    import sys

    if sys.version_info.major == 3 and sys.version_info.minor != 8:
        version = ".".join(map(str, sys.version_info[:3]))
        warnings.warn(
            f"current Python version is {version}, but expected 3.8. User defined"
            + " functions may not work as expected due to compatibility issues.",
            # Attribute the warning to the caller's line, not to this method.
            stacklevel=2,
        )

    # NOTE(review): this converts the whole frame to pandas and runs `func`
    # over it locally just to obtain `output_meta`; presumably a prototype
    # shortcut -- confirm whether a small sample would be enough.
    output_meta = self._to_pandas().applymap(func, na_action=na_action, **kwargs)

    # Pin the pickle protocol explicitly instead of relying on cloudpickle's
    # own default.
    pickled_func = cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL)
    return DataFrame(
        query_compiler=self._query_compiler.applymap(
            pickled_func,
            na_action=na_action,
            output_meta=output_meta,
            **kwargs,
        )
    )

Expand Down
28 changes: 25 additions & 3 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,15 @@ def apply(
"""
Invoke function on values of Series.
"""
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

self._validate_function(func)
# apply and aggregate have slightly different behaviors, so we have to use
# each one separately to determine the correct return type. In the case of
Expand Down Expand Up @@ -1203,17 +1212,30 @@ def map(self, arg, na_action=None): # noqa: PR01, RT01, D200
"""
Map values of Series according to input correspondence.
"""
import cloudpickle
import pickle
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

if not callable(arg) and hasattr(arg, "get"):
mapper = arg

def arg(s):
return mapper.get(s, np.nan)

output_meta = self._to_pandas().map(arg, na_action)

func = lambda s: s if pandas.isnull(s) and na_action else arg(s)
return self.__constructor__(
query_compiler=self._query_compiler.applymap(
lambda s: arg(s)
if pandas.isnull(s) is not True or na_action is None
else s
cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL),
output_meta=output_meta,
)
)

Expand Down

0 comments on commit 13b4cca

Please sign in to comment.