Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4564: Workaround import issues in Ray: auto-import pandas on python start if env var is set #4603

Merged
merged 18 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/getting_started/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -291,5 +291,41 @@ or
df = pd.DataFrame([0, 1, 2, 3])
print(df)

Spurious error "cannot import partially initialised pandas module" on custom Ray cluster
YarShev marked this conversation as resolved.
Show resolved Hide resolved
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

If you're using some pre-configured Ray cluster to run Modin, it's possible you would
be seeing spurious errors like

.. code-block::

ray.exceptions.RaySystemError: System error: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)
traceback: Traceback (most recent call last):
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
obj = self._deserialize_object(data, metadata, object_ref)
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
return self._deserialize_msgpack_data(data, metadata_fields)
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
python_objects = self._deserialize_pickle5_data(pickle5_data)
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/ray/serialization.py", line 180, in _deserialize_pickle5_data
obj = pickle.loads(in_band, buffers=buffers)
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/pandas/__init__.py", line 135, in <module>
from pandas import api, arrays, errors, io, plotting, testing, tseries
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/pandas/testing.py", line 6, in <module>
from pandas._testing import (
File "/usr/share/miniconda/envs/modin/lib/python3.8/site-packages/pandas/_testing/__init__.py", line 979, in <module>
cython_table = pd.core.common._cython_table.items()
AttributeError: partially initialized module 'pandas' has no attribute 'core' (most likely due to a circular import)

**Solution**

Modin contains a workaround that should automatically do ``import pandas`` upon worker process starts.

It is triggered by the presence of non-empty ``__MODIN_AUTOIMPORT_PANDAS__`` environment variable which
Modin sets up automatically on the Ray clusters it spawns, but it might be missing on pre-configured clusters.

So if you're seeing the issue like shown above, please make sure you set this environment variable on all
worker nodes of your cluster before actually spawning the workers.

.. _issue: https://github.com/modin-project/modin/issues
.. _Slack: https://modin.org/slack.html
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Key Features and Updates
* FIX-#4639: Fix `storage_options` usage for `read_csv` and `read_csv_glob` (#4644)
* FIX-#4593: Ensure Modin warns when setting columns via attributes (#4621)
* FIX-#4584: Enable pdb debug when running cloud tests (#4585)
* FIX-#4564: Workaround import issues in Ray: auto-import pandas on python start if env var is set (#4603)
* Performance enhancements
* PERF-#4182: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* PERF-#4288: Improve perf of `groupby.mean` for narrow data (#4591)
Expand Down
7 changes: 1 addition & 6 deletions docs/requirements-doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ pyyaml
recommonmark
sphinx
sphinx-click
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
protobuf<4.0.0
ray[default]>=1.4.0
git+https://github.com/modin-project/modin.git@master#egg=modin[all]
sphinxcontrib_plantuml
sphinx-issues
Expand Down
7 changes: 1 addition & 6 deletions environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ dependencies:
- modin-spreadsheet>=0.1.1
- tqdm
- git+https://github.com/airspeed-velocity/asv.git@ef016e233cb9a0b19d517135104f49e0a3c380e9
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
- ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
- protobuf<4.0.0
- ray[default]>=1.4.0
- connectorx>=0.2.6a4
# TODO: remove when resolving GH#4398
- redis>=3.5.0,<4.0.0
Expand Down
1 change: 1 addition & 0 deletions modin-autoimport-pandas.pth
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import os; os.environ.get("__MODIN_AUTOIMPORT_PANDAS__", None) and __import__("pandas")
8 changes: 2 additions & 6 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def _get_default(cls):
"""
from modin.utils import (
MIN_RAY_VERSION,
MAX_RAY_VERSION_EXCLUSIVE,
MIN_DASK_VERSION,
)

Expand All @@ -95,14 +94,11 @@ def _get_default(cls):
except ImportError:
pass
else:
if (
version.parse(ray.__version__) < MIN_RAY_VERSION
or version.parse(ray.__version__) >= MAX_RAY_VERSION_EXCLUSIVE
):
if version.parse(ray.__version__) < MIN_RAY_VERSION:
raise ImportError(
"Please `pip install modin[ray]` to install compatible Ray "
+ "version "
+ f"(>={MIN_RAY_VERSION},<{MAX_RAY_VERSION_EXCLUSIVE})."
+ f"(>={MIN_RAY_VERSION})."
)
return "Ray"
try:
Expand Down
80 changes: 17 additions & 63 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
NPartitions,
ValueSource,
)
from modin.error_message import ErrorMessage

ObjectIDType = ray.ObjectRef
if version.parse(ray.__version__) >= version.parse("1.2.0"):
Expand All @@ -40,60 +41,6 @@
ObjectIDType = (ray.ObjectRef, ClientObjectRef)


def _move_stdlib_ahead_of_site_packages(*args):
"""
Ensure packages from stdlib have higher import priority than from site-packages.

Parameters
----------
*args : tuple
Ignored, added for compatibility with Ray.

Notes
-----
This function is expected to be run on all workers including the driver.
This is a hack solution to fix GH-#647, GH-#746.
"""
site_packages_path = None
site_packages_path_index = -1
for i, path in enumerate(sys.path):
if sys.exec_prefix in path and path.endswith("site-packages"):
site_packages_path = path
site_packages_path_index = i
# break on first found
break

if site_packages_path is not None:
# stdlib packages layout as follows:
# - python3.x
# - typing.py
# - site-packages/
# - pandas
# So extracting the dirname of the site_packages can point us
# to the directory containing standard libraries.
sys.path.insert(site_packages_path_index, os.path.dirname(site_packages_path))


def _import_pandas(*args):
"""
Import pandas to make sure all its machinery is ready.

This prevents a race condition between two threads deserializing functions
and trying to import pandas at the same time.

Parameters
----------
*args : tuple
Ignored, added for compatibility with Ray.

Notes
-----
This function is expected to be run on all workers before any
serialization or deserialization starts.
"""
import pandas # noqa F401


def initialize_ray(
override_is_cluster=False,
override_redis_address: str = None,
Expand All @@ -117,6 +64,7 @@ def initialize_ray(
What password to use when connecting to Redis.
If not specified, ``modin.config.RayRedisPassword`` is used.
"""
extra_init_kw = {"runtime_env": {"env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"}}}
if not ray.is_initialized() or override_is_cluster:
cluster = override_is_cluster or IsRayCluster.get()
redis_address = override_redis_address or RayRedisAddress.get()
Expand All @@ -138,17 +86,16 @@ def initialize_ray(
include_dashboard=False,
ignore_reinit_error=True,
_redis_password=redis_password,
**extra_init_kw,
Copy link
Contributor

@prutskov prutskov Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check this case? I can't find that extra parameters could be provided in case of existing cluster https://github.com/ray-project/ray/blob/master/python/ray/_private/worker.py#L1400-L1412

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't, but apparently extra ones would just get ignored, and, if in the future we'll add more arguments than runtime_env it would be useful.

As for the runtime environment being different, there is a code later on checking the variables. It should give a warning to the user.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't, but apparently extra ones would just get ignored, and, if in the future we'll add more arguments than runtime_env it would be useful.

Can you elaborate why it would be useful?

As for the runtime environment being different, there is a code later on checking the variables. It should give a warning to the user.

What the code do you mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate why it would be useful?

This creates one central place to add more arguments to ray.init() instead of copy-pasting them in several different places.

What the code do you mean?

else: # ray is already initialized, check runtime env config
env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
for varname, varvalue in extra_init_kw["runtime_env"]["env_vars"].items():
if str(env_vars.get(varname, "")) != str(varvalue):
ErrorMessage.single_warning(
"If initialising Ray yourself, please ensure its runtime env "
+ f"sets environment variable {varname} to {varvalue}"
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't hit this branch in case of cluster init.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh?.. we either call ray.init() or hit this else: branch.

We hit ray.init() at line 84 (under if cluster:) or at line 158 (in else branch of that if cluster).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering if we should check the runtime environment after ray.init() in case Modin itself initializes Ray? Because of this.

Did you check this case? I can't find that extra parameters could be provided in case of existing cluster https://github.com/ray-project/ray/blob/master/python/ray/_private/worker.py#L1400-L1412

I hadn't, but apparently extra ones would just get ignored

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

)
else:
from modin.error_message import ErrorMessage

# This string is intentionally formatted this way. We want it indented in
# the warning message.
ErrorMessage.not_initialized(
"Ray",
"""
f"""
import ray
ray.init()
ray.init({', '.join([f'{k}={v}' for k,v in extra_init_kw.items()])})
YarShev marked this conversation as resolved.
Show resolved Hide resolved
""",
)
object_store_memory = Memory.get()
Expand Down Expand Up @@ -206,6 +153,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
**extra_init_kw,
}
ray.init(**ray_init_kwargs)

Expand All @@ -219,11 +167,17 @@ def initialize_ray(
if not GPU_MANAGERS:
for i in range(GpuCount.get()):
GPU_MANAGERS.append(GPUManager.remote(i))
_move_stdlib_ahead_of_site_packages()
ray.worker.global_worker.run_function_on_all_workers(
_move_stdlib_ahead_of_site_packages
)
ray.worker.global_worker.run_function_on_all_workers(_import_pandas)

# Now ray is initialized, check runtime env config - especially useful if we join
# an externally pre-configured cluster
env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
for varname, varvalue in extra_init_kw["runtime_env"]["env_vars"].items():
if str(env_vars.get(varname, "")) != str(varvalue):
ErrorMessage.single_warning(
"When using a pre-initialized Ray cluster, please ensure that the runtime env "
+ f"sets environment variable {varname} to {varvalue}"
)

num_cpus = int(ray.cluster_resources()["CPU"])
num_gpus = int(ray.cluster_resources().get("GPU", 0))
if StorageFormat.get() == "Cudf":
Expand Down
3 changes: 0 additions & 3 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
from modin._version import get_versions

MIN_RAY_VERSION = version.parse("1.4.0")
# TODO(https://github.com/modin-project/modin/issues/4564):
# once ray can go past 1.13.0, remove this constant and stop checking it.
MAX_RAY_VERSION_EXCLUSIVE = version.parse("1.13.0")
MIN_DASK_VERSION = version.parse("2.22.0")

PANDAS_API_URL_TEMPLATE = f"https://pandas.pydata.org/pandas-docs/version/{pandas.__version__}/reference/api/{{}}.html"
Expand Down
7 changes: 1 addition & 6 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@ numpy>=1.18.5
pyarrow>=4.0.1
dask[complete]>=2.22.0,<2022.2.0
distributed>=2.22.0,<2022.2.0
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
protobuf<4.0.0
ray[default]>=1.4.0
redis>=3.5.0,<4.0.0
psutil
fsspec
Expand Down
33 changes: 25 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,43 @@
long_description = fh.read()

dask_deps = ["dask>=2.22.0,<2022.2.0", "distributed>=2.22.0,<2022.2.0"]
# TODO: remove redis dependency when resolving GH#4398
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
ray_deps = [
"ray[default]>=1.4.0,<1.13.0",
"ray[default]>=1.4.0",
"pyarrow>=4.0.1",
"redis>=3.5.0,<4.0.0",
"protobuf<4.0.0",
]
remote_deps = ["rpyc==4.1.5", "cloudpickle", "boto3"]
spreadsheet_deps = ["modin-spreadsheet>=0.1.0"]
sql_deps = ["dfsql>=0.4.2", "pyparsing<=2.4.7"]
all_deps = dask_deps + ray_deps + remote_deps + spreadsheet_deps

# Distribute 'modin-autoimport-pandas.pth' along with binary and source distributions.
# This file provides the "import pandas before Ray init" feature if specific
# environment variable is set (see https://github.com/modin-project/modin/issues/4564).
cmdclass = versioneer.get_cmdclass()
extra_files = ["modin-autoimport-pandas.pth"]


class AddPthFileBuild(cmdclass["build_py"]):
def _get_data_files(self):
return (super()._get_data_files() or []) + [
(".", ".", self.build_lib, extra_files)
]


class AddPthFileSDist(cmdclass["sdist"]):
def make_distribution(self):
self.filelist.extend(extra_files)
return super().make_distribution()


cmdclass["build_py"] = AddPthFileBuild
cmdclass["sdist"] = AddPthFileSDist

setup(
name="modin",
version=versioneer.get_version(),
cmdclass=versioneer.get_cmdclass(),
cmdclass=cmdclass,
description="Modin: Make your pandas code run faster by changing one line of code.",
packages=find_packages(exclude=["scripts", "scripts.*"]),
include_package_data=True,
Expand Down