Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4564: Workaround import issues in Ray: auto-import pandas on python start if env var is set #4603

Merged
merged 18 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Key Features and Updates
* FIX-#4577: Set attribute of Modin dataframe to updated value (#4588)
* FIX-#4411: Fix binary_op between datetime64 Series and pandas timedelta (#4592)
* FIX-#4582: Inherit custom log layer (#4583)
* FIX-#4564: Workaround import issues in Ray: auto-import pandas on python start if env var is set (#4603)
* Performance enhancements
* PERF-#4182: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* Benchmarking enhancements
Expand Down
7 changes: 1 addition & 6 deletions docs/requirements-doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ pyyaml
recommonmark
sphinx
sphinx-click
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
protobuf<4.0.0
ray[default]>=1.4.0
git+https://github.com/modin-project/modin.git@master#egg=modin[all]
sphinxcontrib_plantuml
sphinx-issues
Expand Down
7 changes: 1 addition & 6 deletions environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@ dependencies:
- modin-spreadsheet>=0.1.1
- tqdm
- git+https://github.com/airspeed-velocity/asv.git@ef016e233cb9a0b19d517135104f49e0a3c380e9
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
- ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
- protobuf<4.0.0
- ray[default]>=1.4.0
- connectorx>=0.2.6a4
# TODO: remove when resolving GH#4398
- redis>=3.5.0,<4.0.0
Expand Down
1 change: 1 addition & 0 deletions modin-autoimport-pandas.pth
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import os; os.environ.get("__MODIN_AUTOIMPORT_PANDAS__", None) and __import__("pandas")
8 changes: 2 additions & 6 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def _get_default(cls):
"""
from modin.utils import (
MIN_RAY_VERSION,
MAX_RAY_VERSION_EXCLUSIVE,
MIN_DASK_VERSION,
)

Expand All @@ -95,14 +94,11 @@ def _get_default(cls):
except ImportError:
pass
else:
if (
version.parse(ray.__version__) < MIN_RAY_VERSION
or version.parse(ray.__version__) >= MAX_RAY_VERSION_EXCLUSIVE
):
if version.parse(ray.__version__) < MIN_RAY_VERSION:
raise ImportError(
"Please `pip install modin[ray]` to install compatible Ray "
+ "version "
+ f"(>={MIN_RAY_VERSION},<{MAX_RAY_VERSION_EXCLUSIVE})."
+ f"(>={MIN_RAY_VERSION})."
)
return "Ray"
try:
Expand Down
78 changes: 15 additions & 63 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
NPartitions,
ValueSource,
)
from modin.error_message import ErrorMessage

ObjectIDType = ray.ObjectRef
if version.parse(ray.__version__) >= version.parse("1.2.0"):
Expand All @@ -40,60 +41,6 @@
ObjectIDType = (ray.ObjectRef, ClientObjectRef)


def _move_stdlib_ahead_of_site_packages(*args):
"""
Ensure packages from stdlib have higher import priority than from site-packages.

Parameters
----------
*args : tuple
Ignored, added for compatibility with Ray.

Notes
-----
This function is expected to be run on all workers including the driver.
This is a hack solution to fix GH-#647, GH-#746.
"""
site_packages_path = None
site_packages_path_index = -1
for i, path in enumerate(sys.path):
if sys.exec_prefix in path and path.endswith("site-packages"):
site_packages_path = path
site_packages_path_index = i
# break on first found
break

if site_packages_path is not None:
# stdlib packages layout as follows:
# - python3.x
# - typing.py
# - site-packages/
# - pandas
# So extracting the dirname of the site_packages can point us
# to the directory containing standard libraries.
sys.path.insert(site_packages_path_index, os.path.dirname(site_packages_path))


def _import_pandas(*args):
"""
Import pandas to make sure all its machinery is ready.

This prevents a race condition between two threads deserializing functions
and trying to import pandas at the same time.

Parameters
----------
*args : tuple
Ignored, added for compatibility with Ray.

