Skip to content

Commit

Permalink
[SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for f…
Browse files Browse the repository at this point in the history
…rame methods

### What changes were proposed in this pull request?
1. Introduce a basic fallback mechanism for frame methods, controlled by a new option `compute.pandas_fallback` (default `False`);
2. Implement `DataFrame.asfreq` and `DataFrame.asof` through this fallback.

### Why are the changes needed?
for pandas parity

### Does this PR introduce _any_ user-facing change?
yes

```
In [1]: import pyspark.pandas as ps
   ...: import pandas as pd
   ...:
   ...: index = pd.date_range('1/1/2000', periods=4, freq='min')
   ...: series = pd.Series([0.0, None, 2.0, 3.0], index=index)
   ...: pdf = pd.DataFrame({'s': series})
   ...: psdf = ps.from_pandas(pdf)

In [2]: psdf.asfreq(freq='30s')
---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
Cell In[2], line 1
----> 1 psdf.asfreq(freq='30s')

File ~/Dev/spark/python/pyspark/pandas/missing/__init__.py:23, in unsupported_function.<locals>.unsupported_function(*args, **kwargs)
     22 def unsupported_function(*args, **kwargs):
---> 23     raise PandasNotImplementedError(
     24         class_name=class_name, method_name=method_name, reason=reason
     25     )

PandasNotImplementedError: The method `pd.DataFrame.asfreq()` is not implemented yet.

In [3]: ps.set_option("compute.pandas_fallback", True)

In [4]: psdf.asfreq(freq='30s')
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: `asfreq` is executed in fallback mode. It loads partial data into the driver's memory to infer the schema, and loads all data into one executor's memory to compute. It should only be used if the pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: PandasAPIOnSparkAdviceWarning: If the type hints is not specified for `groupby.apply`, it is expensive to infer the data type internally.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[4]:
                       s
2000-01-01 00:00:00  0.0
2000-01-01 00:00:30  NaN
2000-01-01 00:01:00  NaN
2000-01-01 00:01:30  NaN
2000-01-01 00:02:00  2.0
2000-01-01 00:02:30  NaN
2000-01-01 00:03:00  3.0
```

### How was this patch tested?
Added unit tests.

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#44869 from zhengruifeng/ps_df_fallback.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
  • Loading branch information
zhengruifeng committed Jan 25, 2024
1 parent 95ea2a6 commit 8e1fa56
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 0 deletions.
6 changes: 6 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,9 @@ def __hash__(self):
"pyspark.pandas.tests.io.test_dataframe_conversion",
"pyspark.pandas.tests.io.test_dataframe_spark_io",
"pyspark.pandas.tests.io.test_series_conversion",
# fallback
"pyspark.pandas.tests.frame.test_asfreq",
"pyspark.pandas.tests.frame.test_asof",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
Expand Down Expand Up @@ -1200,6 +1203,9 @@ def __hash__(self):
"pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_object",
"pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_prefix",
"pyspark.pandas.tests.connect.reshape.test_parity_merge_asof",
# fallback
"pyspark.pandas.tests.connect.frame.test_parity_asfreq",
"pyspark.pandas.tests.connect.frame.test_parity_asof",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
Expand Down
2 changes: 2 additions & 0 deletions python/docs/source/user_guide/pandas_on_spark/options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ compute.isin_limit 80 'compute.isin_limit' set
'Column.isin(list)'. If the length of the ‘list’ is
above the limit, broadcast join is used instead for
better performance.
compute.pandas_fallback False 'compute.pandas_fallback' sets whether or not to
fallback automatically to Pandas' implementation.
plotting.max_rows 1000 'plotting.max_rows' sets the visual limit on top-n-
based plots such as `plot.bar` and `plot.pie`. If it
is set to 1000, the first 1000 data points will be
Expand Down
9 changes: 9 additions & 0 deletions python/pyspark/pandas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ def validate(self, v: Any) -> None:
"'compute.isin_limit' should be greater than or equal to 0.",
),
),
Option(
key="compute.pandas_fallback",
doc=(
"'compute.pandas_fallback' sets whether or not to fallback automatically "
"to Pandas' implementation."
),
default=False,
types=bool,
),
Option(
key="plotting.max_rows",
doc=(
Expand Down
43 changes: 43 additions & 0 deletions python/pyspark/pandas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13446,10 +13446,53 @@ def _index_normalized_frame(level: int, psser_or_psdf: DataFrameOrSeries) -> "Da

return psdf

def _fall_back_frame(self, method: str) -> Callable:
    """
    Build a callable that runs the pandas method named ``method`` on this frame.

    The returned function copies this frame, tags every row with the same
    constant key so that a ``groupby(...).apply(...)`` hands all rows to a
    single pandas DataFrame, invokes ``method`` on that pandas DataFrame with
    the caller's arguments, and restores the original index names on the
    result.

    Parameters
    ----------
    method : str
        Name of the pandas ``DataFrame`` method to invoke.

    Returns
    -------
    Callable
        A function accepting the positional and keyword arguments of the
        pandas method and returning a pandas-on-Spark ``DataFrame``.
    """

    def _internal_fall_back_function(*inputs: Any, **kwargs: Any) -> "DataFrame":
        log_advice(
            f"`{method}` is executed in fallback mode. It loads partial data into the "
            "driver's memory to infer the schema, and loads all data into one executor's "
            "memory to compute. It should only be used if the pandas DataFrame is expected "
            "to be small."
        )

        psdf = self.copy()
        original_index_names = psdf.index.names
        spark_frame = psdf._internal.spark_frame

        # Constant-valued column: grouping by it puts every row in one group.
        group_col = verify_temp_column_name(
            spark_frame, f"__tmp_aggregate_col_for_frame_{method}__"
        )
        psdf[group_col] = 0

        # Carry the index through the groupby as an ordinary column.
        index_col = verify_temp_column_name(
            spark_frame, f"__tmp_index_col_for_frame_{method}__"
        )
        psdf[index_col] = psdf.index

        # TODO(SPARK-46859): specify the return type if possible
        def compute_function(pdf: pd.DataFrame):  # type: ignore[no-untyped-def]
            # Rebuild the original, index-sorted pandas frame before delegating.
            restored = (
                pdf.drop(columns=[group_col])
                .set_index(index_col, drop=True)
                .sort_index()
            )
            result = getattr(restored, method)(*inputs, **kwargs)
            # Expose the resulting index as a column so it can be re-applied outside.
            result[index_col] = result.index
            return result.reset_index(drop=True)

        fallback_result = psdf.groupby(group_col).apply(compute_function).set_index(index_col)
        fallback_result.index.names = original_index_names
        return fallback_result

    return _internal_fall_back_function

def __getattr__(self, key: str) -> Any:
if key.startswith("__"):
raise AttributeError(key)
if hasattr(MissingPandasLikeDataFrame, key):
if key in ["asfreq", "asof"] and get_option("compute.pandas_fallback"):
return self._fall_back_frame(key)

property_or_func = getattr(MissingPandasLikeDataFrame, key)
if isinstance(property_or_func, property):
return property_or_func.fget(self)
Expand Down
41 changes: 41 additions & 0 deletions python/pyspark/pandas/tests/connect/frame/test_parity_asfreq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

from pyspark.pandas.tests.frame.test_asfreq import AsFreqMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.testing.pandasutils import PandasOnSparkTestUtils


class AsFreqParityTests(AsFreqMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
    """Runs the ``AsFreqMixin`` test cases on top of ``ReusedConnectTestCase``."""


if __name__ == "__main__":
    from pyspark.pandas.tests.connect.frame.test_parity_asfreq import *  # noqa: F401

    # Write XML reports to target/test-reports when xmlrunner is importable;
    # otherwise pass None so unittest uses its default runner.
    testRunner = None
    try:
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
41 changes: 41 additions & 0 deletions python/pyspark/pandas/tests/connect/frame/test_parity_asof.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

from pyspark.pandas.tests.frame.test_asof import AsOfMixin
from pyspark.testing.connectutils import ReusedConnectTestCase
from pyspark.testing.pandasutils import PandasOnSparkTestUtils


class AsOfParityTests(AsOfMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
    """Runs the ``AsOfMixin`` test cases on top of ``ReusedConnectTestCase``."""


if __name__ == "__main__":
    from pyspark.pandas.tests.connect.frame.test_parity_asof import *  # noqa: F401

    # Write XML reports to target/test-reports when xmlrunner is importable;
    # otherwise pass None so unittest uses its default runner.
    testRunner = None
    try:
        import xmlrunner  # type: ignore[import]

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
87 changes: 87 additions & 0 deletions python/pyspark/pandas/tests/frame/test_asfreq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

import pandas as pd

import pyspark.pandas as ps
from pyspark.pandas.exceptions import PandasNotImplementedError
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils


class AsFreqMixin:
    """
    Test cases for ``DataFrame.asfreq`` executed through the pandas fallback.

    Concrete test classes combine this mixin with a test-case base class that
    provides ``assert_eq`` and ``assertRaises``.
    """

    @property
    def pdf(self):
        """A four-row, minute-frequency pandas frame with one missing value."""
        index = pd.date_range("1/1/2000", periods=4, freq="min")
        series = pd.Series([0.0, None, 2.0, 3.0], index=index)
        return pd.DataFrame({"s": series})

    @property
    def psdf(self):
        """The pandas-on-Spark counterpart of :attr:`pdf`."""
        return ps.from_pandas(self.pdf)

    def _check_asfreq_parity(self):
        """Compare pandas and pandas-on-Spark ``asfreq`` for each argument set."""
        for extra_kwargs in ({}, {"fill_value": 9.0}, {"method": "bfill"}):
            self.assert_eq(
                self.pdf.asfreq(freq="30s", **extra_kwargs),
                self.psdf.asfreq(freq="30s", **extra_kwargs),
            )

    def test_disabled(self):
        # Without the fallback option the method must still be reported as missing.
        with self.assertRaises(PandasNotImplementedError):
            self.psdf.asfreq(freq="30s")

    def test_fallback(self):
        # option_context restores the previous option values even when an
        # assertion fails, so a failure here cannot leak the fallback setting
        # into other tests such as test_disabled.
        with ps.option_context("compute.pandas_fallback", True):
            self._check_asfreq_parity()

            # test with schema inferred from partial dataset, len(pdf)==4
            with ps.option_context("compute.shortcut_limit", 2):
                self._check_asfreq_parity()


class AsFreqTests(AsFreqMixin, PandasOnSparkTestCase, TestUtils):
    """Runs the ``AsFreqMixin`` test cases on top of ``PandasOnSparkTestCase``."""


if __name__ == "__main__":
    from pyspark.pandas.tests.frame.test_asfreq import *  # noqa: F401

    # Write XML reports to target/test-reports when xmlrunner is importable;
    # otherwise pass None so unittest uses its default runner.
    testRunner = None
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
106 changes: 106 additions & 0 deletions python/pyspark/pandas/tests/frame/test_asof.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

import pandas as pd

import pyspark.pandas as ps
from pyspark.pandas.exceptions import PandasNotImplementedError
from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils


class AsOfMixin:
    """
    Test cases for ``DataFrame.asof`` executed through the pandas fallback.

    Concrete test classes combine this mixin with a test-case base class that
    provides ``assert_eq`` and ``assertRaises``.
    """

    @property
    def pdf(self):
        """A five-row pandas frame indexed by consecutive minutes."""
        return pd.DataFrame(
            {"a": [10.0, 20.0, 30.0, 40.0, 50.0], "b": [None, None, None, None, 500]},
            index=pd.DatetimeIndex(
                [
                    "2018-02-27 09:01:00",
                    "2018-02-27 09:02:00",
                    "2018-02-27 09:03:00",
                    "2018-02-27 09:04:00",
                    "2018-02-27 09:05:00",
                ]
            ),
        )

    @property
    def psdf(self):
        """The pandas-on-Spark counterpart of :attr:`pdf`."""
        return ps.from_pandas(self.pdf)

    def _check_asof_parity(self):
        """Compare pandas and pandas-on-Spark ``asof`` for each argument set."""
        for extra_kwargs in ({}, {"subset": ["a"]}):
            # Lookup timestamps that fall between rows of the test frame.
            where = pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 09:04:30"])
            self.assert_eq(
                self.pdf.asof(where, **extra_kwargs),
                self.psdf.asof(where, **extra_kwargs),
            )

    def test_disabled(self):
        # Without the fallback option the method must still be reported as missing.
        with self.assertRaises(PandasNotImplementedError):
            self.psdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 09:04:30"]))

    def test_fallback(self):
        # option_context restores the previous option values even when an
        # assertion fails, so a failure here cannot leak the fallback setting
        # into other tests such as test_disabled.
        with ps.option_context("compute.pandas_fallback", True):
            self._check_asof_parity()

            # test with schema inferred from partial dataset, len(pdf)==5
            with ps.option_context("compute.shortcut_limit", 2):
                self._check_asof_parity()


# NOTE(review): the name says "AsFreq" but this class runs AsOfMixin; it looks
# like a copy-paste from test_asfreq.py. Consider renaming to AsOfTests after
# confirming nothing selects this class by name.
class AsFreqTests(AsOfMixin, PandasOnSparkTestCase, TestUtils):
    """Runs the ``AsOfMixin`` test cases on top of ``PandasOnSparkTestCase``."""


if __name__ == "__main__":
    from pyspark.pandas.tests.frame.test_asof import *  # noqa: F401

    # Write XML reports to target/test-reports when xmlrunner is importable;
    # otherwise pass None so unittest uses its default runner.
    testRunner = None
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)

0 comments on commit 8e1fa56

Please sign in to comment.