Skip to content

Commit

Permalink
FEAT: Implement df.applymap and Series.apply [DO NOT UPSTREAM] (modin…
Browse files Browse the repository at this point in the history
…-project#9)

Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
---------

Co-authored-by: Mahesh Vashishtha <mvashishtha@users.noreply.github.com>
  • Loading branch information
2 people authored and vnlitvinov committed Mar 16, 2023
1 parent 20b7282 commit 3672270
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
11 changes: 10 additions & 1 deletion modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,15 @@ def apply(
Apply a function along an axis of the `BasePandasDataset`.
"""
import cloudpickle
import pickle
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

def error_raiser(msg, exception):
"""Convert passed exception to the same type as pandas do and raise it."""
Expand Down Expand Up @@ -911,7 +920,7 @@ def error_raiser(msg, exception):
**kwds,
)
query_compiler = self._query_compiler.apply(
cloudpickle.dumps(func),
cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL),
axis,
args=args,
raw=raw,
Expand Down
20 changes: 19 additions & 1 deletion modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,29 @@ def add_suffix(self, suffix): # noqa: PR01, RT01, D200
)

def applymap(self, func, na_action: Optional[str] = None, **kwargs):
import cloudpickle
import pickle
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

if not callable(func):
raise ValueError("'{0}' object is not callable".format(type(func)))
if na_action not in ("ignore", None):
raise ValueError(f"na_action must be 'ignore' or None. Got {na_action}")

output_meta = self._to_pandas().applymap(func, na_action=na_action, **kwargs)
return self.__constructor__(
query_compiler=self._query_compiler.applymap(
func, na_action=na_action, **kwargs
cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL),
na_action=na_action,
output_meta=output_meta,
**kwargs,
)
)

Expand Down
34 changes: 24 additions & 10 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,15 @@ def apply(
"""
Invoke function on values of Series.
"""
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

self._validate_function(func)
# apply and aggregate have slightly different behaviors, so we have to use
# each one separately to determine the correct return type. In the case of
Expand Down Expand Up @@ -1232,25 +1241,30 @@ def map(self, arg, na_action=None): # noqa: PR01, RT01, D200
"""
Map values of Series according to input correspondence.
"""
if isinstance(arg, type(self)):
# HACK: if we don't cast to pandas, then the execution engine will try to
# propagate the distributed Series to workers and most likely would have
# some performance problems.
# TODO: A better way of doing so could be passing this `arg` as a query compiler
# and broadcast accordingly.
arg = arg._to_pandas()
import cloudpickle
import pickle
import sys

if sys.version_info.major == 3 and sys.version_info.minor != 8:
version = ".".join(map(str, sys.version_info[:3]))
warnings.warn(
f"current Python version is {version}, but expected 3.8. User defined"
+ " functions may not work as expected due to compatibility issues."
)

if not callable(arg) and hasattr(arg, "get"):
mapper = arg

def arg(s):
return mapper.get(s, np.nan)

output_meta = self._to_pandas().map(arg, na_action)

func = lambda s: s if pandas.isnull(s) and na_action else arg(s)
return self.__constructor__(
query_compiler=self._query_compiler.applymap(
lambda s: arg(s)
if pandas.isnull(s) is not True or na_action is None
else s
cloudpickle.dumps(func, protocol=pickle.DEFAULT_PROTOCOL),
output_meta=output_meta,
)
)

Expand Down

0 comments on commit 3672270

Please sign in to comment.