use dask.array.apply_gufunc in xr.apply_ufunc (#4060)
* ENH: use `dask.array.apply_gufunc` in `xr.apply_ufunc` for multiple outputs when `dask='parallelized'`, add/fix tests

* DOC: Update docstring and whats-new.rst

* WIP: apply_gufunc

* WIP: apply_gufunc -> reinstate dask='allowed' as per @mathause, adapting tests

* WIP: apply_gufunc -> add test for GH #4015, fix test for sparse meta checking

* WIP: apply_gufunc -> remove unused `input_dims`

* Update xarray/core/computation.py

Co-authored-by: Mathias Hauser <mathause@users.noreply.github.com>

* Update xarray/core/computation.py

Co-authored-by: Mathias Hauser <mathause@users.noreply.github.com>

* Update xarray/core/computation.py

Co-authored-by: Mathias Hauser <mathause@users.noreply.github.com>

* WIP: use dask_gufunc_kwargs, keep vectorize first but only for non-dask-gufunc, rework docstrings, adapt tests

* DOC: add reference to internal changes in whats-new.rst

* FIX: mypy

* FIX: vectorize inside `apply_variable_ufunc`

* TST: add tests from #4022 from @mathause

* FIX: address black issue

* FIX: xfail test for dask < 2.3

* WIP: apply changes in response to @mathause's review comments

* WIP: remove line

* WIP: catch different chunksize error and allow_rechunk, docstring fixes

* WIP: remove comment

* WIP: style issues

* WIP: revert catch, revert test, add tests without output_dtypes

* WIP: fix signature in apply_ufunc->apply_gufunc, handle output_sizes, handle dask version, fix tests

* WIP: fix tuple

* WIP: add dims_map to _UFuncSignature, adapt output_sizes to fit for apply_gufunc

* WIP: black

* WIP: raise ValueError if output_sizes dimension mismatch

* WIP: raise ValueError if output_sizes is missing for given output_core_dims

* WIP: simplify if/else

* FIX: resolve conflicts prior merge with master

* FIX: combine if's as per review

* FIX: pass `vectorize` and `output_dtypes` kwargs explicitly into `apply_variable_ufunc` as per review suggestion

* FIX: pass `vectorize` and `output_dtypes` kwargs explicitly into `da.apply_gufunc`

* FIX: address review comments of @keewis and @mathause

* FIX: black

* FIX: `vectorize` not needed in if-clause

* FIX: set DeprecationWarning and stacklevel=2

* FIX: use FutureWarning for user visibility

* FIX: remove comment as suggested

Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>

Co-authored-by: Mathias Hauser <mathause@users.noreply.github.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
3 people authored Aug 19, 2020
1 parent 11d72d0 commit a7fb5a9
Showing 6 changed files with 253 additions and 154 deletions.
4 changes: 4 additions & 0 deletions doc/whats-new.rst
@@ -25,6 +25,8 @@ Breaking changes

New Features
~~~~~~~~~~~~
- Support multiple outputs in :py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:issue:`1815`, :pull:`4060`)
By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
- :py:meth:`~xarray.DataArray.rolling` and :py:meth:`~xarray.Dataset.rolling`
  now accept more than 1 dimension. (:pull:`4219`)
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
@@ -75,6 +77,8 @@ Documentation

Internal Changes
~~~~~~~~~~~~~~~~
- Use :py:func:`dask.array.apply_gufunc` instead of :py:func:`dask.array.blockwise` in
:py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:pull:`4060`)
- Fix ``pip install .`` when no ``.git`` directory exists; namely when the xarray source
directory has been rsync'ed by PyCharm Professional for a remote deployment over SSH.
By `Guido Imperiale <https://github.com/crusaderky>`_
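To illustrate the first changelog entry above, a minimal sketch (not part of this commit's diff; ``min_max`` is a hypothetical helper, and this assumes xarray with this patch plus dask installed):

    import numpy as np
    import xarray as xr

    def min_max(arr):
        # gufunc-style helper: consumes the core dimension and
        # returns two outputs per remaining element
        return arr.min(axis=-1), arr.max(axis=-1)

    da = xr.DataArray(np.random.rand(4, 100), dims=["y", "x"]).chunk({"y": 2})

    lo, hi = xr.apply_ufunc(
        min_max,
        da,
        input_core_dims=[["x"]],    # "x" is consumed by min_max
        output_core_dims=[[], []],  # two outputs, no new core dimensions
        dask="parallelized",        # previously raised NotImplementedError for >1 output
        output_dtypes=[da.dtype, da.dtype],
    )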
238 changes: 116 additions & 122 deletions xarray/core/computation.py
@@ -4,7 +4,9 @@
import functools
import itertools
import operator
import warnings
from collections import Counter
from distutils.version import LooseVersion
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -90,6 +92,12 @@ def all_core_dims(self):
self._all_core_dims = self.all_input_core_dims | self.all_output_core_dims
return self._all_core_dims

@property
def dims_map(self):
return {
core_dim: f"dim{n}" for n, core_dim in enumerate(sorted(self.all_core_dims))
}

@property
def num_inputs(self):
return len(self.input_core_dims)
@@ -126,14 +134,12 @@ def to_gufunc_string(self):
Unlike __str__, handles dimensions that don't map to Python
identifiers.
"""
-        all_dims = self.all_core_dims
-        dims_map = dict(zip(sorted(all_dims), range(len(all_dims))))
input_core_dims = [
["dim%d" % dims_map[dim] for dim in core_dims]
[self.dims_map[dim] for dim in core_dims]
for core_dims in self.input_core_dims
]
output_core_dims = [
["dim%d" % dims_map[dim] for dim in core_dims]
[self.dims_map[dim] for dim in core_dims]
for core_dims in self.output_core_dims
]
alt_signature = type(self)(input_core_dims, output_core_dims)
@@ -424,7 +430,7 @@ def apply_groupby_func(func, *args):
if any(not first_groupby._group.equals(gb._group) for gb in groupbys[1:]):
raise ValueError(
"apply_ufunc can only perform operations over "
"multiple GroupBy objets at once if they are all "
"multiple GroupBy objects at once if they are all "
"grouped the same way"
)

@@ -539,16 +545,27 @@ def broadcast_compat_data(
return data


def _vectorize(func, signature, output_dtypes):
if signature.all_core_dims:
func = np.vectorize(
func, otypes=output_dtypes, signature=signature.to_gufunc_string()
)
else:
func = np.vectorize(func, otypes=output_dtypes)

return func


def apply_variable_ufunc(
func,
*args,
signature,
exclude_dims=frozenset(),
dask="forbidden",
output_dtypes=None,
-    output_sizes=None,
vectorize=False,
keep_attrs=False,
-    meta=None,
dask_gufunc_kwargs=None,
):
"""Apply a ndarray level function over Variable and/or ndarray objects.
"""
@@ -579,28 +596,61 @@
"``.load()`` or ``.compute()``"
)
elif dask == "parallelized":
-            input_dims = [broadcast_dims + dims for dims in signature.input_core_dims]
numpy_func = func

if dask_gufunc_kwargs is None:
dask_gufunc_kwargs = {}

output_sizes = dask_gufunc_kwargs.pop("output_sizes", {})
if output_sizes:
output_sizes_renamed = {}
for key, value in output_sizes.items():
if key not in signature.all_output_core_dims:
raise ValueError(
f"dimension '{key}' in 'output_sizes' must correspond to output_core_dims"
)
output_sizes_renamed[signature.dims_map[key]] = value
dask_gufunc_kwargs["output_sizes"] = output_sizes_renamed

for key in signature.all_output_core_dims:
if key not in signature.all_input_core_dims and key not in output_sizes:
raise ValueError(
f"dimension '{key}' in 'output_core_dims' needs corresponding (dim, size) in 'output_sizes'"
)

def func(*arrays):
-                return _apply_blockwise(
import dask.array as da

res = da.apply_gufunc(
numpy_func,
-                    arrays,
-                    input_dims,
-                    output_dims,
-                    signature,
-                    output_dtypes,
-                    output_sizes,
-                    meta,
signature.to_gufunc_string(),
*arrays,
vectorize=vectorize,
output_dtypes=output_dtypes,
**dask_gufunc_kwargs,
)

# todo: covers for https://github.com/dask/dask/pull/6207
# remove when minimal dask version >= 2.17.0
from dask import __version__ as dask_version

if LooseVersion(dask_version) < LooseVersion("2.17.0"):
if signature.num_outputs > 1:
res = tuple(res)

return res

elif dask == "allowed":
pass
else:
raise ValueError(
"unknown setting for dask array handling in "
"apply_ufunc: {}".format(dask)
)
else:
if vectorize:
func = _vectorize(func, signature, output_dtypes=output_dtypes)

result_data = func(*input_data)

if signature.num_outputs == 1:
@@ -648,90 +698,6 @@ def func(*arrays):
return tuple(output)


-def _apply_blockwise(
-    func,
-    args,
-    input_dims,
-    output_dims,
-    signature,
-    output_dtypes,
-    output_sizes=None,
-    meta=None,
-):
-    import dask.array
-
-    if signature.num_outputs > 1:
-        raise NotImplementedError(
-            "multiple outputs from apply_ufunc not yet "
-            "supported with dask='parallelized'"
-        )
-
-    if output_dtypes is None:
-        raise ValueError(
-            "output dtypes (output_dtypes) must be supplied to "
-            "apply_func when using dask='parallelized'"
-        )
-    if not isinstance(output_dtypes, list):
-        raise TypeError(
-            "output_dtypes must be a list of objects coercible to "
-            "numpy dtypes, got {}".format(output_dtypes)
-        )
-    if len(output_dtypes) != signature.num_outputs:
-        raise ValueError(
-            "apply_ufunc arguments output_dtypes and "
-            "output_core_dims must have the same length: {} vs {}".format(
-                len(output_dtypes), signature.num_outputs
-            )
-        )
-    (dtype,) = output_dtypes
-
-    if output_sizes is None:
-        output_sizes = {}
-
-    new_dims = signature.all_output_core_dims - signature.all_input_core_dims
-    if any(dim not in output_sizes for dim in new_dims):
-        raise ValueError(
-            "when using dask='parallelized' with apply_ufunc, "
-            "output core dimensions not found on inputs must "
-            "have explicitly set sizes with ``output_sizes``: {}".format(new_dims)
-        )
-
-    for n, (data, core_dims) in enumerate(zip(args, signature.input_core_dims)):
-        if isinstance(data, dask_array_type):
-            # core dimensions cannot span multiple chunks
-            for axis, dim in enumerate(core_dims, start=-len(core_dims)):
-                if len(data.chunks[axis]) != 1:
-                    raise ValueError(
-                        "dimension {!r} on {}th function argument to "
-                        "apply_ufunc with dask='parallelized' consists of "
-                        "multiple chunks, but is also a core dimension. To "
-                        "fix, rechunk into a single dask array chunk along "
-                        "this dimension, i.e., ``.chunk({})``, but beware "
-                        "that this may significantly increase memory usage.".format(
-                            dim, n, {dim: -1}
-                        )
-                    )
-
-    (out_ind,) = output_dims
-
-    blockwise_args = []
-    for arg, dims in zip(args, input_dims):
-        # skip leading dimensions that are implicitly added by broadcasting
-        ndim = getattr(arg, "ndim", 0)
-        trimmed_dims = dims[-ndim:] if ndim else ()
-        blockwise_args.extend([arg, trimmed_dims])
-
-    return dask.array.blockwise(
-        func,
-        out_ind,
-        *blockwise_args,
-        dtype=dtype,
-        concatenate=True,
-        new_axes=output_sizes,
-        meta=meta,
-    )


def apply_array_ufunc(func, *args, dask="forbidden"):
"""Apply a ndarray level function over ndarray objects."""
if any(isinstance(arg, dask_array_type) for arg in args):
@@ -771,6 +737,7 @@ def apply_ufunc(
output_dtypes: Sequence = None,
output_sizes: Mapping[Any, int] = None,
meta: Any = None,
dask_gufunc_kwargs: Dict[str, Any] = None,
) -> Any:
"""Apply a vectorized function for unlabeled arrays on xarray objects.
@@ -857,19 +824,29 @@
dask arrays:
- 'forbidden' (default): raise an error if a dask array is encountered.
-        - 'allowed': pass dask arrays directly on to ``func``.
- 'allowed': pass dask arrays directly on to ``func``. Prefer this option if
``func`` natively supports dask arrays.
- 'parallelized': automatically parallelize ``func`` if any of the
-          inputs are a dask array. If used, the ``output_dtypes`` argument must
-          also be provided. Multiple output arguments are not yet supported.
inputs are a dask array by using `dask.array.apply_gufunc`. Multiple output
arguments are supported. Only use this option if ``func`` does not natively
support dask arrays (e.g. converts them to numpy arrays).
dask_gufunc_kwargs : dict, optional
Optional keyword arguments passed to ``dask.array.apply_gufunc`` if
dask='parallelized'. Possible keywords are ``output_sizes``, ``allow_rechunk``
and ``meta``.
output_dtypes : list of dtypes, optional
-        Optional list of output dtypes. Only used if dask='parallelized'.
Optional list of output dtypes. Only used if ``dask='parallelized'`` or
vectorize=True.
output_sizes : dict, optional
Optional mapping from dimension names to sizes for outputs. Only used
if dask='parallelized' and new dimensions (not found on inputs) appear
-        on outputs.
        on outputs. ``output_sizes`` should be given in the ``dask_gufunc_kwargs``
        parameter. It will be removed as a direct parameter in a future version.
meta : optional
Size-0 object representing the type of array wrapped by dask array. Passed on to
-        ``dask.array.blockwise``.
        ``dask.array.apply_gufunc``. ``meta`` should be given in the
        ``dask_gufunc_kwargs`` parameter. It will be removed as a direct parameter
        in a future version.
Returns
-------
@@ -1006,34 +983,45 @@ def earth_mover_distance(first_samples,
f"Please make {(exclude_dims - signature.all_core_dims)} a core dimension"
)

# handle dask_gufunc_kwargs
if dask == "parallelized":
if dask_gufunc_kwargs is None:
dask_gufunc_kwargs = {}
# todo: remove warnings after deprecation cycle
if meta is not None:
warnings.warn(
"``meta`` should be given in the ``dask_gufunc_kwargs`` parameter."
" It will be removed as direct parameter in a future version.",
FutureWarning,
stacklevel=2,
)
dask_gufunc_kwargs.setdefault("meta", meta)
if output_sizes is not None:
warnings.warn(
"``output_sizes`` should be given in the ``dask_gufunc_kwargs`` "
"parameter. It will be removed as direct parameter in a future "
"version.",
FutureWarning,
stacklevel=2,
)
dask_gufunc_kwargs.setdefault("output_sizes", output_sizes)

if kwargs:
func = functools.partial(func, **kwargs)

-    if vectorize:
-        if meta is None:
-            # set meta=np.ndarray by default for numpy vectorized functions
-            # work around dask bug computing meta with vectorized functions: GH5642
-            meta = np.ndarray
-
-        if signature.all_core_dims:
-            func = np.vectorize(
-                func, otypes=output_dtypes, signature=signature.to_gufunc_string()
-            )
-        else:
-            func = np.vectorize(func, otypes=output_dtypes)

variables_vfunc = functools.partial(
apply_variable_ufunc,
func,
signature=signature,
exclude_dims=exclude_dims,
keep_attrs=keep_attrs,
dask=dask,
vectorize=vectorize,
output_dtypes=output_dtypes,
-        output_sizes=output_sizes,
-        meta=meta,
dask_gufunc_kwargs=dask_gufunc_kwargs,
)

# feed groupby-apply_ufunc through apply_groupby_func
if any(isinstance(a, GroupBy) for a in args):
this_apply = functools.partial(
apply_ufunc,
@@ -1046,9 +1034,12 @@ def earth_mover_distance(first_samples,
dataset_fill_value=dataset_fill_value,
keep_attrs=keep_attrs,
dask=dask,
-            meta=meta,
vectorize=vectorize,
output_dtypes=output_dtypes,
dask_gufunc_kwargs=dask_gufunc_kwargs,
)
return apply_groupby_func(this_apply, *args)
# feed datasets apply_variable_ufunc through apply_dataset_vfunc
elif any(is_dict_like(a) for a in args):
return apply_dataset_vfunc(
variables_vfunc,
@@ -1060,6 +1051,7 @@ def earth_mover_distance(first_samples,
fill_value=dataset_fill_value,
keep_attrs=keep_attrs,
)
# feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
elif any(isinstance(a, DataArray) for a in args):
return apply_dataarray_vfunc(
variables_vfunc,
@@ -1069,9 +1061,11 @@ def earth_mover_distance(first_samples,
exclude_dims=exclude_dims,
keep_attrs=keep_attrs,
)
# feed Variables directly through apply_variable_ufunc
elif any(isinstance(a, Variable) for a in args):
return variables_vfunc(*args)
else:
# feed anything else through apply_array_ufunc
return apply_array_ufunc(func, *args, dask=dask)


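A usage sketch of the reworked code path above (illustrative only, not part of the commit; ``repeat_twice`` is a hypothetical helper): a function that creates a new output core dimension must declare its size via ``output_sizes``, now passed through ``dask_gufunc_kwargs``, while the direct ``output_sizes``/``meta`` parameters keep working but emit a FutureWarning:

    import numpy as np
    import xarray as xr

    def repeat_twice(arr):
        # appends a new trailing core dimension of size 2
        return np.stack([arr, arr], axis=-1)

    da = xr.DataArray(np.arange(6.0), dims=["x"]).chunk({"x": 3})

    out = xr.apply_ufunc(
        repeat_twice,
        da,
        output_core_dims=[["copy"]],  # "copy" does not exist on the input ...
        dask="parallelized",
        output_dtypes=[da.dtype],
        dask_gufunc_kwargs={"output_sizes": {"copy": 2}},  # ... so its size is required
    )
    assert out.dims == ("x", "copy")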
