diff --git a/deselected_tests.yaml b/deselected_tests.yaml
index 4b7b857d0a..3efe4325c0 100755
--- a/deselected_tests.yaml
+++ b/deselected_tests.yaml
@@ -357,12 +357,13 @@ deselected_tests:
   # Need to rework getting policy to correctly obtain it for method without data (finalize_fit)
   # and avoid keeping it in class attribute, also need to investigate how to implement
   # partial result serialization
+  - tests/test_common.py::test_estimators[IncrementalEmpiricalCovariance()-check_estimators_pickle]
+  - tests/test_common.py::test_estimators[IncrementalEmpiricalCovariance()-check_estimators_pickle(readonly_memmap=True)]
   - tests/test_common.py::test_estimators[IncrementalLinearRegression()-check_estimators_pickle]
   - tests/test_common.py::test_estimators[IncrementalLinearRegression()-check_estimators_pickle(readonly_memmap=True)]
   # There are not enough data to run onedal backend
   - tests/test_common.py::test_estimators[IncrementalLinearRegression()-check_fit2d_1sample]
-
 
 # --------------------------------------------------------
 # No need to test daal4py patching
 reduced_tests:
diff --git a/onedal/covariance/incremental_covariance.py b/onedal/covariance/incremental_covariance.py
index 3d9853ccae..d2737ce2b4 100644
--- a/onedal/covariance/incremental_covariance.py
+++ b/onedal/covariance/incremental_covariance.py
@@ -19,6 +19,7 @@
 from onedal import _backend
 
 from ..datatypes import _convert_to_supported, from_table, to_table
+from ..utils import _check_array
 from .covariance import BaseEmpiricalCovariance
 
 
@@ -37,6 +38,12 @@ class IncrementalEmpiricalCovariance(BaseEmpiricalCovariance):
         If True biased estimation of covariance is computed which equals to the
         unbiased one multiplied by (n_samples - 1) / n_samples.
 
+    assume_centered : bool, default=False
+        If True, data are not centered before computation.
+        Useful when working with data whose mean is almost, but not exactly
+        zero.
+        If False (default), data are centered before computation.
+
     Attributes
     ----------
     location_ : ndarray of shape (n_features,)
@@ -46,8 +53,11 @@ class IncrementalEmpiricalCovariance(BaseEmpiricalCovariance):
         Estimated covariance matrix
     """
 
-    def __init__(self, method="dense", bias=False):
-        super().__init__(method, bias)
+    def __init__(self, method="dense", bias=False, assume_centered=False):
+        super().__init__(method, bias, assume_centered)
+        self._reset()
+
+    def _reset(self):
         self._partial_result = self._get_backend(
             "covariance", None, "partial_compute_result"
         )
@@ -74,15 +84,16 @@ def partial_fit(self, X, y=None, queue=None):
         self : object
             Returns the instance itself.
         """
+        X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)
+
         if not hasattr(self, "_policy"):
             self._policy = self._get_policy(queue, X)
+
+        X = _convert_to_supported(self._policy, X)
+
         if not hasattr(self, "_dtype"):
             self._dtype = get_dtype(X)
-        X = make2d(X)
-        types = [np.float32, np.float64]
-        if get_dtype(X) not in types:
-            X = X.astype(np.float64)
-        X = _convert_to_supported(self._policy, X)
+
         params = self._get_onedal_params(self._dtype)
         table_X = to_table(X)
         self._partial_result = self._get_backend(
diff --git a/sklearnex/covariance/incremental_covariance.py b/sklearnex/covariance/incremental_covariance.py
index e0b8411210..63b1316fc9 100644
--- a/sklearnex/covariance/incremental_covariance.py
+++ b/sklearnex/covariance/incremental_covariance.py
@@ -14,18 +14,33 @@
 # limitations under the License.
 # ===============================================================================
 
+import numbers
+import warnings
+
 import numpy as np
+from scipy import linalg
+from sklearn.base import BaseEstimator
+from sklearn.covariance import EmpiricalCovariance as sklearn_EmpiricalCovariance
 from sklearn.utils import check_array, gen_batches
 
 from daal4py.sklearn._n_jobs_support import control_n_jobs
+from daal4py.sklearn._utils import daal_check_version, sklearn_check_version
 from onedal._device_offload import support_usm_ndarray
 from onedal.covariance import (
     IncrementalEmpiricalCovariance as onedal_IncrementalEmpiricalCovariance,
 )
+from sklearnex import config_context
+
+from .._device_offload import dispatch, wrap_output_data
+from .._utils import PatchingConditionsChain, register_hyperparameters
+from ..metrics import pairwise_distances
 
+if sklearn_check_version("1.2"):
+    from sklearn.utils._param_validation import Interval
 
-@control_n_jobs(decorated_methods=["partial_fit"])
-class IncrementalEmpiricalCovariance:
+
+@control_n_jobs(decorated_methods=["partial_fit", "fit", "_onedal_finalize_fit"])
+class IncrementalEmpiricalCovariance(BaseEstimator):
     """
     Incremental estimator for covariance.
     Allows to compute empirical covariance estimated by maximum
@@ -33,12 +48,25 @@ class IncrementalEmpiricalCovariance:
 
     Parameters
     ----------
+    store_precision : bool, default=False
+        Specifies if the estimated precision is stored.
+
+    assume_centered : bool, default=False
+        If True, data are not centered before computation.
+        Useful when working with data whose mean is almost, but not exactly
+        zero.
+        If False (default), data are centered before computation.
+
     batch_size : int, default=None
         The number of samples to use for each batch. Only used when calling
         ``fit``. If ``batch_size`` is ``None``, then ``batch_size``
         is inferred from the data and set to ``5 * n_features``, to provide a
         balance between approximation accuracy and memory consumption.
 
+    copy : bool, default=True
+        If False, X will be overwritten. ``copy=False`` can be used to
+        save memory but is unsafe for general use.
+
     Attributes
     ----------
     location_ : ndarray of shape (n_features,)
@@ -46,44 +74,130 @@ class IncrementalEmpiricalCovariance:
 
     covariance_ : ndarray of shape (n_features, n_features)
         Estimated covariance matrix
+
+    n_samples_seen_ : int
+        The number of samples processed by the estimator. Will be reset on
+        new calls to fit, but increments across ``partial_fit`` calls.
+
+    batch_size_ : int
+        Inferred batch size from ``batch_size``.
+
+    n_features_in_ : int
+        Number of features seen during :term:`fit` or :term:`partial_fit`.
""" _onedal_incremental_covariance = staticmethod(onedal_IncrementalEmpiricalCovariance) - def __init__(self, batch_size=None): - self._need_to_finalize = False # If True then finalize compute should - # be called to obtain covariance_ or location_ from partial compute data + if sklearn_check_version("1.2"): + _parameter_constraints: dict = { + "store_precision": ["boolean"], + "assume_centered": ["boolean"], + "batch_size": [Interval(numbers.Integral, 1, None, closed="left"), None], + "copy": ["boolean"], + } + + get_precision = sklearn_EmpiricalCovariance.get_precision + error_norm = wrap_output_data(sklearn_EmpiricalCovariance.error_norm) + score = wrap_output_data(sklearn_EmpiricalCovariance.score) + + def __init__( + self, *, store_precision=False, assume_centered=False, batch_size=None, copy=True + ): + self.assume_centered = assume_centered + self.store_precision = store_precision self.batch_size = batch_size + self.copy = copy + + def _onedal_supported(self, method_name, *data): + patching_status = PatchingConditionsChain( + f"sklearn.covariance.{self.__class__.__name__}.{method_name}" + ) + return patching_status def _onedal_finalize_fit(self): assert hasattr(self, "_onedal_estimator") self._onedal_estimator.finalize_fit() self._need_to_finalize = False - def _onedal_partial_fit(self, X, queue): + if not daal_check_version((2024, "P", 400)) and self.assume_centered: + location = self._onedal_estimator.location_[None, :] + self._onedal_estimator.covariance_ += np.dot(location.T, location) + self._onedal_estimator.location_ = np.zeros_like(np.squeeze(location)) + if self.store_precision: + self.precision_ = linalg.pinvh( + self._onedal_estimator.covariance_, check_finite=False + ) + else: + self.precision_ = None + + @property + def covariance_(self): + if hasattr(self, "_onedal_estimator"): + if self._need_to_finalize: + self._onedal_finalize_fit() + return self._onedal_estimator.covariance_ + else: + raise AttributeError( + f"'{self.__class__.__name__}' object has no attribute 'covariance_'" + ) + + @property + def location_(self): + if hasattr(self, "_onedal_estimator"): + if self._need_to_finalize: + self._onedal_finalize_fit() + return self._onedal_estimator.location_ + else: + raise AttributeError( + f"'{self.__class__.__name__}' object has no attribute 'location_'" + ) + + def _onedal_partial_fit(self, X, queue=None, check_input=True): + + first_pass = not hasattr(self, "n_samples_seen_") or self.n_samples_seen_ == 0 + + # finite check occurs on onedal side + if check_input: + if sklearn_check_version("1.2"): + self._validate_params() + + if sklearn_check_version("1.0"): + X = self._validate_data( + X, + dtype=[np.float64, np.float32], + reset=first_pass, + copy=self.copy, + force_all_finite=False, + ) + else: + X = check_array( + X, + dtype=[np.float64, np.float32], + copy=self.copy, + force_all_finite=False, + ) + onedal_params = { "method": "dense", "bias": True, + "assume_centered": self.assume_centered, } if not hasattr(self, "_onedal_estimator"): self._onedal_estimator = self._onedal_incremental_covariance(**onedal_params) - self._onedal_estimator.partial_fit(X, queue) - self._need_to_finalize = True + try: + if first_pass: + self.n_samples_seen_ = X.shape[0] + self.n_features_in_ = X.shape[1] + else: + self.n_samples_seen_ += X.shape[0] - @property - def covariance_(self): - if self._need_to_finalize: - self._onedal_finalize_fit() - return self._onedal_estimator.covariance_ + self._onedal_estimator.partial_fit(X, queue) + finally: + self._need_to_finalize = True - 
@property - def location_(self): - if self._need_to_finalize: - self._onedal_finalize_fit() - return self._onedal_estimator.location_ + return self - @support_usm_ndarray() - def partial_fit(self, X, queue=None): + def partial_fit(self, X, y=None, check_input=True): """ Incremental fit with X. All of X is processed as a single batch. @@ -93,16 +207,29 @@ def partial_fit(self, X, queue=None): Training data, where `n_samples` is the number of samples and `n_features` is the number of features. + y : Ignored + Not used, present for API consistency by convention. + + check_input : bool, default=True + Run check_array on X. + Returns ------- self : object Returns the instance itself. """ - X = check_array(X, dtype=[np.float64, np.float32]) - self._onedal_partial_fit(X, queue) - return self + return dispatch( + self, + "partial_fit", + { + "onedal": self.__class__._onedal_partial_fit, + "sklearn": None, + }, + X, + check_input=check_input, + ) - def fit(self, X, queue=None): + def fit(self, X, y=None): """ Fit the model with X, using minibatches of size batch_size. @@ -112,19 +239,79 @@ def fit(self, X, queue=None): Training data, where `n_samples` is the number of samples and `n_features` is the number of features. + y : Ignored + Not used, present for API consistency by convention. + Returns ------- self : object Returns the instance itself. """ - n_samples, n_features = X.shape - if self.batch_size is None: - batch_size_ = 5 * n_features + + return dispatch( + self, + "fit", + { + "onedal": self.__class__._onedal_fit, + "sklearn": None, + }, + X, + ) + + def _onedal_fit(self, X, queue=None): + self.n_samples_seen_ = 0 + if hasattr(self, "_onedal_estimator"): + self._onedal_estimator._reset() + + if sklearn_check_version("1.2"): + self._validate_params() + + # finite check occurs on onedal side + if sklearn_check_version("1.0"): + X = self._validate_data( + X, dtype=[np.float64, np.float32], copy=self.copy, force_all_finite=False + ) else: - batch_size_ = self.batch_size - for batch in gen_batches(n_samples, batch_size_): + X = check_array( + X, dtype=[np.float64, np.float32], copy=self.copy, force_all_finite=False + ) + self.n_features_in_ = X.shape[1] + + self.batch_size_ = self.batch_size if self.batch_size else 5 * self.n_features_in_ + + if X.shape[0] == 1: + warnings.warn( + "Only one sample available. 
You may want to reshape your data array" + ) + + for batch in gen_batches(X.shape[0], self.batch_size_): X_batch = X[batch] - self.partial_fit(X_batch, queue=queue) + self._onedal_partial_fit(X_batch, queue=queue, check_input=False) self._onedal_finalize_fit() + return self + + # expose sklearnex pairwise_distances if mahalanobis distance eventually supported + @wrap_output_data + def mahalanobis(self, X): + if sklearn_check_version("1.0"): + self._validate_data(X, reset=False, copy=self.copy) + else: + check_array(X, copy=self.copy) + + precision = self.get_precision() + with config_context(assume_finite=True): + # compute mahalanobis distances + dist = pairwise_distances( + X, self.location_[np.newaxis, :], metric="mahalanobis", VI=precision + ) + + return np.reshape(dist, (len(X),)) ** 2 + + _onedal_cpu_supported = _onedal_supported + _onedal_gpu_supported = _onedal_supported + + mahalanobis.__doc__ = sklearn_EmpiricalCovariance.mahalanobis.__doc__ + error_norm.__doc__ = sklearn_EmpiricalCovariance.error_norm.__doc__ + score.__doc__ = sklearn_EmpiricalCovariance.score.__doc__ diff --git a/sklearnex/covariance/tests/test_incremental_covariance.py b/sklearnex/covariance/tests/test_incremental_covariance.py index 9eb05f07d9..8adf61034e 100644 --- a/sklearnex/covariance/tests/test_incremental_covariance.py +++ b/sklearnex/covariance/tests/test_incremental_covariance.py @@ -17,6 +17,10 @@ import numpy as np import pytest from numpy.testing import assert_allclose +from sklearn.covariance.tests.test_covariance import ( + test_covariance, + test_EmpiricalCovariance_validates_mahalanobis, +) from onedal.tests.utils._dataframes_support import ( _convert_to_dataframe, @@ -26,13 +30,14 @@ @pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues()) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) -def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype): +@pytest.mark.parametrize("assume_centered", [True, False]) +def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype, assume_centered): from sklearnex.covariance import IncrementalEmpiricalCovariance X = np.array([[0, 1], [0, 1]]) X = X.astype(dtype) X_split = np.array_split(X, 2) - inccov = IncrementalEmpiricalCovariance() + inccov = IncrementalEmpiricalCovariance(assume_centered=assume_centered) for i in range(2): X_split_df = _convert_to_dataframe( @@ -40,8 +45,12 @@ def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype): ) result = inccov.partial_fit(X_split_df) - expected_covariance = np.array([[0, 0], [0, 0]]) - expected_means = np.array([0, 1]) + if assume_centered: + expected_covariance = np.array([[0, 0], [0, 1]]) + expected_means = np.array([0, 0]) + else: + expected_covariance = np.array([[0, 0], [0, 0]]) + expected_means = np.array([0, 1]) assert_allclose(expected_covariance, result.covariance_) assert_allclose(expected_means, result.location_) @@ -49,7 +58,7 @@ def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype): X = np.array([[1, 2], [3, 6]]) X = X.astype(dtype) X_split = np.array_split(X, 2) - inccov = IncrementalEmpiricalCovariance() + inccov = IncrementalEmpiricalCovariance(assume_centered=assume_centered) for i in range(2): X_split_df = _convert_to_dataframe( @@ -57,8 +66,12 @@ def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype): ) result = inccov.partial_fit(X_split_df) - expected_covariance = np.array([[1, 2], [2, 4]]) - expected_means = np.array([2, 4]) + if assume_centered: + expected_covariance = np.array([[5, 10], [10, 20]]) + 
expected_means = np.array([0, 0]) + else: + expected_covariance = np.array([[1, 2], [2, 4]]) + expected_means = np.array([2, 4]) assert_allclose(expected_covariance, result.covariance_) assert_allclose(expected_means, result.location_) @@ -87,9 +100,9 @@ def test_sklearnex_fit_on_gold_data(dataframe, queue, batch_size, dtype): @pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues()) -@pytest.mark.parametrize("num_batches", [2, 4, 6, 8, 10]) -@pytest.mark.parametrize("row_count", [100, 1000, 2000]) -@pytest.mark.parametrize("column_count", [10, 100, 200]) +@pytest.mark.parametrize("num_batches", [2, 10]) +@pytest.mark.parametrize("row_count", [100, 1000]) +@pytest.mark.parametrize("column_count", [10, 100]) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) def test_sklearnex_partial_fit_on_random_data( dataframe, queue, num_batches, row_count, column_count, dtype @@ -117,12 +130,13 @@ def test_sklearnex_partial_fit_on_random_data( @pytest.mark.parametrize("dataframe,queue", get_dataframes_and_queues()) -@pytest.mark.parametrize("num_batches", [2, 4, 6, 8, 10]) -@pytest.mark.parametrize("row_count", [100, 1000, 2000]) -@pytest.mark.parametrize("column_count", [10, 100, 200]) +@pytest.mark.parametrize("num_batches", [2, 10]) +@pytest.mark.parametrize("row_count", [100, 1000]) +@pytest.mark.parametrize("column_count", [10, 100]) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize("assume_centered", [True, False]) def test_sklearnex_fit_on_random_data( - dataframe, queue, num_batches, row_count, column_count, dtype + dataframe, queue, num_batches, row_count, column_count, dtype, assume_centered ): from sklearnex.covariance import IncrementalEmpiricalCovariance @@ -132,12 +146,35 @@ def test_sklearnex_fit_on_random_data( X = X.astype(dtype) X_df = _convert_to_dataframe(X, sycl_queue=queue, target_df=dataframe) batch_size = row_count // num_batches - inccov = IncrementalEmpiricalCovariance(batch_size=batch_size) + inccov = IncrementalEmpiricalCovariance( + batch_size=batch_size, assume_centered=assume_centered + ) result = inccov.fit(X_df) - expected_covariance = np.cov(X.T, bias=1) - expected_means = np.mean(X, axis=0) + if assume_centered: + expected_covariance = np.dot(X.T, X) / X.shape[0] + expected_means = np.zeros_like(X[0]) + else: + expected_covariance = np.cov(X.T, bias=1) + expected_means = np.mean(X, axis=0) assert_allclose(expected_covariance, result.covariance_, atol=1e-6) assert_allclose(expected_means, result.location_, atol=1e-6) + + +# Monkeypatch IncrementalEmpiricalCovariance into relevant sklearn.covariance tests +@pytest.mark.allow_sklearn_fallback +@pytest.mark.parametrize( + "sklearn_test", + [ + test_covariance, + test_EmpiricalCovariance_validates_mahalanobis, + ], +) +def test_IncrementalEmpiricalCovariance_against_sklearn(monkeypatch, sklearn_test): + from sklearnex.covariance import IncrementalEmpiricalCovariance + + class_name = ".".join([sklearn_test.__module__, "EmpiricalCovariance"]) + monkeypatch.setattr(class_name, IncrementalEmpiricalCovariance) + sklearn_test() diff --git a/sklearnex/dispatcher.py b/sklearnex/dispatcher.py index 3df5aa2bec..f649ef7fd2 100644 --- a/sklearnex/dispatcher.py +++ b/sklearnex/dispatcher.py @@ -93,6 +93,7 @@ def get_patch_map_core(preview=False): # Scikit-learn* modules import sklearn as base_module import sklearn.cluster as cluster_module + import sklearn.covariance as covariance_module import sklearn.decomposition as decomposition_module import sklearn.ensemble as 
ensemble_module import sklearn.linear_model as linear_model_module @@ -115,6 +116,9 @@ def get_patch_map_core(preview=False): from .utils.parallel import _FuncWrapperOld as _FuncWrapper_sklearnex from .cluster import DBSCAN as DBSCAN_sklearnex + from .covariance import ( + IncrementalEmpiricalCovariance as IncrementalEmpiricalCovariance_sklearnex, + ) from .decomposition import PCA as PCA_sklearnex from .ensemble import ExtraTreesClassifier as ExtraTreesClassifier_sklearnex from .ensemble import ExtraTreesRegressor as ExtraTreesRegressor_sklearnex @@ -276,6 +280,18 @@ def get_patch_map_core(preview=False): ] mapping["localoutlierfactor"] = mapping["lof"] + # IncrementalEmpiricalCovariance + mapping["incrementalempiricalcovariance"] = [ + [ + ( + covariance_module, + "IncrementalEmpiricalCovariance", + IncrementalEmpiricalCovariance_sklearnex, + ), + None, + ] + ] + # IncrementalLinearRegression mapping["incrementallinearregression"] = [ [ diff --git a/sklearnex/tests/test_memory_usage.py b/sklearnex/tests/test_memory_usage.py index f9d51369e1..005fecc346 100644 --- a/sklearnex/tests/test_memory_usage.py +++ b/sklearnex/tests/test_memory_usage.py @@ -97,6 +97,7 @@ def remove_duplicated_estimators(estimators_list): BANNED_ESTIMATORS = ( + "IncrementalEmpiricalCovariance", # dataframe_f issues "IncrementalLinearRegression", # TODO fix memory leak issue in private CI for data_shape = (1000, 100), data_transform_function = dataframe_f "TSNE", # too slow for using in testing on common data size )
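
Reviewer note, illustration only and not part of the patch: a minimal sketch of the behavior the new assume_centered parameter wires through to the oneDAL backend. The expected values below are copied from the gold-data tests in this changeset (test_sklearnex_partial_fit_on_gold_data and test_sklearnex_fit_on_random_data); the print output formatting is approximate.

import numpy as np

from sklearnex.covariance import IncrementalEmpiricalCovariance

X = np.array([[1.0, 2.0], [3.0, 6.0]])

# Default path: data are centered first, so covariance_ matches
# np.cov(X.T, bias=1) and location_ holds the column means.
cov = IncrementalEmpiricalCovariance().fit(X)
print(cov.location_)    # [2. 4.]
print(cov.covariance_)  # [[1. 2.]
                        #  [2. 4.]]

# assume_centered=True: the mean is taken to be zero, so covariance_ is the
# plain second moment X.T @ X / n_samples and location_ stays at zero.
cov = IncrementalEmpiricalCovariance(assume_centered=True).fit(X)
print(cov.location_)    # [0. 0.]
print(cov.covariance_)  # [[ 5. 10.]
                        #  [10. 20.]]

# The same estimate can be accumulated batch by batch with partial_fit;
# finalization is deferred until covariance_ or location_ is first read.
cov = IncrementalEmpiricalCovariance(assume_centered=True)
for batch in np.array_split(X, 2):
    cov.partial_fit(batch)
print(cov.covariance_)  # identical to the single-call fit above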