
Fix/apply ufunc meta dtype #4022

Closed · wants to merge 7 commits
3 changes: 3 additions & 0 deletions doc/whats-new.rst
@@ -94,6 +94,9 @@ Bug fixes
- Fix bug in time parsing failing to fall back to cftime. This was causing time
variables with a time unit of `'msecs'` to fail to parse. (:pull:`3998`)
By `Ryan May <https://github.com/dopplershift>`_.
- Ensure ``output_dtypes`` is preserved when using :py:func:`apply_ufunc` with
``vectorize=True`` and ``dask="parallelized"`` (:issue:`4015`), by
`Mathias Hauser <https://github.com/mathause>`_.

Documentation
~~~~~~~~~~~~~
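The behavior this entry describes can be sketched with a minimal example (assuming xarray and dask are installed; the median reduction and the array values are illustrative, mirroring the tests added in this PR):

```python
import numpy as np
import xarray as xr

# Sketch of the fix (GH4015): with vectorize=True and
# dask="parallelized", the dtype requested via output_dtypes
# should survive through to the computed result.
da = xr.DataArray([[0, 1, 2], [1, 2, 3]], dims=("x", "y"))

result = xr.apply_ufunc(
    np.median,              # scalar reduction over the core dim
    da.chunk({"x": 1}),
    input_core_dims=[["y"]],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[int],    # requested output dtype
)

print(result.compute().dtype)  # expected to match np.dtype(int)
```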
23 changes: 20 additions & 3 deletions xarray/core/computation.py
@@ -4,6 +4,7 @@
import functools
import itertools
import operator
import warnings
from collections import Counter
from typing import (
TYPE_CHECKING,
@@ -687,6 +688,14 @@ def _apply_blockwise(
)
(dtype,) = output_dtypes

if (meta is not None) and hasattr(meta, "dtype") and (meta.dtype != dtype):
warnings.warn(
f"dtype of meta ({meta.dtype}) takes precedence over"
f" output_dtypes ({dtype})",
UserWarning,
stacklevel=3,
)

if output_sizes is None:
output_sizes = {}

@@ -864,7 +873,9 @@ def apply_ufunc(
inputs are a dask array. If used, the ``output_dtypes`` argument must
also be provided. Multiple output arguments are not yet supported.
output_dtypes : list of dtypes, optional
Optional list of output dtypes. Only used if dask='parallelized'.
Optional list of output dtypes. Used in ``np.vectorize`` and required if
dask='parallelized'. Note that the dtype of meta takes precedence over
output_dtypes in ``dask.array.blockwise``.
output_sizes : dict, optional
Optional mapping from dimension names to sizes for outputs. Only used
if dask='parallelized' and new dimensions (not found on inputs) appear
@@ -1005,10 +1016,16 @@ def earth_mover_distance(first_samples,
func = functools.partial(func, **kwargs)

if vectorize:
if meta is None:
if (
dask == "parallelized"
and meta is None
and output_dtypes is not None
and isinstance(output_dtypes, list)
):
# set meta=np.ndarray by default for numpy vectorized functions
# work around dask bug computing meta with vectorized functions: GH5642
meta = np.ndarray
# defer raising errors to _apply_blockwise (e.g. if output_dtypes is None)
meta = np.ndarray((0, 0), dtype=output_dtypes[0])
Member:

shouldn't we still set meta = np.ndarray if no output dtype is specified?

Collaborator (author):

output_dtypes is required for dask="parallelized" and will error if it is missing:

if output_dtypes is None:
raise ValueError(
"output dtypes (output_dtypes) must be supplied to "
"apply_func when using dask='parallelized'"
)

so this won't take effect. I am also not very happy with my approach, but didn't want to copy the checks from _apply_blockwise up here; suggestions?

Contributor:

Maybe the cleaner workaround is to move this down into _apply_blockwise? Would it be enough to pass vectorize down to that level and then set meta as you are doing here?

Also, it seems like we should raise that error about output_dtypes only if meta.dtype has not been set?

Collaborator (author):

Yes, I agree it would be cleaner to thread vectorize through to _apply_blockwise.

> Also, it seems like we should raise that error about output_dtypes only if meta.dtype has not been set?

Depends how important output_dtypes is for np.vectorize.

I am happy to work more on this, but I think it would be good to discuss #4060 first, which might make this obsolete.
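The role of output_dtypes for np.vectorize mentioned in this thread can be illustrated with plain NumPy: otypes pins the output dtype instead of letting np.vectorize infer it from a trial call (a small sketch; the lambda is purely illustrative):

```python
import numpy as np

# Without otypes, np.vectorize infers the output dtype by evaluating
# the function on the first element; with otypes it is pinned up front.
inferred = np.vectorize(lambda x: x + 1)
pinned = np.vectorize(lambda x: x + 1, otypes=[np.float64])

a = np.array([1, 2, 3])
print(inferred(a).dtype)  # inferred from the first call (matches a.dtype here)
print(pinned(a).dtype)    # float64, forced by otypes
```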


if signature.all_core_dims:
func = np.vectorize(
56 changes: 56 additions & 0 deletions xarray/tests/test_computation.py
@@ -817,6 +817,62 @@ def test_vectorize_dask():
assert_identical(expected, actual)


@requires_dask
def test_vectorize_dask_dtype():
# ensure output_dtypes is preserved with vectorize=True
# GH4015

# integer
data_array = xr.DataArray([[0, 1, 2], [1, 2, 3]], dims=("x", "y"))
expected = xr.DataArray([1, 2], dims=["x"])
actual = apply_ufunc(
pandas_median,
data_array.chunk({"x": 1}),
input_core_dims=[["y"]],
vectorize=True,
dask="parallelized",
output_dtypes=[int],
)
assert_identical(expected, actual)
assert expected.dtype == actual.dtype

# complex
data_array = xr.DataArray([[0 + 0j, 1 + 2j, 2 + 1j]], dims=("x", "y"))
expected = data_array.copy()
actual = apply_ufunc(
identity,
data_array.chunk({"x": 1}),
vectorize=True,
dask="parallelized",
output_dtypes=[complex],
)
assert_identical(expected, actual)
assert expected.dtype == actual.dtype


@requires_dask
def test_vectorize_dask_dtype_meta():
# meta dtype takes precedence

data_array = xr.DataArray([[0, 1, 2], [1, 2, 3]], dims=("x", "y"))
expected = xr.DataArray([1, 2], dims=["x"])

with pytest.warns(
UserWarning, match=r"dtype of meta \(float64\) takes precedence",
):
actual = apply_ufunc(
pandas_median,
data_array.chunk({"x": 1}),
input_core_dims=[["y"]],
vectorize=True,
dask="parallelized",
output_dtypes=[int],
meta=np.ndarray((0, 0), dtype=np.float),
)
assert_identical(expected, actual)
assert np.float == actual.dtype


@requires_dask
def test_vectorize_dask_new_output_dims():
# regression test for GH3574