Skip to content

Commit

Permalink
FEAT-#5230: Support external query compiler and IO (#5231)
Browse files Browse the repository at this point in the history
Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
Co-authored-by: Vasily Litvinov <fam1ly.n4me@yandex.ru>
  • Loading branch information
devin-petersohn and vnlitvinov authored Nov 17, 2022
1 parent ae6901c commit fd776a5
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 31 deletions.
15 changes: 14 additions & 1 deletion modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
from packaging import version
import secrets

from pandas.util._decorators import doc # type: ignore[attr-defined]

from .pubsub import Parameter, _TYPE_PARAMS, ExactStr, ValueSource
from typing import Optional
from typing import Any, Optional


class EnvironmentVariable(Parameter, type=str, abstract=True):
Expand Down Expand Up @@ -77,6 +79,10 @@ class Engine(EnvironmentVariable, type=str):
varname = "MODIN_ENGINE"
choices = ("Ray", "Dask", "Python", "Native")

NOINIT_ENGINES = {
"Python",
} # engines that don't require initialization, useful for unit tests

@classmethod
def _get_default(cls) -> str:
"""
Expand Down Expand Up @@ -135,6 +141,13 @@ def _get_default(cls) -> str:
"Please refer to installation documentation page to install an engine"
)

@classmethod
@doc(Parameter.add_option.__doc__)
def add_option(cls, choice: Any) -> Any:
choice = super().add_option(choice)
cls.NOINIT_ENGINES.add(choice)
return choice


class StorageFormat(EnvironmentVariable, type=str):
"""Engine to run on a single node of distribution."""
Expand Down
28 changes: 26 additions & 2 deletions modin/config/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from collections import defaultdict
from enum import IntEnum
from typing import Any, Callable, DefaultDict, NamedTuple, Optional, Sequence
from typing import Any, Callable, DefaultDict, NamedTuple, Optional, Tuple


class TypeDescriptor(NamedTuple):
Expand Down Expand Up @@ -136,7 +136,7 @@ class Parameter(object):
``ValueSource``.
"""

choices: Optional[Sequence[str]] = None
choices: Optional[Tuple[str, ...]] = None
type = str
default: Optional[Any] = None
is_abstract = True
Expand Down Expand Up @@ -340,5 +340,29 @@ def _check_callbacks(cls, oldvalue: Any) -> None:
for callback in cls._once.pop(cls.get(), ()):
callback(cls)

@classmethod
def add_option(cls, choice: Any) -> Any:
"""
Add a new choice for the parameter.
Parameters
----------
choice : Any
New choice to add to the available choices.
Returns
-------
Any
Added choice normalized according to the parameter type.
"""
if cls.choices is not None:
if not _TYPE_PARAMS[cls.type].verify(choice):
raise ValueError(f"Unsupported choice value: {choice}")
choice = _TYPE_PARAMS[cls.type].normalize(choice)
if choice not in cls.choices:
cls.choices += (choice,)
return choice
raise TypeError("Cannot add a choice to a parameter where choices is None")


__all__ = ["Parameter"]
10 changes: 4 additions & 6 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,16 @@ def _update_factory(cls, _):
# allow missing factories in experimenal mode only
if hasattr(factories, "Experimental" + factory_name):
msg = (
"{0} on {1} is only accessible through the experimental API.\nRun "
+ "`import modin.experimental.pandas as pd` to use {0} on {1}."
"{0} is only accessible through the experimental API.\nRun "
+ "`import modin.experimental.pandas as pd` to use {0}."
)
else:
msg = (
"Cannot find a factory for partition '{}' and execution engine '{}'. "
"Cannot find factory {}. "
+ "Potential reason might be incorrect environment variable value for "
+ f"{StorageFormat.varname} or {Engine.varname}"
)
raise FactoryNotFoundError(
msg.format(StorageFormat.get(), Engine.get())
)
raise FactoryNotFoundError(msg.format(factory_name))
cls.__factory = StubFactory.set_failing_name(factory_name)
else:
cls.__factory.prepare()
Expand Down
2 changes: 1 addition & 1 deletion modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class NotRealFactory(Exception):

@doc(_doc_abstract_factory_class, role="")
class BaseFactory(object):
io_cls: BaseIO = None # The module where the I/O functionality exists.
io_cls: typing.Type[BaseIO] = None # The module where the I/O functionality exists.

@classmethod
def get_info(cls) -> FactoryInfo:
Expand Down
65 changes: 51 additions & 14 deletions modin/core/execution/dispatching/factories/test/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,42 @@
# governing permissions and limitations under the License.

