Skip to content

Commit

Permalink
[enhancement] refactor onedal to_table (#2151)
Browse files Browse the repository at this point in the history
* carryover from #2126

* formatting

* Update test_memory_usage.py

* Update _data_conversion.py

* Update _data_conversion.py

* Update _data_conversion.py

* Update test_data.py

* Update onedal/datatypes/_data_conversion.py

Co-authored-by: Victoriya Fedotova <viktoria.nn@gmail.com>

* Update _data_conversion.py

* Update _data_conversion.py

* Update onedal/datatypes/_data_conversion.py

Co-authored-by: Samir Nasibli <samir.nasibli@intel.com>

* Update _data_conversion.py

* Update _data_conversion.py

---------

Co-authored-by: Victoriya Fedotova <viktoria.nn@gmail.com>
Co-authored-by: Samir Nasibli <samir.nasibli@intel.com>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent 4eaea5b commit e2d7add
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 56 deletions.
90 changes: 49 additions & 41 deletions onedal/datatypes/_data_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,67 @@

import numpy as np

from daal4py.sklearn._utils import make2d
from onedal import _backend, _is_dpc_backend

from ..utils import _is_csr


def _apply_and_pass(func, *args, **kwargs):
if len(args) == 1:
return func(args[0], **kwargs)
return tuple(map(lambda arg: func(arg, **kwargs), args))


if _is_dpc_backend:
def _convert_one_to_table(arg):
    """Convert a single input into a oneDAL table.

    oneDAL table conversion accepts only array-likes or sparse inputs,
    so bare scalars are promoted to a 2D array before conversion.
    """
    converted = np.atleast_2d(arg) if np.isscalar(arg) else arg
    return _backend.to_table(converted)


def to_table(*args):
"""Create oneDAL tables from scalars and/or arrays.
Note: this implementation can be used with contiguous scipy.sparse, numpy
ndarrays, DPCTL/DPNP usm_ndarrays and scalars. Tables will use pointers to the
original array data. Scalars will be copies. Arrays may be modified in-
place by oneDAL during computation. This works for data located on CPU and
SYCL-enabled Intel GPUs. Each array may only be of a single datatype (i.e.
each must be homogeneous).
Parameters
----------
*args : {scalar, numpy array, sycl_usm_ndarray, csr_matrix, or csr_array}
arg1, arg2... The arrays should be given as arguments.
from ..utils._dpep_helpers import dpctl_available, dpnp_available
Returns
-------
tables: {oneDAL homogeneous tables}
"""
return _apply_and_pass(_convert_one_to_table, *args)

if dpctl_available:
import dpctl.tensor as dpt

if dpnp_available:
if _is_dpc_backend:

try:
# try/catch is used here instead of dpep_helpers because
# of circular import issues of _data_conversion.py and
# utils/validation.py. This is a temporary fix until the
# issue with dpnp is addressed, at which point this can
# be removed entirely.
import dpnp

def _table_to_array(table, xp=None):
# By default a DPNP ndarray is created with a copy.
# TODO:
# investigate why dpnp.array(table, copy=False) doesn't work.
# Work around with using dpctl.tensor.asarray.
if xp == dpnp:
return dpnp.array(dpnp.dpctl.tensor.asarray(table), copy=False)
else:
return xp.asarray(table)

except ImportError:

def _table_to_array(table, xp=None):
return xp.asarray(table)

from ..common._policy import _HostInteropPolicy

def _convert_to_supported(policy, *data):
Expand Down Expand Up @@ -87,26 +126,9 @@ def convert_one_from_table(table, sycl_queue=None, sua_iface=None, xp=None):
_backend.from_table(table), usm_type="device", sycl_queue=sycl_queue
)
else:
xp_name = xp.__name__
if dpnp_available and xp_name == "dpnp":
# By default DPNP ndarray created with a copy.
# TODO:
# investigate why dpnp.array(table, copy=False) doesn't work.
# Work around with using dpctl.tensor.asarray.
return dpnp.array(dpt.asarray(table), copy=False)
else:
return xp.asarray(table)
return _backend.from_table(table)

def convert_one_to_table(arg, sua_iface=None):
# Note: currently only oneDAL homogeneous tables are supported and the
# contiguity of the input array should be checked in advance.
if sua_iface:
return _backend.sua_iface_to_table(arg)
return _table_to_array(table, xp=xp)

if not _is_csr(arg):
arg = make2d(arg)
return _backend.to_table(arg)
return _backend.from_table(table)

else:

Expand All @@ -125,22 +147,8 @@ def convert_one_from_table(table, sycl_queue=None, sua_iface=None, xp=None):
)
return _backend.from_table(table)

def convert_one_to_table(arg, sua_iface=None):
if sua_iface:
raise RuntimeError(
"SYCL usm array conversion to table requires the DPC backend"
)

