
apply_ufunc(dask='parallelized') with multiple outputs #1815

Closed
jhamman opened this issue Jan 9, 2018 · 17 comments · Fixed by #4060

Comments

@jhamman
Member

jhamman commented Jan 9, 2018

I have an application where I'd like to use apply_ufunc with dask on a function that requires multiple inputs and outputs. This was left as a TODO item in #1517. However, it's not clear to me, looking at the code, how this can be done given the current form of dask's atop. I'm hoping @shoyer has already thought of a clever solution here...

Code Sample, a copy-pastable example if possible

import numpy as np
import xarray as xr

def func(foo, bar):
    assert foo.shape == bar.shape
    spam = np.zeros_like(bar)
    spam2 = np.full_like(bar, 2)
    return spam, spam2

foo = xr.DataArray(np.zeros((10, 10))).chunk()
bar = xr.DataArray(np.zeros((10, 10))).chunk() + 5

xrfunc = xr.apply_ufunc(func, foo, bar,
                        output_core_dims=[[], []],
                        dask='parallelized')

Problem description

This currently raises a NotImplementedError.

Expected Output

Multiple dask arrays. In my example above, two dask arrays.

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.6.4.final.0
python-bits: 64
OS: Linux
OS-release: 4.4.86+
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_US.UTF-8
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

xarray: 0.10.0+dev.c92020a
pandas: 0.22.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
Nio: None
zarr: 2.2.0a2.dev176
bottleneck: 1.2.1
cyordereddict: None
dask: 0.16.0
distributed: 1.20.2+36.g7387410
matplotlib: 2.1.1
cartopy: None
seaborn: None
setuptools: 38.4.0
pip: 9.0.1
conda: 4.3.29
pytest: 3.3.2
IPython: 6.2.1
sphinx: None

cc @mrocklin, @arbennett

@shoyer
Member

shoyer commented Jan 9, 2018

We need atop to support multiple output arguments (dask/dask#702), or potentially a specialized wrapper for generalized ufuncs in dask (dask/dask#1176).

@jhamman
Member Author

jhamman commented Nov 19, 2018

@shoyer - dask now has an apply_gufunc. Is this something we should try to include in xr.apply_ufunc or a new function xr.apply_gufunc?

xref: dask/dask#3109

@mrocklin
Contributor

FYI @magonser

@shoyer
Member

shoyer commented Nov 20, 2018

I think we can do this inside the existing xarray.apply_ufunc, simply by using apply_gufunc instead of atop for the case where signature.num_outputs > 1 (which currently raises NotImplementedError):
https://github.com/pydata/xarray/blob/master/xarray/core/computation.py#L601
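As a rough sketch of what that could look like at the dask level (a standalone toy example, not xarray's actual implementation), dask.array.apply_gufunc already supports multiple outputs via the gufunc signature string:

```python
import numpy as np
import dask.array as da

def func(foo, bar):
    # Two outputs, each shaped like the input blocks
    return np.zeros_like(bar), np.full_like(bar, 2)

foo = da.zeros((10, 10), chunks=(5, 5))
bar = da.zeros((10, 10), chunks=(5, 5)) + 5

# "(),()->(),()": elementwise over two inputs, producing two outputs
spam, spam2 = da.apply_gufunc(func, "(),()->(),()", foo, bar,
                              output_dtypes=(foo.dtype, bar.dtype))
```

Here `spam` and `spam2` come back as two separate lazy dask arrays, which is exactly the behavior the issue asks apply_ufunc to expose.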

@andersy005
Member

Any updates or progress here? I’m trying to use xarray.apply_ufunc with scipy.stats.linregress which returns

  • slope
  • intercept
  • rvalue
  • pvalue
  • stderr

@stefraynaud
Contributor

@andersy005 here is a very small demo of linear regression using lstsq (not linregress) in which only the slope and intercept are kept. It is applied here to an array of sea surface temperature.
I hope it helps.

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt

ds = xr.open_dataset('sst_2D.nc', chunks={'X': 30, 'Y': 30})

def ulinregress(x, y):
    # The universal function: least-squares fit along the time axis
    ny, nx, nt = y.shape
    y = np.moveaxis(y, -1, 0).reshape((nt, -1))  # (nt, ny*nx)
    coefs = np.linalg.lstsq(np.vstack([x, np.ones(nt)]).T, y, rcond=None)[0]
    return coefs.T.reshape(ny, nx, 2)

time = (ds['time'] - np.datetime64("1950-01-01")) / np.timedelta64(1, 'D')
ab = xr.apply_ufunc(ulinregress, time, ds['sst'], dask='parallelized',
                    input_core_dims=[['time'], ['time']],
                    output_dtypes=['d'],
                    output_sizes={'coef': 2},
                    output_core_dims=[['coef']])

series = ds['sst'][:, 0, 0].load()
line = series.copy()
line[:] = ab[0, 0, 0] * time + ab[0, 0, 1]
series.plot(label='Original')
line.plot(label='Linear regression')
plt.legend()
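For readers without the sst_2D.nc file, here is a self-contained variant of the same pattern on synthetic data (the toy grid and variable names are invented for illustration; with recent xarray versions, output_sizes is passed via dask_gufunc_kwargs):

```python
import numpy as np
import xarray as xr

def ulinregress(x, y):
    # Least-squares fit along the trailing time axis
    ny, nx, nt = y.shape
    y2 = np.moveaxis(y, -1, 0).reshape((nt, -1))  # (nt, ny*nx)
    coefs = np.linalg.lstsq(np.vstack([x, np.ones(nt)]).T, y2, rcond=None)[0]
    return coefs.T.reshape(ny, nx, 2)  # [..., 0] = slope, [..., 1] = intercept

t = np.arange(20.0)
# Synthetic data: y = 3*t + 1 at every point of a 4x5 grid
y = 3.0 * t + 1.0 + np.zeros((4, 5, 1))
da_t = xr.DataArray(t, dims=['time'])
da_y = xr.DataArray(y, dims=['Y', 'X', 'time']).chunk({'Y': 2, 'X': 2})

ab = xr.apply_ufunc(ulinregress, da_t, da_y, dask='parallelized',
                    input_core_dims=[['time'], ['time']],
                    output_dtypes=['d'],
                    output_core_dims=[['coef']],
                    dask_gufunc_kwargs={'output_sizes': {'coef': 2}})
slope, intercept = ab.isel(coef=0), ab.isel(coef=1)
```

Selecting along the packed 'coef' dimension afterwards gives separate slope and intercept arrays, which is the workaround for the missing multi-output support.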

@rabernat
Contributor

rabernat commented Oct 7, 2019

@andersy005 - is what you are working on related to #3349?

@andersy005
Member

is what you are working on related to #3349?

@rabernat, indeed! Let me know if you have bandwidth to take on the polyfit implementation in xarray. Otherwise, I'd be happy to help out / work on it in late October.

@rabernat
Contributor

rabernat commented Oct 7, 2019 via email

@bradyrx
Contributor

bradyrx commented Apr 15, 2020

This looks essentially the same as @stefraynaud's answer, but I came across this Stack Overflow response: https://stackoverflow.com/questions/52094320/with-xarray-how-to-parallelize-1d-operations-on-a-multidimensional-dataset.

@andersy005, I imagine you're far past this now, and this might have been related to discussions with Genevieve and me anyway.

import numpy as np
import xarray as xr
from scipy import stats

def new_linregress(x, y):
    # Wrapper around scipy.stats.linregress to use in apply_ufunc
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    return np.array([slope, intercept, r_value, p_value, std_err])

# Return a new DataArray (named so it doesn't shadow the ``stats`` module)
results = xr.apply_ufunc(new_linregress, ds[x], ds[y],
                         input_core_dims=[['year'], ['year']],
                         output_core_dims=[["parameter"]],
                         vectorize=True,
                         dask="parallelized",
                         output_dtypes=['float64'],
                         output_sizes={"parameter": 5},
                         )

@andersy005
Member

I imagine you're far past this now, and this might have been related to discussions with Genevieve and me anyway.

Thank you for the update, @bradyrx! Yes, it was related to discussions with @gelsworth

@bradyrx
Contributor

bradyrx commented Apr 15, 2020

I think ideally it would be nice to return multiple DataArrays or a Dataset of variables. But I'm really happy with this solution. I'm using it on a 600GB dataset of particle trajectories and was able to write a ufunc to go through and return each particle's x, y, z location when it met a certain condition.

I think having something simple like the stackoverflow snippet I posted would be great for the docs as an apply_ufunc example. I'd be happy to lead this if folks think it's a good idea.

@kmuehlbauer
Contributor

I think ideally it would be nice to return multiple DataArrays or a Dataset of variables.

What's the current status of this? I have similar requirements: a single DataArray as input, multiple DataArrays as output.

@dcherian
Contributor

Still needs to be implemented. Stephan's comment suggests a path forward (#1815 (comment))

@bradyrx
Contributor

bradyrx commented May 13, 2020

One issue I see is that this would return multiple dask objects, correct? So to get the results from them, you'd have to run .compute() on each separately. I think it's a valid assumption to expect that the multiple output objects would share a lot of the same computational pipeline. So would you be re-doing the same computation by running .compute() separately on these objects?

The code snippets mentioned earlier provide a nice path forward, since you can just run compute on one object, and then split its result (or however you name it) dimension into multiple individual objects. Thoughts?
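A minimal sketch of that split (the `stacked` array here is a small stand-in for the apply_ufunc result above): compute once, then turn the stacked "parameter" dimension into separately named variables:

```python
import numpy as np
import xarray as xr

# Stand-in for the apply_ufunc result: 3 grid points x 5 parameters
stacked = xr.DataArray(np.arange(15.0).reshape(3, 5),
                       dims=["point", "parameter"])
names = ["slope", "intercept", "rvalue", "pvalue", "stderr"]

# Label the packed dimension, then split it into dataset variables;
# the underlying data is only traversed once
out = stacked.assign_coords(parameter=names).to_dataset(dim="parameter")
```

After the split, `out.slope`, `out.intercept`, etc. are ordinary variables sharing the single computed result.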

@dcherian
Contributor

So would you be re-doing the same computation by running .compute() separately on these objects?

Yes, but you can do dask.compute(xarray_obj1, xarray_obj2, ...) or combine those objects appropriately into a Dataset and then call compute on that.
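A quick sketch of that idea (toy arrays, not the linregress outputs): both results share `base`'s task graph, and a single dask.compute call evaluates the shared portion once:

```python
import dask
import dask.array as da
import xarray as xr

base = xr.DataArray(da.ones((4, 4), chunks=(2, 2)), dims=["x", "y"])
a = base + 1
b = base * 2

# One traversal of the shared graph instead of two separate .compute() calls
a_done, b_done = dask.compute(a, b)
```

This works because xarray objects implement the dask collection protocol, so dask.compute can merge their graphs before scheduling.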

@bradyrx
Contributor

bradyrx commented May 13, 2020

So would you be re-doing the same computation by running .compute() separately on these objects?

Yes, but you can do dask.compute(xarray_obj1, xarray_obj2, ...) or combine those objects appropriately into a Dataset and then call compute on that.

Good call. I figured there was a workaround.
