diff --git a/ci/requirements/py36-min-all-deps.yml b/ci/requirements/py36-min-all-deps.yml index e7756172311..ea707461013 100644 --- a/ci/requirements/py36-min-all-deps.yml +++ b/ci/requirements/py36-min-all-deps.yml @@ -15,8 +15,8 @@ dependencies: - cfgrib=0.9 - cftime=1.0 - coveralls - - dask=1.2 - - distributed=1.27 + - dask=2.2 + - distributed=2.2 - flake8 - h5netcdf=0.7 - h5py=2.9 # Policy allows for 2.10, but it's a conflict-fest diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 67cb8e7125a..22088357e0b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -21,6 +21,7 @@ v0.15.0 (unreleased) Breaking changes ~~~~~~~~~~~~~~~~ +- Bumped minimum ``dask`` version to 2.2. - Remove ``compat`` and ``encoding`` kwargs from ``DataArray``, which have been deprecated since 0.12. (:pull:`3650`). Instead, specify the encoding when writing to disk or set @@ -50,6 +51,8 @@ New Features - Added the ``count`` reduction method to both :py:class:`~core.rolling.DatasetCoarsen` and :py:class:`~core.rolling.DataArrayCoarsen` objects. (:pull:`3500`) By `Deepak Cherian `_ +- Add ``meta`` kwarg to :py:func:`~xarray.apply_ufunc`; this is passed on to + :py:meth:`dask.array.blockwise`. (:pull:`3660`) By `Deepak Cherian `_. - Add `attrs_file` option in :py:func:`~xarray.open_mfdataset` to choose the source file for global attributes in a multi-file dataset (:issue:`2382`, :pull:`3498`) by `Julien Seguinot _`. @@ -63,7 +66,9 @@ New Features Bug fixes ~~~~~~~~~ - +- Applying a user-defined function that adds new dimensions using :py:func:`apply_ufunc` + and ``vectorize=True`` now works with ``dask > 2.0``. (:issue:`3574`, :pull:`3660`). + By `Deepak Cherian `_. - Fix :py:meth:`xarray.combine_by_coords` to allow for combining incomplete hypercubes of Datasets (:issue:`3648`). By `Ian Bolliger `_. diff --git a/xarray/core/computation.py b/xarray/core/computation.py index eb9ca8c17fc..d2c5c32bc00 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -548,6 +548,7 @@ def apply_variable_ufunc( output_dtypes=None, output_sizes=None, keep_attrs=False, + meta=None, ): """Apply a ndarray level function over Variable and/or ndarray objects. """ @@ -590,6 +591,7 @@ def func(*arrays): signature, output_dtypes, output_sizes, + meta, ) elif dask == "allowed": @@ -648,7 +650,14 @@ def func(*arrays): def _apply_blockwise( - func, args, input_dims, output_dims, signature, output_dtypes, output_sizes=None + func, + args, + input_dims, + output_dims, + signature, + output_dtypes, + output_sizes=None, + meta=None, ): import dask.array @@ -720,6 +729,7 @@ def _apply_blockwise( dtype=dtype, concatenate=True, new_axes=output_sizes, + meta=meta, ) @@ -761,6 +771,7 @@ def apply_ufunc( dask: str = "forbidden", output_dtypes: Sequence = None, output_sizes: Mapping[Any, int] = None, + meta: Any = None, ) -> Any: """Apply a vectorized function for unlabeled arrays on xarray objects. @@ -857,6 +868,9 @@ def apply_ufunc( Optional mapping from dimension names to sizes for outputs. Only used if dask='parallelized' and new dimensions (not found on inputs) appear on outputs. + meta : optional + Size-0 object representing the type of array wrapped by dask array. Passed on to + ``dask.array.blockwise``. Returns ------- @@ -990,6 +1004,11 @@ def earth_mover_distance(first_samples, func = functools.partial(func, **kwargs) if vectorize: + if meta is None: + # set meta=np.ndarray by default for numpy vectorized functions + # work around dask bug computing meta with vectorized functions: GH5642 + meta = np.ndarray + if signature.all_core_dims: func = np.vectorize( func, otypes=output_dtypes, signature=signature.to_gufunc_string() @@ -1006,6 +1025,7 @@ def earth_mover_distance(first_samples, dask=dask, output_dtypes=output_dtypes, output_sizes=output_sizes, + meta=meta, ) if any(isinstance(a, GroupBy) for a in args): @@ -1020,6 +1040,7 @@ def earth_mover_distance(first_samples, dataset_fill_value=dataset_fill_value, keep_attrs=keep_attrs, dask=dask, + meta=meta, ) return apply_groupby_func(this_apply, *args) elif any(is_dict_like(a) for a in args): diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 0436ae9d244..bb77cbb94fe 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -37,7 +37,7 @@ from xarray.core import indexing from xarray.core.options import set_options from xarray.core.pycompat import dask_array_type -from xarray.tests import mock +from xarray.tests import LooseVersion, mock from . import ( arm_xfail, @@ -76,9 +76,14 @@ pass try: + import dask import dask.array as da + + dask_version = dask.__version__ except ImportError: - pass + # needed for xfailed tests when dask < 2.4.0 + # remove when min dask > 2.4.0 + dask_version = "10.0" ON_WINDOWS = sys.platform == "win32" @@ -1723,6 +1728,7 @@ def test_hidden_zarr_keys(self): with xr.decode_cf(store): pass + @pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334") def test_write_persistence_modes(self): original = create_test_data() @@ -1787,6 +1793,7 @@ def test_encoding_kwarg_fixed_width_string(self): def test_dataset_caching(self): super().test_dataset_caching() + @pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334") def test_append_write(self): ds, ds_to_append, _ = create_append_test_data() with self.create_zarr_target() as store_target: @@ -1863,6 +1870,7 @@ def test_check_encoding_is_consistent_after_append(self): xr.concat([ds, ds_to_append], dim="time"), ) + @pytest.mark.skipif(LooseVersion(dask_version) < "2.4", reason="dask GH5334") def test_append_with_new_variable(self): ds, ds_to_append, ds_with_new_var = create_append_test_data() diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index 2d373d12095..369903552ad 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -817,6 +817,24 @@ def test_vectorize_dask(): assert_identical(expected, actual) +@requires_dask +def test_vectorize_dask_new_output_dims(): + # regression test for GH3574 + data_array = xr.DataArray([[0, 1, 2], [1, 2, 3]], dims=("x", "y")) + func = lambda x: x[np.newaxis, ...] + expected = data_array.expand_dims("z") + actual = apply_ufunc( + func, + data_array.chunk({"x": 1}), + output_core_dims=[["z"]], + vectorize=True, + dask="parallelized", + output_dtypes=[float], + output_sizes={"z": 1}, + ).transpose(*expected.dims) + assert_identical(expected, actual) + + def test_output_wrong_number(): variable = xr.Variable("x", np.arange(10)) diff --git a/xarray/tests/test_sparse.py b/xarray/tests/test_sparse.py index a02fef2faeb..21a212c29b3 100644 --- a/xarray/tests/test_sparse.py +++ b/xarray/tests/test_sparse.py @@ -873,3 +873,16 @@ def test_dask_token(): t5 = dask.base.tokenize(ac + 1) assert t4 != t5 assert isinstance(ac.data._meta, sparse.COO) + + +@requires_dask +def test_apply_ufunc_meta_to_blockwise(): + da = xr.DataArray(np.zeros((2, 3)), dims=["x", "y"]).chunk({"x": 2, "y": 1}) + sparse_meta = sparse.COO.from_numpy(np.zeros((0, 0))) + + # if dask computed meta, it would be np.ndarray + expected = xr.apply_ufunc( + lambda x: x, da, dask="parallelized", output_dtypes=[da.dtype], meta=sparse_meta + ).data._meta + + assert_sparse_equal(expected, sparse_meta)