if not _is_csr(arg):
arg = make2d(arg)
return _backend.to_table(arg)


def from_table(*args, sycl_queue=None, sua_iface=None, xp=None):
return _apply_and_pass(
convert_one_from_table, *args, sycl_queue=sycl_queue, sua_iface=sua_iface, xp=xp
)


def to_table(*args, sua_iface=None):
return _apply_and_pass(convert_one_to_table, *args, sua_iface=sua_iface)
12 changes: 6 additions & 6 deletions onedal/datatypes/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ ONEDAL_PY_INIT_MODULE(table) {
#endif // ONEDAL_DATA_PARALLEL

m.def("to_table", [](py::object obj) {
#ifdef ONEDAL_DATA_PARALLEL
if (py::hasattr(obj, "__sycl_usm_array_interface__")) {
return convert_from_sua_iface(obj);
}
#endif // ONEDAL_DATA_PARALLEL

auto* obj_ptr = obj.ptr();
return convert_to_table(obj_ptr);
});
Expand All @@ -86,12 +92,6 @@ ONEDAL_PY_INIT_MODULE(table) {
auto* obj_ptr = convert_to_pyobject(t);
return obj_ptr;
});

#ifdef ONEDAL_DATA_PARALLEL
m.def("sua_iface_to_table", [](py::object obj) {
return convert_from_sua_iface(obj);
});
#endif // ONEDAL_DATA_PARALLEL
}

} // namespace oneapi::dal::python
12 changes: 6 additions & 6 deletions onedal/datatypes/tests/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def fit(self, X, y=None):
X = xp.astype(X, dtype=xp.float64)
dtype = get_dtype(X)
params = bs_DBSCAN._get_onedal_params(dtype)
X_table = to_table(X, sua_iface=sua_iface)
X_table = to_table(X)
# TODO:
# check other candidates for the dummy base oneDAL func.
# oneDAL backend func is needed to check result table checks.
Expand Down Expand Up @@ -251,7 +251,7 @@ def test_input_sua_iface_zero_copy(dataframe, queue, order, dtype):

sua_iface, X_dp_namespace, _ = _get_sycl_namespace(X_dp)

X_table = to_table(X_dp, sua_iface=sua_iface)
X_table = to_table(X_dp)
_assert_sua_iface_fields(X_dp, X_table)

X_dp_from_table = from_table(
Expand Down Expand Up @@ -339,7 +339,7 @@ def test_sua_iface_interop_invalid_shape(dataframe, queue, data_shape):
"Unable to convert from SUA interface: only 1D & 2D tensors are allowed"
)
with pytest.raises(ValueError, match=expected_err_msg):
to_table(X, sua_iface=sua_iface)
to_table(X)


@pytest.mark.skipif(
Expand Down Expand Up @@ -368,7 +368,7 @@ def test_sua_iface_interop_unsupported_dtypes(dataframe, queue, dtype):

expected_err_msg = "Unable to convert from SUA interface: unknown data type"
with pytest.raises(ValueError, match=expected_err_msg):
to_table(X, sua_iface=sua_iface)
to_table(X)


@pytest.mark.parametrize(
Expand All @@ -393,7 +393,7 @@ def test_to_table_non_contiguous_input(dataframe, queue):
else:
expected_err_msg = "Numpy input Could not convert Python object to onedal table."
with pytest.raises(ValueError, match=expected_err_msg):
to_table(X, sua_iface=sua_iface)
to_table(X)


@pytest.mark.skipif(
Expand All @@ -411,4 +411,4 @@ def test_sua_iface_interop_if_no_dpc_backend(dataframe, queue, dtype):

expected_err_msg = "SYCL usm array conversion to table requires the DPC backend"
with pytest.raises(RuntimeError, match=expected_err_msg):
to_table(X, sua_iface=sua_iface)
to_table(X)
6 changes: 3 additions & 3 deletions sklearnex/tests/test_memory_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ class DummyEstimatorWithTableConversions(BaseEstimator):

def fit(self, X, y=None):
sua_iface, xp, _ = _get_sycl_namespace(X)
X_table = to_table(X, sua_iface=sua_iface)
y_table = to_table(y, sua_iface=sua_iface)
X_table = to_table(X)
y_table = to_table(y)
# The presence of the fitted attributes (ending with a trailing
# underscore) is required for the correct check. The cleanup of
# the memory will occur at the estimator instance deletion.
Expand All @@ -160,7 +160,7 @@ def predict(self, X):
# fitted attributes (ending with a trailing underscore).
check_is_fitted(self)
sua_iface, xp, _ = _get_sycl_namespace(X)
X_table = to_table(X, sua_iface=sua_iface)
X_table = to_table(X)
returned_X = from_table(
X_table, sua_iface=sua_iface, sycl_queue=X.sycl_queue, xp=xp
)
Expand Down

0 comments on commit e2d7add

Please sign in to comment.