import pytest
from contextlib import contextmanager

from modin.config import Engine, StorageFormat
from modin.config import Parameter, Engine, StorageFormat
from modin import set_execution

from modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
FactoryNotFoundError,
)
from modin.core.execution.dispatching.factories import factories
from modin.core.execution.python.implementations.pandas_on_python.io import (
PandasOnPythonIO,
)
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler

import modin.pandas as pd


@contextmanager
def _switch_execution(engine: str, storage_format: str):
old_engine, old_storage = set_execution(engine, storage_format)
try:
yield
finally:
set_execution(old_engine, old_storage)


@contextmanager
def _switch_value(config: Parameter, value: str):
old_value = config.get()
try:
yield config.put(value)
finally:
config.put(old_value)


class PandasOnTestFactory(factories.BaseFactory):
"""
Stub factory to ensure we can switch execution engine to 'Test'
Expand Down Expand Up @@ -70,7 +93,7 @@ def prepare(cls):
factories.FooOnBarFactory = FooOnBarFactory

# register them as known "no init" engines for modin.pandas
pd._NOINIT_ENGINES |= {"Test", "Bar"}
Engine.NOINIT_ENGINES |= {"Test", "Bar"}


def test_default_factory():
Expand All @@ -79,23 +102,37 @@ def test_default_factory():


def test_factory_switch():
Engine.put("Test")
assert FactoryDispatcher.get_factory() == PandasOnTestFactory
assert FactoryDispatcher.get_factory().io_cls == "Foo"
Engine.put("Python") # revert engine to default
with _switch_execution("Python", "Pandas"):
with _switch_value(Engine, "Test"):
assert FactoryDispatcher.get_factory() == PandasOnTestFactory
assert FactoryDispatcher.get_factory().io_cls == "Foo"

StorageFormat.put("Test")
assert FactoryDispatcher.get_factory() == TestOnPythonFactory
assert FactoryDispatcher.get_factory().io_cls == "Bar"
StorageFormat.put("Pandas") # revert engine to default
with _switch_value(StorageFormat, "Test"):
assert FactoryDispatcher.get_factory() == TestOnPythonFactory
assert FactoryDispatcher.get_factory().io_cls == "Bar"


def test_engine_wrong_factory():
with pytest.raises(FactoryNotFoundError):
Engine.put("BadEngine")
Engine.put("Python") # revert engine to default
with _switch_value(Engine, "BadEngine"):
pass


def test_set_execution():
set_execution("Bar", "Foo")
assert FactoryDispatcher.get_factory() == FooOnBarFactory
with _switch_execution("Bar", "Foo"):
assert FactoryDispatcher.get_factory() == FooOnBarFactory


def test_add_option():
class DifferentlyNamedFactory(factories.BaseFactory):
@classmethod
def prepare(cls):
cls.io_cls = PandasOnPythonIO

factories.StorageOnExecFactory = DifferentlyNamedFactory
StorageFormat.add_option("sToragE")
Engine.add_option("Exec")

with _switch_execution("Exec", "Storage"):
df = pd.DataFrame([[1, 2, 3], [3, 4, 5], [5, 6, 7]])
assert isinstance(df._query_compiler, PandasQueryCompiler)
11 changes: 4 additions & 7 deletions modin/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,13 @@
)
import os

from modin.config import Engine, Parameter
from modin.config import Parameter

_is_first_update = {}
_NOINIT_ENGINES = {
"Python",
} # engines that don't require initialization, useful for unit tests


def _update_engine(publisher: Parameter):
from modin.config import StorageFormat, CpuCount
from modin.config import Engine, StorageFormat, CpuCount
from modin.config.envvars import IsExperimental
from modin.config.pubsub import ValueSource

Expand Down Expand Up @@ -196,7 +193,7 @@ def init_remote_ray(partition):
), f"Storage format should be 'Hdk' with 'Cloudnative' engine, but provided {sfmt}."
get_connection().modules["modin"].set_execution("Native", "Hdk")

elif publisher.get() not in _NOINIT_ENGINES:
elif publisher.get() not in Engine.NOINIT_ENGINES:
raise ImportError("Unrecognized execution engine: {}.".format(publisher.get()))

_is_first_update[publisher.get()] = False
Expand Down Expand Up @@ -381,4 +378,4 @@ def init_remote_ray(partition):
)
del PandasCompatVersion

del pandas, Engine, Parameter
del pandas, Parameter

0 comments on commit fd776a5

Please sign in to comment.