Notes
-----
This function is expected to be run on all workers before any
serialization or deserialization starts.
"""
import pandas # noqa F401


def initialize_ray(
override_is_cluster=False,
override_redis_address: str = None,
Expand All @@ -117,6 +64,7 @@ def initialize_ray(
What password to use when connecting to Redis.
If not specified, ``modin.config.RayRedisPassword`` is used.
"""
extra_init_kw = {"runtime_env": {"env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"}}}
if not ray.is_initialized() or override_is_cluster:
cluster = override_is_cluster or IsRayCluster.get()
redis_address = override_redis_address or RayRedisAddress.get()
Expand All @@ -138,17 +86,16 @@ def initialize_ray(
include_dashboard=False,
ignore_reinit_error=True,
_redis_password=redis_password,
**extra_init_kw,
Copy link
Contributor

@prutskov prutskov Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check this case? I can't find that extra parameters could be provided in case of existing cluster https://github.com/ray-project/ray/blob/master/python/ray/_private/worker.py#L1400-L1412

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't, but apparently extra ones would just get ignored, and, if in the future we'll add more arguments than runtime_env it would be useful.

As for the runtime environment being different, there is code later on that checks the variables. It should give a warning to the user.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't, but apparently extra ones would just get ignored, and, if in the future we'll add more arguments than runtime_env it would be useful.

Can you elaborate why it would be useful?

As for the runtime environment being different, there is code later on that checks the variables. It should give a warning to the user.

Which code do you mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate why it would be useful?

This creates one central place to add more arguments to ray.init() instead of copy-pasting them in several different places.

Which code do you mean?

else: # ray is already initialized, check runtime env config
env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
for varname, varvalue in extra_init_kw["runtime_env"]["env_vars"].items():
if str(env_vars.get(varname, "")) != str(varvalue):
ErrorMessage.single_warning(
"If initialising Ray yourself, please ensure its runtime env "
+ f"sets environment variable {varname} to {varvalue}"
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't hit this branch in case of cluster init.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh?.. we either call ray.init() or hit this else: branch.

We hit ray.init() at line 84 (under if cluster:) or at line 158 (in else branch of that if cluster).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering if we should check the runtime environment after ray.init() in case Modin itself initializes Ray? Because of this.

Did you check this case? I can't find that extra parameters could be provided in case of existing cluster https://github.com/ray-project/ray/blob/master/python/ray/_private/worker.py#L1400-L1412

I hadn't, but apparently extra ones would just get ignored

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

)
else:
from modin.error_message import ErrorMessage

# This string is intentionally formatted this way. We want it indented in
# the warning message.
ErrorMessage.not_initialized(
"Ray",
"""
f"""
import ray
ray.init()
ray.init({', '.join([f'{k}={v}' for k,v in extra_init_kw.items()])})
YarShev marked this conversation as resolved.
Show resolved Hide resolved
""",
)
object_store_memory = Memory.get()
Expand Down Expand Up @@ -206,6 +153,7 @@ def initialize_ray(
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
**extra_init_kw,
}
ray.init(**ray_init_kwargs)

Expand All @@ -219,11 +167,15 @@ def initialize_ray(
if not GPU_MANAGERS:
for i in range(GpuCount.get()):
GPU_MANAGERS.append(GPUManager.remote(i))
_move_stdlib_ahead_of_site_packages()
ray.worker.global_worker.run_function_on_all_workers(
_move_stdlib_ahead_of_site_packages
)
ray.worker.global_worker.run_function_on_all_workers(_import_pandas)
else: # ray is already initialized, check runtime env config
env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only checks that the head node's env vars are set correctly, right? Don't we need to set the environment variable on all of the workers/nodes, and if so, shouldn't we be checking that the env vars are set correctly on all of the workers? We could probably do that using the Ray function that runs on all workers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, .runtime_env is the thing which tells Ray which environment variables to set for its workers. It should set these variables for the workers automatically.

for varname, varvalue in extra_init_kw["runtime_env"]["env_vars"].items():
if str(env_vars.get(varname, "")) != str(varvalue):
ErrorMessage.single_warning(
"If initialising Ray yourself, please ensure its runtime env "
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
+ f"sets environment variable {varname} to {varvalue}"
vnlitvinov marked this conversation as resolved.
Show resolved Hide resolved
)

num_cpus = int(ray.cluster_resources()["CPU"])
num_gpus = int(ray.cluster_resources().get("GPU", 0))
if StorageFormat.get() == "Cudf":
Expand Down
3 changes: 0 additions & 3 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
from modin._version import get_versions

MIN_RAY_VERSION = version.parse("1.4.0")
# TODO(https://github.com/modin-project/modin/issues/4564):
# once ray can go past 1.13.0, remove this constant and stop checking it.
MAX_RAY_VERSION_EXCLUSIVE = version.parse("1.13.0")
MIN_DASK_VERSION = version.parse("2.22.0")

PANDAS_API_URL_TEMPLATE = f"https://pandas.pydata.org/pandas-docs/version/{pandas.__version__}/reference/api/{{}}.html"
Expand Down
7 changes: 1 addition & 6 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@ numpy>=1.18.5
pyarrow>=4.0.1
dask[complete]>=2.22.0,<2022.2.0
distributed>=2.22.0,<2022.2.0
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
ray[default]>=1.4.0,<1.13.0
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
protobuf<4.0.0
ray[default]>=1.4.0
redis>=3.5.0,<4.0.0
psutil
fsspec
Expand Down
32 changes: 24 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,42 @@
long_description = fh.read()

dask_deps = ["dask>=2.22.0,<2022.2.0", "distributed>=2.22.0,<2022.2.0"]
# TODO: remove redis dependency when resolving GH#4398
# Pin ray to < 1.13.0 to work around GH#4564
# TODO(https://github.com/modin-project/modin/issues/4564): let ray go past 1.13.0.
# Following https://github.com/ray-project/ray/pull/25648, pin protobuf < 4,
# because versions >= 4.0.0 are incompatible with ray<1.13.0.
ray_deps = [
"ray[default]>=1.4.0,<1.13.0",
"ray[default]>=1.4.0",
"pyarrow>=4.0.1",
"redis>=3.5.0,<4.0.0",
"protobuf<4.0.0",
]
remote_deps = ["rpyc==4.1.5", "cloudpickle", "boto3"]
spreadsheet_deps = ["modin-spreadsheet>=0.1.0"]
sql_deps = ["dfsql>=0.4.2", "pyparsing<=2.4.7"]
all_deps = dask_deps + ray_deps + remote_deps + spreadsheet_deps

# Distribute 'modin-autoimport-pandas.pth' along with binary and source distributions.
# This file provides the "import pandas before Ray init" feature if a specific
# environment variable is set (see https://github.com/modin-project/modin/issues/4564).
cmdclass = versioneer.get_cmdclass()


class AddPthFileBuild(cmdclass["build_py"]):
    """``build_py`` command that also lists 'modin-autoimport-pandas.pth' as a data file."""

    def _get_data_files(self):
        """
        Return the parent command's data files with the ``.pth`` entry appended.

        Returns
        -------
        list
            Whatever the parent ``_get_data_files`` returns (an empty list if
            it returns a falsy value), followed by one extra entry.
        """
        inherited = super()._get_data_files() or []
        # NOTE(review): the tuple presumably follows setuptools'
        # (package, src_dir, build_dir, filenames) layout - confirm.
        pth_entry = (".", ".", self.build_lib, ["modin-autoimport-pandas.pth"])
        return inherited + [pth_entry]


class AddPthFileSDist(cmdclass["sdist"]):
    """``sdist`` command that adds 'modin-autoimport-pandas.pth' to the file list."""

    def make_distribution(self):
        """Append the ``.pth`` file to ``self.filelist``, then defer to the parent command."""
        pth_file = "modin-autoimport-pandas.pth"
        self.filelist.append(pth_file)
        return super().make_distribution()


cmdclass["build_py"] = AddPthFileBuild
cmdclass["sdist"] = AddPthFileSDist

setup(
name="modin",
version=versioneer.get_version(),
cmdclass=versioneer.get_cmdclass(),
cmdclass=cmdclass,
description="Modin: Make your pandas code run faster by changing one line of code.",
packages=find_packages(exclude=["scripts", "scripts.*"]),
include_package_data=True,
Expand Down