FIX-#3605: Address comments related to new project structure (#3606)
Signed-off-by: Igoshev, Yaroslav <yaroslav.igoshev@intel.com>
YarShev authored Nov 8, 2021
1 parent 8acad95 commit 7ab2d90
Showing 15 changed files with 193 additions and 175 deletions.
@@ -117,7 +117,7 @@
"from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler\n",
"```\n",
"\n",
"The `PandasQueryCompiler` is responsible for defining and compiling the queries that can be operated on by Modin, and is specific to the pandas storage format. Any queries defined here must also both be compatible with and result in a `pandas.DataFrame`. Many functionalities are very simply implemented, as you can see in the current code: [Link](https://github.com/modin-project/modin/blob/f15fb8ea776ed039893130b1e85053e875912d4b/modin/core/storage_formats/pandas/query_compiler.py#L365).\n",
"The `PandasQueryCompiler` is responsible for defining and compiling the queries that can be operated on by Modin, and is specific to the pandas storage format. Any queries defined here must also both be compatible with and result in a `pandas.DataFrame`. Many functionalities are very simply implemented, as you can see in the current code: [Link](https://github.com/modin-project/modin/blob/7a8158873e77cb5f1a5a3b89be4ddac89f576269/modin/core/storage_formats/pandas/query_compiler.py#L216).\n",
"\n",
"If we want to register a new function, we next to understand what kind of function it is. In our example, we will use `kurtosis`, which is a reduction. So we next want to import the function type so we can use it in our definition:\n",
"\n",
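To make the registration step described in the notebook concrete, here is a self-contained sketch of the pattern. The `Reduce` class below is a hypothetical stand-in rather than Modin's actual operator class (whose import path and exact name vary between Modin versions; see the linked query_compiler.py); it only illustrates how a full-axis reduction such as kurtosis becomes a reusable query that yields a `pandas.DataFrame`.

```python
import pandas


# Hypothetical stand-in for a reduction operator template; Modin's real
# operator classes live elsewhere and may be named differently per version.
class Reduce:
    @classmethod
    def register(cls, reduce_fn):
        # Wrap a pandas reduction so the registered query always returns a
        # pandas.DataFrame, as required of queries defined on the compiler.
        def caller(df, axis=0, **kwargs):
            result = reduce_fn(df, axis=axis, **kwargs)  # a pandas.Series
            return result.to_frame().T if axis == 0 else result.to_frame()

        return caller


kurt = Reduce.register(pandas.DataFrame.kurt)
print(kurt(pandas.DataFrame({"a": [1, 2, 3, 4], "b": [2, 8, 1, 4]})))
```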
16 changes: 8 additions & 8 deletions modin/__init__.py
@@ -32,29 +32,29 @@ def custom_formatwarning(msg, category, *args, **kwargs):
)


-def set_execution(engine=None, partition=None):
+def set_execution(engine=None, storage_format=None):
"""
-    Method to set the _pair_ of execution engine and partition format simultaneously.
+    Method to set the _pair_ of execution engine and storage format simultaneously.
This is needed because there might be cases where switching one by one would be
impossible, as not all pairs of values are meaningful.
The method returns the pair of old values, so it is easy to switch back.
"""
from .config import Engine, StorageFormat

-    old_engine, old_partition = None, None
+    old_engine, old_storage_format = None, None
# defer callbacks until both entities are set
if engine is not None:
old_engine = Engine._put_nocallback(engine)
-    if partition is not None:
-        old_partition = StorageFormat._put_nocallback(partition)
+    if storage_format is not None:
+        old_storage_format = StorageFormat._put_nocallback(storage_format)
# execute callbacks if something was changed
if old_engine is not None:
Engine._check_callbacks(old_engine)
-    if old_partition is not None:
-        StorageFormat._check_callbacks(old_partition)
+    if old_storage_format is not None:
+        StorageFormat._check_callbacks(old_storage_format)

-    return old_engine, old_partition
+    return old_engine, old_storage_format


__version__ = get_versions()["version"]
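For reference, a minimal usage sketch of the renamed keyword in `set_execution` above. The `"Ray"`/`"Pandas"` values are the usual `Engine`/`StorageFormat` settings, assumed here for illustration; any pair accepted by the config callbacks works, provided the corresponding engine is installed.

```python
import modin

# Switch the execution pair atomically; the callbacks fire only after both
# config values have been set, so invalid intermediate pairs are avoided.
old_engine, old_storage_format = modin.set_execution(
    engine="Ray", storage_format="Pandas"
)

# ... run some Modin code under the new execution ...

# The returned pair makes it easy to restore the previous execution.
modin.set_execution(engine=old_engine, storage_format=old_storage_format)
```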
4 changes: 2 additions & 2 deletions modin/conftest.py
@@ -249,7 +249,7 @@ def prepare(cls):

def set_base_execution(name=BASE_EXECUTION_NAME):
setattr(factories, f"{name}Factory", BaseOnPythonFactory)
modin.set_execution(engine="python", partition=name.split("On")[0])
modin.set_execution(engine="python", storage_format=name.split("On")[0])


def pytest_configure(config):
@@ -267,7 +267,7 @@ def pytest_configure(config):
set_base_execution(BASE_EXECUTION_NAME)
else:
partition, engine = execution.split("On")
-        modin.set_execution(engine=engine, partition=partition)
+        modin.set_execution(engine=engine, storage_format=partition)


def pytest_runtest_call(item):
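As a quick illustration of the string handling in the `pytest_configure` hunk above: the execution name is split on `"On"`, with the left half becoming the storage format and the right half the engine. The value `"PandasOnRay"` below is just an example execution name.

```python
execution = "PandasOnRay"  # example execution name passed to the test suite
partition, engine = execution.split("On")
print(partition, engine)  # -> Pandas Ray

# which the conftest then forwards as:
# modin.set_execution(engine=engine, storage_format=partition)
```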
161 changes: 161 additions & 0 deletions modin/core/dataframe/base/partitioning/axis_partition.py
@@ -0,0 +1,161 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Base class of an axis partition for a Modin Dataframe."""

from abc import ABC


class BaseDataframeAxisPartition(ABC): # pragma: no cover
"""
An abstract class that represents the parent class for any axis partition class.
This class is intended to simplify the way that operations are performed.
"""

def apply(
self,
func,
num_splits=None,
other_axis_partition=None,
maintain_partitioning=True,
**kwargs,
):
"""
Apply a function to this axis partition along full axis.
Parameters
----------
func : callable
The function to apply. This will be preprocessed according to
the corresponding `BaseDataframePartition` objects.
num_splits : int, default: None
The number of times to split the result object.
other_axis_partition : BaseDataframeAxisPartition, default: None
Another `BaseDataframeAxisPartition` object to be applied
to func. This is for operations that are between two data sets.
maintain_partitioning : bool, default: True
Whether to keep the partitioning in the same
orientation as it was previously or not. This is important because we may be
operating on an individual axis partition and not touching the rest.
In this case, we have to return the partitioning to its previous
orientation (the lengths will remain the same). This is ignored between
two axis partitions.
**kwargs : dict
Additional keyword arguments to be passed to `func`.
Returns
-------
list
A list of `BaseDataframePartition` objects.
Notes
-----
The procedures that invoke this method assume full axis
knowledge. Implement this method accordingly.
You must return a list of `BaseDataframePartition` objects from this method.
"""
pass

def shuffle(self, func, lengths, **kwargs):
"""
Shuffle the order of the data in this axis partition based on the `lengths`.
Parameters
----------
func : callable
The function to apply before splitting.
lengths : list
The list of partition lengths to split the result into.
**kwargs : dict
Additional keyword arguments to be passed to `func`.
Returns
-------
list
A list of `BaseDataframePartition` objects split by `lengths`.
"""
pass

# Child classes must have these in order to correctly subclass.
instance_type = None
partition_type = None

def _wrap_partitions(self, partitions):
"""
Wrap remote partition objects with `BaseDataframePartition` class.
Parameters
----------
partitions : list
List of remote partition objects to be wrapped with the `BaseDataframePartition` class.
Returns
-------
list
List of wrapped remote partition objects.
"""
return [self.partition_type(obj) for obj in partitions]

def force_materialization(self, get_ip=False):
"""
Materialize axis partitions into a single partition.
Parameters
----------
get_ip : bool, default: False
Whether to get the node IP address of the single resulting partition or not.
Returns
-------
BaseDataframeAxisPartition
An axis partition containing only a single materialized partition.
"""
materialized = self.apply(
lambda x: x, num_splits=1, maintain_partitioning=False
)
return type(self)(materialized, get_ip=get_ip)

def unwrap(self, squeeze=False, get_ip=False):
"""
Unwrap partitions from this axis partition.
Parameters
----------
squeeze : bool, default: False
Flag used to unwrap only one partition.
get_ip : bool, default: False
Whether to get the node IP address of each partition or not.
Returns
-------
list
List of partitions from this axis partition.
Notes
-----
If `get_ip=True`, a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and
unwrapped partitions, respectively, is returned if Ray/Dask is used as an engine
(i.e. [(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]).
"""
if squeeze and len(self.list_of_blocks) == 1:
if get_ip:
return self.list_of_ips[0], self.list_of_blocks[0]
else:
return self.list_of_blocks[0]
else:
if get_ip:
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
return self.list_of_blocks
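The new base class defines a contract rather than behavior, so a short, hypothetical subclass may help show what concrete engine classes are expected to provide: the `partition_type`/`instance_type` attributes, an `apply` that returns wrapped partitions, and the `list_of_blocks`/`list_of_ips` attributes that `unwrap` and `force_materialization` rely on. This is an in-process sketch for illustration only, not one of Modin's real Ray or Dask classes.

```python
import pandas

from modin.core.dataframe.base.partitioning.axis_partition import (
    BaseDataframeAxisPartition,
)


class InProcessPartition:
    """Trivial wrapper standing in for a remote partition object."""

    def __init__(self, data):
        self.data = data


class InProcessAxisPartition(BaseDataframeAxisPartition):
    # Attributes the base class expects child classes to define.
    partition_type = InProcessPartition
    instance_type = pandas.DataFrame

    def __init__(self, partitions, get_ip=False):
        # `unwrap` and `force_materialization` read these attributes directly.
        self.list_of_blocks = [part.data for part in partitions]
        self.list_of_ips = ["127.0.0.1"] * len(partitions) if get_ip else []

    def apply(
        self,
        func,
        num_splits=None,
        other_axis_partition=None,
        maintain_partitioning=True,
        **kwargs,
    ):
        # Run `func` over the full (concatenated) axis and wrap the result
        # back into partition objects, as the base class docstring requires.
        full_axis = pandas.concat(self.list_of_blocks)
        return self._wrap_partitions([func(full_axis, **kwargs)])


parts = [
    InProcessPartition(pandas.DataFrame({"a": [1, 2]})),
    InProcessPartition(pandas.DataFrame({"a": [3, 4]})),
]
print(InProcessAxisPartition(parts).unwrap())  # two unwrapped blocks
```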
149 changes: 3 additions & 146 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -13,155 +13,12 @@

"""The module defines base interface for an axis partition of a Modin DataFrame."""

from abc import ABC
import pandas
import numpy as np
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas


class BaseDataframeAxisPartition(ABC): # pragma: no cover
"""
An abstract class that represents the parent class for any axis partition class.
This class is intended to simplify the way that operations are performed.
"""

def apply(
self,
func,
num_splits=None,
other_axis_partition=None,
maintain_partitioning=True,
**kwargs,
):
"""
Apply a function to this axis partition along full axis.
Parameters
----------
func : callable
The function to apply. This will be preprocessed according to
the corresponding `PandasDataframePartition` objects.
num_splits : int, default: None
The number of times to split the result object.
other_axis_partition : BaseDataframeAxisPartition, default: None
Another `BaseDataframeAxisPartition` object to be applied
to func. This is for operations that are between two data sets.
maintain_partitioning : bool, default: True
Whether to keep the partitioning in the same
orientation as it was previously or not. This is important because we may be
operating on an individual axis partition and not touching the rest.
In this case, we have to return the partitioning to its previous
orientation (the lengths will remain the same). This is ignored between
two axis partitions.
**kwargs : dict
Additional keywords arguments to be passed in `func`.
Returns
-------
list
A list of `PandasDataframePartition` objects.
Notes
-----
The procedures that invoke this method assume full axis
knowledge. Implement this method accordingly.
You must return a list of `PandasDataframePartition` objects from this method.
"""
pass

def shuffle(self, func, lengths, **kwargs):
"""
Shuffle the order of the data in this axis partition based on the `lengths`.
Parameters
----------
func : callable
The function to apply before splitting.
lengths : list
The list of partition lengths to split the result into.
**kwargs : dict
Additional keywords arguments to be passed in `func`.
Returns
-------
list
A list of `PandasDataframePartition` objects split by `lengths`.
"""
pass

# Child classes must have these in order to correctly subclass.
instance_type = None
partition_type = None

def _wrap_partitions(self, partitions):
"""
Wrap remote partition objects with `PandasDataframePartition` class.
Parameters
----------
partitions : list
List of remotes partition objects to be wrapped with `PandasDataframePartition` class.
Returns
-------
list
List of wrapped remote partition objects.
"""
return [self.partition_type(obj) for obj in partitions]

def force_materialization(self, get_ip=False):
"""
Materialize axis partitions into a single partition.
Parameters
----------
get_ip : bool, default: False
Whether to get node ip address to a single partition or not.
Returns
-------
BaseDataframeAxisPartition
An axis partition containing only a single materialized partition.
"""
materialized = self.apply(
lambda x: x, num_splits=1, maintain_partitioning=False
)
return type(self)(materialized, get_ip=get_ip)

def unwrap(self, squeeze=False, get_ip=False):
"""
Unwrap partitions from this axis partition.
Parameters
----------
squeeze : bool, default: False
Flag used to unwrap only one partition.
get_ip : bool, default: False
Whether to get node ip address to each partition or not.
Returns
-------
list
List of partitions from this axis partition.
Notes
-----
If `get_ip=True`, a list of tuples of Ray.ObjectRef/Dask.Future to node ip addresses and
unwrapped partitions, respectively, is returned if Ray/Dask is used as an engine
(i.e. [(Ray.ObjectRef/Dask.Future, Ray.ObjectRef/Dask.Future), ...]).
"""
if squeeze and len(self.list_of_blocks) == 1:
if get_ip:
return self.list_of_ips[0], self.list_of_blocks[0]
else:
return self.list_of_blocks[0]
else:
if get_ip:
return list(zip(self.list_of_ips, self.list_of_blocks))
else:
return self.list_of_blocks
from modin.core.dataframe.base.partitioning.axis_partition import (
BaseDataframeAxisPartition,
)


class PandasDataframeAxisPartition(BaseDataframeAxisPartition):
(Diffs for the remaining changed files are not shown here.)
