apply_ufunc with dask='parallelized' and vectorize=True fails on compute_meta #3574

Closed
smartass101 opened this issue Nov 26, 2019 · 12 comments · Fixed by #3660
smartass101 commented Nov 26, 2019

MCVE Code Sample

import numpy as np
import xarray as xr

ds = xr.Dataset({
    'signal': (['das_time', 'das', 'record'], np.empty((1000, 120, 45))),
    'min_height': (['das'], np.empty((120,)))   # each DAS has a different resolution
})

def some_peak_finding_func(data1d, min_height):
    """process data1d with contraints by min_height"""
    result = np.zeros((4,2))  # summary matrix with 2 peak characteristics
    return result

ds_dask = ds.chunk({'record':3})

xr.apply_ufunc(some_peak_finding_func, ds_dask['signal'], ds_dask['min_height'],
               input_core_dims=[['das_time'], []],  # apply peak finding along trace
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True, # up to here works without dask!
               dask='parallelized',
               output_sizes={'peak_pos': 4, 'pulse':2},
               output_dtypes=[float],
              )

fails with "ValueError: cannot call vectorize with a signature including new output dimensions on size 0 inputs" because dask.array.utils.compute_meta() passes it size-0 arrays.
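The failure is easy to reproduce outside xarray. A minimal sketch (peak_stats is a stand-in for the real peak finder; meta_from_array is the dask >= 2.0 helper that builds the size-0 probe arrays compute_meta uses):

import numpy as np
from dask.array.utils import meta_from_array

def peak_stats(trace):
    return np.zeros(4)  # signature introduces a new output core dimension

vf = np.vectorize(peak_stats, signature="(t)->(p)")

# compute_meta probes the wrapped function with size-0 stand-ins like this:
probe = meta_from_array(np.empty((1000, 120)))  # an ndarray of shape (0, 0)
vf(probe)  # ValueError: cannot call vectorize with a signature
           # including new output dimensions on size 0 inputs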

Expected Output

This should work, and it does work on the non-chunked ds without dask='parallelized' and the associated output_* parameters.

Problem Description

I'm trying to parallelize a peak-finding routine with dask (it works well without it) and I hoped that dask='parallelized' would make that simple. However, the peak finding needs to be vectorized; it works well with vectorize=True, but np.vectorize appears to have issues in compute_meta, which dask invokes internally during blockwise application, as indicated in the source code:

https://github.com/dask/dask/blob/e6ba8f5de1c56afeaed05c39c2384cd473d7c893/dask/array/utils.py#L118

A possible solution might be for apply_ufunc to pass meta directly to dask, if it were possible to foresee what meta should be. I suppose we are aiming for np.ndarray most of the time, though sparse might change that in the future.
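For illustration, a minimal sketch of that idea at the plain dask level (the peak_stats stand-in and all shapes are invented for the example): handing blockwise a size-0 ndarray as meta means compute_meta never runs, so np.vectorize is never probed with empty inputs.

import numpy as np
import dask.array as da

def peak_stats(trace):
    return np.zeros(4)

vf = np.vectorize(peak_stats, signature="(t)->(p)")

signal = da.random.random((120, 45, 1000), chunks=(120, 3, 1000))

out = da.blockwise(
    vf, 'drp',            # output dims: das, record, peak
    signal, 'drt',        # input dims: das, record, das_time (core dim t)
    new_axes={'p': 4},
    dtype=np.float64,
    concatenate=True,
    # explicit meta: a size-0 ndarray of the expected ndim and dtype
    meta=np.empty((0, 0, 0), dtype=np.float64),
)
print(out.compute().shape)  # (120, 45, 4)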

I know I could use groupby-apply as an alternative, but there are several issues that made us use apply_ufunc instead:

  • groupby-apply seems to have much larger overhead
  • the non-core dimensions would have to be stacked into a new dimension over which to groupby, but some of the dimensions to be stacked are already part of a MultiIndex and cannot be easily stacked.
    • we could unstack the MultiIndex dimensions first at the risk of introducing quite a number of NaNs
    • extra coords might lose dimension information after application and unstacking (they would end up depending on all dimensions)

Output of xr.show_versions()

commit: None
python: 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.9.0-11-amd64
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.6.1

xarray: 0.14.0
pandas: 0.25.1
numpy: 1.17.2
scipy: 1.3.1
netCDF4: 1.4.2
pydap: None
h5netcdf: 0.7.4
h5py: 2.9.0
Nio: None
zarr: None
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.5.2
distributed: 2.5.2
matplotlib: 3.1.1
cartopy: None
seaborn: 0.9.0
numbagg: None
setuptools: 41.4.0
pip: 19.2.3
conda: 4.7.12
pytest: 5.2.1
IPython: 7.8.0
sphinx: 2.2.0

@smartass101 smartass101 changed the title apply_ufunc with dask='parallelized' and vectorize=True fails on compute_metadata apply_ufunc with dask='parallelized' and vectorize=True fails on compute_meta Nov 26, 2019
@smartass101 (Author)

Another approach would be to bypass compute_meta in dask.blockwise if dtype is provided which seems to be hinted at here

https://github.com/dask/dask/blob/3960c6518318f2417658c2fc47cd5b5ece726f8b/dask/array/blockwise.py#L234

Perhaps this is an oversight in dask, what do you think?

@jbusecke (Contributor) commented Dec 12, 2019

I am having a similar problem. This impacts some of my frequently used code to compute correlations.

Here is a simplified example that used to work with older dependencies:

import xarray as xr
import numpy as np
from scipy.stats import linregress


def _ufunc(aa, bb):
    out = linregress(aa, bb)
    return np.array([out.slope, out.intercept])

def wrapper(a, b, dim='time'):
    return xr.apply_ufunc(
        _ufunc, a, b,
        input_core_dims=[[dim], [dim]],
        output_core_dims=[["parameter"]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[a.dtype],
        output_sizes={"parameter": 2},
    )

This works when passing numpy arrays:

a = xr.DataArray(np.random.rand(3, 13, 5), dims=['x', 'time', 'y'])
b = xr.DataArray(np.random.rand(3, 5, 13), dims=['x', 'y', 'time'])
wrapper(a, b)

array([[[ 0.09958247,  0.36831431],
        [-0.54445474,  0.66997513],
        [-0.22894182,  0.65433402],
        [ 0.38536482,  0.20656073],
        [ 0.25083224,  0.46955618]],

       [[-0.21684891,  0.55521932],
        [ 0.51621616,  0.20869272],
        [-0.1502755 ,  0.55526262],
        [-0.25452988,  0.60823538],
        [-0.20571622,  0.56950115]],

       [[-0.22810421,  0.50423622],
        [ 0.33002345,  0.36121484],
        [ 0.37744774,  0.33081058],
        [-0.10825559,  0.53772493],
        [-0.12576656,  0.51722167]]])

Dimensions without coordinates: x, y, parameter

But when I convert both arrays to dask arrays, I get the same error as @smartass101.

wrapper(a.chunk({'x':2, 'time':-1}),b.chunk({'x':2, 'time':-1}))
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
in <module>
      1 a = xr.DataArray(np.random.rand(3, 13, 5), dims=['x', 'time', 'y'])
      2 b = xr.DataArray(np.random.rand(3, 5, 13), dims=['x','y', 'time'])
----> 3 wrapper(a.chunk({'x':2, 'time':-1}),b.chunk({'x':2, 'time':-1}))

in wrapper(a, b, dim)
16 dask="parallelized",
17 output_dtypes=[a.dtype],
---> 18 output_sizes={"parameter": 2},)

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, *args)
1042 join=join,
1043 exclude_dims=exclude_dims,
-> 1044 keep_attrs=keep_attrs
1045 )
1046 elif any(isinstance(a, Variable) for a in args):

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, *args)
232
233 data_vars = [getattr(a, "variable", a) for a in args]
--> 234 result_var = func(*data_vars)
235
236 if signature.num_outputs > 1:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, output_sizes, keep_attrs, *args)
601 "apply_ufunc: {}".format(dask)
602 )
--> 603 result_data = func(*input_data)
604
605 if signature.num_outputs == 1:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in func(*arrays)
591 signature,
592 output_dtypes,
--> 593 output_sizes,
594 )
595

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in _apply_blockwise(func, args, input_dims, output_dims, signature, output_dtypes, output_sizes)
721 dtype=dtype,
722 concatenate=True,
--> 723 new_axes=output_sizes
724 )
725

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/dask/array/blockwise.py in blockwise(func, out_ind, name, token, dtype, adjust_chunks, new_axes, align_arrays, concatenate, meta, *args, **kwargs)
231 from .utils import compute_meta
232
--> 233 meta = compute_meta(func, dtype, *args[::2], **kwargs)
234 if meta is not None:
235 return Array(graph, out, chunks, meta=meta)

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/dask/array/utils.py in compute_meta(func, _dtype, *args, **kwargs)
119 # with np.vectorize, such as dask.array.routines._isnonzero_vec().
120 if isinstance(func, np.vectorize):
--> 121 meta = func(*args_meta)
122 else:
123 try:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in __call__(self, *args, **kwargs)
2089 vargs.extend([kwargs[_n] for _n in names])
2090
-> 2091 return self._vectorize_call(func=func, args=vargs)
2092
2093 def _get_ufunc_and_otypes(self, func, args):

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
2155 """Vectorized call to func over positional args."""
2156 if self.signature is not None:
-> 2157 res = self._vectorize_call_with_signature(func, args)
2158 elif not args:
2159 res = func()

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in _vectorize_call_with_signature(self, func, args)
2229 for dims in output_core_dims
2230 for dim in dims):
-> 2231 raise ValueError('cannot call vectorize with a signature '
2232 'including new output dimensions on size 0 '
2233 'inputs')

ValueError: cannot call vectorize with a signature including new output dimensions on size 0 inputs

This used to work like a charm... However, I was sloppy in testing this functionality (a good reminder to always write tests immediately 🙄), and I have not yet been able to determine a combination of dependencies that works. I am still experimenting and will report back.

Could this behaviour be a bug introduced in dask at some point (as indicated by @smartass101 above)? cc'ing @dcherian @shoyer @mrocklin

EDIT: I can confirm that it seems to be a dask issue. If I restrict my dask version to <2.0, my tests (very similar to the above example) work.

@smartass101 (Author) commented Dec 12, 2019

Sounds similar. But I'm not sure why you get the size-0 issue, since (from a quick reading) none of your chunks seem to have size 0 in any dimension. Could you please show us the resulting chunk setup?

@jbusecke (Contributor)

This is the chunk setup: [screenshot of the dask array chunk layout]

Might this be a problem resulting from numpy.vectorize?

@shoyer (Member) commented Dec 12, 2019

The problem is that Dask, as of version 2.0, calls functions applied to dask arrays with size-zero inputs to figure out the output array type, e.g., is the output a dense numpy.ndarray or a sparse array?

Unfortunately, numpy.vectorize doesn't know how large a size-0 array to make, because it doesn't have anything like the output_sizes argument.

For xarray, we have a couple of options:

  1. we can safely assume that if the applied function is an np.vectorize object, then xarray should pass meta=np.ndarray into the relevant dask functions (e.g., dask.array.blockwise). This should avoid the need to evaluate with size-0 arrays.
  2. we could add an output_sizes argument to np.vectorize either upstream in NumPy or into a wrapper in Xarray.

(1) is probably easiest here.
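As context for option (1): since dask 2.0, every dask array carries a size-0 prototype array in its _meta attribute that records the container type; supplying meta up front hands dask that prototype directly instead of letting it probe the applied function. A quick illustration:

import dask.array as da

x = da.ones((4, 4), chunks=2)
print(repr(x._meta))  # array([], shape=(0, 0), dtype=float64)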

@smartass101 (Author)

> The problem is that Dask, as of version 2.0, calls functions applied to dask arrays with size-zero inputs to figure out the output array type, e.g., is the output a dense numpy.ndarray or a sparse array?

Yes, now I recall that this was the issue. It doesn't even depend on your actual data, really.

A possible option 3 is to address dask/dask#5642 directly (I haven't found time for a PR yet). From the code described in that issue, I have the feeling that if a dtype is passed (as apply_ufunc does), then meta should not need to be computed.

@dcherian (Contributor)

@shoyer's option 1 should be a relatively simple xarray PR, if one of you is up for it.

@jbusecke (Contributor)

I can give it a shot if you could point me to the appropriate place, since I have never messed with the dask internals of xarray.

@dcherian (Contributor)

meta should be passed to blockwise through _apply_blockwise, with a default of None (I think) and np.ndarray if vectorize is True. You'll have to pass the vectorize kwarg down to this level, I think.

elif dask == "parallelized":
input_dims = [broadcast_dims + dims for dims in signature.input_core_dims]
numpy_func = func
def func(*arrays):
return _apply_blockwise(
numpy_func,
arrays,
input_dims,
output_dims,
signature,
output_dtypes,
output_sizes,
)

@smartass101 (Author)

> meta should be passed to blockwise through _apply_blockwise, with a default of None (I think) and np.ndarray if vectorize is True. You'll have to pass the vectorize kwarg down to this level, I think.

I'm afraid that passing meta=None will not help, as explained in dask/dask#5642 and seen around this line, because in that case compute_meta will be called, which might fail with an np.vectorize-wrapped function.
I believe a better solution would be to address dask/dask#5642 so that meta isn't computed when we already provide an output dtype.

@dcherian (Contributor)

Right, the xarray solution is to set meta = np.ndarray if vectorize is True else None when the user doesn't explicitly provide meta. Or am I missing something?

@smartass101 (Author)

> meta = np.ndarray if vectorize is True else None when the user doesn't explicitly provide meta.

Yes, sorry, written this way I now see what you meant, and that will likely work indeed.
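For reference, this is what the eventual fix in #3660 implements: when vectorize=True and no meta is supplied, xarray passes meta=np.ndarray down to dask, and apply_ufunc also gained an explicit meta kwarg. Continuing the MCVE from the top of the thread (assuming xarray >= 0.15 and dask >= 2.2), the original call then works unchanged, and meta can also be given explicitly:

xr.apply_ufunc(some_peak_finding_func, ds_dask['signal'], ds_dask['min_height'],
               input_core_dims=[['das_time'], []],
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True,
               dask='parallelized',
               output_sizes={'peak_pos': 4, 'pulse': 2},
               output_dtypes=[float],
               meta=np.ndarray,  # optional; this is the default when vectorize=True
              )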

dcherian added a commit to dcherian/xarray that referenced this issue Jan 2, 2020
dcherian added a commit to dcherian/xarray that referenced this issue Jan 2, 2020
dcherian added a commit that referenced this issue Jan 22, 2020
* apply_func: Set meta=np.ndarray when vectorize=True and dask="parallelized"

Closes #3574

* Add meta kwarg to apply_ufunc.

* Bump minimum dask to 2.1.0

* Update distributed too

* bump minimum dask, distributed to 2.2

* Update whats-new

* minor.

* fix whats-new

* Attempt numpy=1.15

* Revert "Attempt numpy=1.15"

This reverts commit 2b22470.

* xfail test.

* More xfailed tests.

* Update xfail reason.

* fix whats-new

* Add test to ensure meta is passed on to dask.

* Use skipif instead of xfail.
jbusecke pushed a commit to jbusecke/pangeo-cloud-federation that referenced this issue Feb 11, 2020
I experienced problems with a computation due to a bug that was recently solved in xarray (pydata/xarray#3574 (comment)). This PR adds the most recent release of xarray.