Skip to content

Commit

Permalink
FIX-#5730: Add __repr__, __len__, size, and make dtype changing lazy (#5731)
Browse files Browse the repository at this point in the history

Signed-off-by: Rehan Durrani <rehan@ponder.io>
  • Loading branch information
RehanSD authored Mar 7, 2023
1 parent db7103b commit d1c60b3
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 38 deletions.
114 changes: 80 additions & 34 deletions modin/numpy/arr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from math import prod
import numpy
import pandas
from pandas.core.dtypes.common import is_list_like, is_numeric_dtype, is_bool_dtype
from pandas.api.types import is_scalar
from inspect import signature
Expand All @@ -26,6 +27,7 @@
Reduce,
Binary,
)
from modin.config import StorageFormat

from .utils import try_convert_from_interoperable_type

Expand Down Expand Up @@ -100,17 +102,6 @@ def fix_dtypes_and_determine_return(
return result


def find_common_dtype(dtypes):
    """Return a dtype that every dtype in ``dtypes`` can be promoted to.

    The list is reduced pairwise with ``numpy.promote_types`` using a balanced
    divide-and-conquer split, which keeps promotion chains short.

    Parameters
    ----------
    dtypes : sequence of numpy dtypes
        Dtypes to find a common type for. Must contain at least one element.

    Returns
    -------
    numpy.dtype
        The promoted common dtype.

    Raises
    ------
    ValueError
        If ``dtypes`` is empty. (Previously an empty input recursed forever,
        since ``midpoint`` was 0 and ``dtypes[:0]`` re-entered with the same
        empty sequence.)
    """
    if len(dtypes) == 0:
        raise ValueError("find_common_dtype requires at least one dtype")
    if len(dtypes) == 1:
        return dtypes[0]
    elif len(dtypes) == 2:
        return numpy.promote_types(*dtypes)
    # Split in half, find each half's common dtype, then promote the pair.
    midpoint = len(dtypes) // 2
    return numpy.promote_types(
        find_common_dtype(dtypes[:midpoint]), find_common_dtype(dtypes[midpoint:])
    )


class array(object):
"""
Modin distributed representation of ``numpy.array``.
Expand Down Expand Up @@ -143,8 +134,8 @@ def __init__(
if _query_compiler is not None:
self._query_compiler = _query_compiler
self._ndim = _ndim
new_dtype = find_common_dtype(
numpy.unique(self._query_compiler.dtypes.values)
new_dtype = pandas.core.dtypes.cast.find_common_type(
list(self._query_compiler.dtypes.values)
)
elif is_list_like(object) and not is_list_like(object[0]):
series = pd.Series(object)
Expand Down Expand Up @@ -178,17 +169,23 @@ def __init__(

self._query_compiler = pd.DataFrame(arr)._query_compiler
new_dtype = arr.dtype
# These two lines are necessary so that our query compiler does not keep track of indices
# and try to map like indices to like indices. (e.g. if we multiply two arrays that used
# to be dataframes, and the dataframes had the same column names but ordered differently
# we want to do a simple broadcast where we only consider position, as numpy would, rather
# than pair columns with the same name and multiply them.)
self._query_compiler = self._query_compiler.reset_index(drop=True)
self._query_compiler.columns = range(len(self._query_compiler.columns))
if StorageFormat.get() == "Pandas":
# These two lines are necessary so that our query compiler does not keep track of indices
# and try to map like indices to like indices. (e.g. if we multiply two arrays that used
# to be dataframes, and the dataframes had the same column names but ordered differently
# we want to do a simple broadcast where we only consider position, as numpy would, rather
# than pair columns with the same name and multiply them.)
self._query_compiler = self._query_compiler.reset_index(drop=True)
self._query_compiler.columns = range(len(self._query_compiler.columns))
new_dtype = new_dtype if dtype is None else dtype
self._query_compiler = self._query_compiler.astype(
{col_name: new_dtype for col_name in self._query_compiler.columns}
)
cols_with_wrong_dtype = self._query_compiler.dtypes != new_dtype
if cols_with_wrong_dtype.any():
self._query_compiler = self._query_compiler.astype(
{
col_name: new_dtype
for col_name in self._query_compiler.columns[cols_with_wrong_dtype]
}
)

def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
ufunc_name = ufunc.__name__
Expand Down Expand Up @@ -955,7 +952,7 @@ def __add__(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1005,7 +1002,7 @@ def divide(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1054,7 +1051,7 @@ def __rtruediv__(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1098,7 +1095,7 @@ def floor_divide(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1163,7 +1160,7 @@ def power(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1308,7 +1305,7 @@ def multiply(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1363,7 +1360,7 @@ def remainder(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1420,7 +1417,7 @@ def subtract(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1469,7 +1466,7 @@ def __rsub__(
operand_dtype = (
self.dtype
if not isinstance(x2, array)
else find_common_dtype([self.dtype, x2.dtype])
else pandas.core.dtypes.cast.find_common_type([self.dtype, x2.dtype])
)
out_dtype = (
dtype
Expand Down Expand Up @@ -1928,7 +1925,14 @@ def dtype(self):
if self._ndim == 1:
return dtype[0]
else:
return find_common_dtype(dtype.values)
return pandas.core.dtypes.cast.find_common_type(list(dtype.values))

@property
def size(self):
    """Total number of elements in the array: the product of ``self.shape``."""
    total = 1
    for axis_length in self.shape:
        total *= axis_length
    return total

def __len__(self):
    """Return the size of the first axis, matching ``len`` on a NumPy array."""
    shape = self.shape
    return shape[0]

def astype(self, dtype, order="K", casting="unsafe", subok=True, copy=True):
if casting != "unsafe":
Expand All @@ -1943,8 +1947,50 @@ def astype(self, dtype, order="K", casting="unsafe", subok=True, copy=True):
return self
return array(_query_compiler=result, _ndim=self._ndim)

def _build_repr_array(self):
    """Materialize just enough of the array to build a summarized ``repr``.

    Returns a small ``numpy.ndarray`` containing only the leading and
    trailing elements along each axis, sized so that NumPy's print
    machinery can render an abridged ("...") representation without
    pulling the whole distributed array to the head node.
    """

    # Because this helper is defined inside the method, the default below is
    # re-evaluated on every call, so it tracks the *current* NumPy print
    # options rather than the value at import time.
    def _generate_indices_for_axis(
        axis_size, num_elements=numpy.get_printoptions()["edgeitems"]
    ):
        # For a long axis, keep the first num_elements + 1 and last
        # num_elements positions. NOTE(review): the extra head element
        # presumably guarantees the fetched array stays above the lowered
        # print threshold used by __repr__ so NumPy abridges it — confirm.
        if axis_size > num_elements * 2:
            return list(range(num_elements + 1)) + list(
                range(axis_size - num_elements, axis_size)
            )
        # Short axis: no summarization needed, take every position.
        return list(range(axis_size))

    # We want to rely on NumPy for creating a string representation of this array; however
    # we also don't want to materialize all of the data to the head node. Instead, we will
    # materialize enough data that NumPy can build the summarized representation of the array
    # (while changing with the NumPy print options so it will format this smaller array as
    # abridged) and return this smaller array. In the worst case, this array will have
    # (2*numpy.get_printoptions()["edgeitems"] + 1)^2 items, so 49 items max for the default
    # value of 3.
    if self._ndim == 1 or self.shape[1] == 0:
        # 1-D array (or a 2-D array with zero columns): slice rows only.
        idxs = _generate_indices_for_axis(len(self))
        arr = self._query_compiler.getitem_row_array(idxs).to_numpy()
        if self._ndim == 1:
            # The query compiler yields a 2-D result; collapse it back to 1-D.
            arr = arr.flatten()
    elif self.shape[0] == 1:
        # Single-row 2-D array: slice columns only.
        idxs = _generate_indices_for_axis(self.shape[1])
        arr = self._query_compiler.getitem_column_array(idxs).to_numpy()
    else:
        # General 2-D case: take the cross product of edge rows and columns.
        row_idxs = _generate_indices_for_axis(len(self))
        col_idxs = _generate_indices_for_axis(self.shape[1])
        arr = self._query_compiler.take_2d_positional(row_idxs, col_idxs).to_numpy()
    return arr

def __repr__(self):
    """Return the NumPy-style string representation of this array.

    Small arrays are fully materialized on the head node; larger ones fetch
    only the edge items needed for NumPy's summarized ("...") output.
    """
    # If we are dealing with a small array, we can just collate all the data on the
    # head node and let numpy handle the logic to get a string representation.
    if self.size <= numpy.get_printoptions()["threshold"]:
        return repr(self._to_numpy())
    arr = self._build_repr_array()
    # Temporarily lower the print threshold below the reduced array's size so
    # NumPy formats it as abridged output. ``numpy.printoptions`` restores the
    # previous settings on exit, even if ``repr`` raises — replacing the
    # manual set_printoptions / try / finally dance.
    with numpy.printoptions(threshold=arr.size - 1):
        return repr(arr)

def _to_numpy(self):
arr = self._query_compiler.to_numpy()
Expand Down
26 changes: 24 additions & 2 deletions modin/numpy/test/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,30 @@
import modin.numpy as np


@pytest.mark.parametrize("size", [100, (2, 100), (100, 2), (1, 100), (100, 1)])
def test_repr(size):
@pytest.fixture
def change_numpy_print_threshold():
    """Shrink NumPy's print threshold to 50 for the duration of a test.

    Yields the previous threshold so tests can reference it, and restores
    that value on teardown.
    """
    saved_threshold = numpy.get_printoptions()["threshold"]
    numpy.set_printoptions(threshold=50)
    yield saved_threshold
    numpy.set_printoptions(threshold=saved_threshold)


@pytest.mark.parametrize(
"size",
[
100,
(2, 100),
(100, 2),
(1, 100),
(100, 1),
(100, 100),
(6, 100),
(100, 6),
(100, 7),
(7, 100),
],
)
def test_repr(size, change_numpy_print_threshold):
numpy_arr = numpy.random.randint(-100, 100, size=size)
modin_arr = np.array(numpy_arr)
assert repr(modin_arr) == repr(numpy_arr)
Expand Down
10 changes: 8 additions & 2 deletions modin/numpy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@

import modin.pandas as pd
import modin.numpy as np
from modin.config import StorageFormat

_INTEROPERABLE_TYPES = (pd.DataFrame, pd.Series)


def try_convert_from_interoperable_type(obj):
if isinstance(obj, _INTEROPERABLE_TYPES):
new_qc = obj._query_compiler.reset_index(drop=True)
new_qc.columns = range(len(new_qc.columns))
if StorageFormat.get() == "Pandas":
# If we are dealing with pandas partitions, we need to reset the index
# and replace column names in order to broadcast correctly.
new_qc = obj._query_compiler.reset_index(drop=True)
new_qc.columns = range(len(new_qc.columns))
else:
new_qc = obj._query_compiler
obj = np.array(
_query_compiler=new_qc,
_ndim=2 if isinstance(obj, pd.DataFrame) else 1,
Expand Down

0 comments on commit d1c60b3

Please sign in to comment.