Parallel map/apply powered by dask.array #585

Closed
shoyer opened this issue Sep 20, 2015 · 11 comments

@shoyer
Member

shoyer commented Sep 20, 2015

Dask is awesome, but it isn't always easy to use it for parallel operations. In many cases, especially when wrapping routines from external libraries, it is most straightforward to express operations in terms of a function that expects and returns xray objects loaded into memory.

Dask array has a map_blocks function/method, but its applicability is limited because dask.array doesn't have axis names for unambiguously identifying dimensions. da.atop can handle many of these cases, but it's not the easiest to use. Fortunately, we have sufficient metadata in xray that we could probably parallelize many atop operations automatically by inferring result dimensions and dtypes from applying the function once. See here for more discussion on the dask side: dask/dask#702

So I would like to add some convenience methods for automatic parallelization with dask of a function defined on xray objects loaded into memory. In addition to a map_blocks method/function, it would be useful to add some sort of parallel_apply method to groupby objects that works very similarly, by lazily applying a function that takes and returns xray objects loaded into memory.
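
To make the axis-name limitation concrete, here is a minimal sketch of `dask.array.map_blocks` on an array whose axes stand for (time, lat, lon). The array shape, the chunking, and the `demean` helper are illustrative assumptions, not something from this issue. Because dask only knows axes by position, the caller has to remember that "time" is axis 0 and has to keep that axis in a single chunk so the per-block mean equals the full mean.

```python
import numpy as np
import dask.array as da


def demean(block, axis):
    # Hypothetical helper: subtract the mean along a positional axis.
    return block - block.mean(axis=axis, keepdims=True)


# Conceptually (time, lat, lon), but dask only sees axes 0, 1 and 2.
data = da.from_array(
    np.arange(24, dtype=float).reshape(4, 2, 3),
    chunks=(4, 1, 3),  # "time" (axis 0) stays in one chunk
)

# Nothing in this call says "time"; the meaning of axis=0 lives in the caller's head.
result = data.map_blocks(demean, axis=0, dtype=data.dtype)
computed = result.compute()
```

With labelled dimensions, the same request could be phrased as "demean along time", which is the kind of information xray already carries.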

@shoyer shoyer added this to the before 1.0 milestone Sep 20, 2015
@clarkfitzg
Member

But do the xray objects have to exist in memory? I was thinking this could also work along with open_mfdataset. It just loads and operates on the chunk it needs.

Like the idea of applying this to groupby objects. I wonder if it could be done transparently to the user...

@shoyer
Member Author

shoyer commented Sep 23, 2015

Indeed, there's no need to load the entire dataset into memory first. I think open_mfdataset is the model to emulate here -- it's parallelism that just works.

I'm not quite sure how to do this transparently in groupby operations yet. The problem is that you do want to apply some groupby operations on dask arrays without loading the entire group into memory, if there are only a few groups on a large dataset and the function itself is written in terms of dask operations. I think we will probably need some syntax to disambiguate that scenario.

@rabernat
Contributor

👍 Very useful idea!

@shoyer
Member Author

shoyer commented Jun 15, 2016

With the single machine version of dask, we need to run one block first to infer the appropriate metadata for constructing the combined dataset.

Potentially a better approach would be to optionally leverage dask.distributed, which has the ability to run computation at the same time as graph construction. map_blocks could then kick off a bunch of map tasks to execute in parallel, and only worry about reassembling the blocks in a reduce after the results have come in.
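
The "run one block first" step can be sketched roughly as follows. This is only an illustration of the idea, not the implementation discussed in this issue: the `summarize` function and the list of in-memory blocks are hypothetical, and the sketch assumes every block produces output with the same shape and dtype as the first one.

```python
import numpy as np
import dask
import dask.array as da


def summarize(block):
    # Hypothetical user function: ndarray in, ndarray out.
    # The output shape and dtype are not known until it runs.
    stacked = np.stack([block.min(axis=1), block.max(axis=1)], axis=1)
    return stacked.astype("float32")


blocks = [np.random.RandomState(i).rand(3, 5) for i in range(4)]

# Step 1: run the function eagerly on one block, only to learn shape and dtype.
sample = summarize(blocks[0])

# Step 2: wrap every block lazily, reusing the inferred metadata.
lazy_parts = [
    da.from_delayed(
        dask.delayed(summarize)(block),
        shape=sample.shape,
        dtype=sample.dtype,
    )
    for block in blocks
]
combined = da.concatenate(lazy_parts, axis=0)
```

In this sketch the first block is evaluated twice, once eagerly for its metadata and once inside the graph, which is the cost that running computation during graph construction would avoid.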

@monocongo

I'm adding this note to express an interest in the functionality described in Stephan's original description, i.e. a parallel_apply method/function which would apply a function in parallel utilizing multiple CPUs. I have (finally) worked out how to use groupby and apply for my application but it would be much more useful if I could apply functions in parallel to take advantage of multiple CPUs. What's the expected effort to make something like this available in xarray? Several months ago I worked on doing this sort of thing without xarray using the multiprocessing module and a shared memory object and I may revisit that soon, but I expect that a solution using xarray will be more elegant so if such a thing is coming in the foreseeable future then I may wait on that and focus on other tasks. Can anyone advise?

@rabernat
Contributor

Does #964 help on this?

@shoyer
Member Author

shoyer commented Sep 22, 2016

I think #964 provides a viable path forward here.

Previously, I was imagining the user provides a function that maps xarray.DataArray -> xarray.DataArray. Such functions are tricky to parallelize with dask.array because we need to run them to figure out the result dimensions/coordinates.

In contrast, with a user-defined function ndarray -> ndarray, it's fairly straightforward to parallelize these with dask array (e.g., using dask.array.elemwise or dask.array.map_blocks). Then we could add the metadata back in afterwards with #964.

In principle, we could do this automatically -- especially if dask had a way to parallelize arbitrary NumPy generalized universal functions. Then the user could write something like xarray.apply(func, data, signature=signature, dask_array='auto') to automatically parallelize func over their data. In fact, I had this in some previous commits for #964, but took it out for now, just to reduce scope for the change.
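
The `xarray.apply(...)` spelling above was a proposal at the time of this comment. In released versions of xarray, the closest function I am aware of is `xarray.apply_ufunc`, which accepts `dask="parallelized"`; treating it as the direct descendant of #964 is an assumption on my part. A minimal sketch under that assumption, with a hypothetical `normalize` function and made-up data:

```python
import numpy as np
import xarray as xr


def normalize(values):
    # Plain ndarray -> ndarray function; it knows nothing about labels.
    # apply_ufunc moves the core dimension ("time") to the last axis.
    mean = values.mean(axis=-1, keepdims=True)
    std = values.std(axis=-1, keepdims=True)
    return (values - mean) / std


arr = xr.DataArray(
    np.random.RandomState(0).rand(4, 6),
    dims=("x", "time"),
    coords={"x": [10, 20, 30, 40]},
).chunk({"x": 2})  # chunk along "x" only; "time" stays in one chunk

result = xr.apply_ufunc(
    normalize,
    arr,
    input_core_dims=[["time"]],
    output_core_dims=[["time"]],
    dask="parallelized",        # apply the function block by block
    output_dtypes=[arr.dtype],  # declared up front, so no trial run is needed
)
computed = result.compute()
```

Here the user function only handles ndarrays, and the dimension names and coordinates are reattached to the result by xarray.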

@monocongo

monocongo commented Sep 22, 2016

This is good news for me as the functions I will apply take an ndarray as input and return a corresponding ndarray as output. Once this is available in xarray I'll be eager to give it a whirl...

@shoyer
Member Author

shoyer commented Aug 24, 2017

I have a preliminary implementation up in #1517

@rabernat
Contributor

This issue was closed by #1517. But there was plenty of discussion above about parallelizing groupby. Does #1517 make parallel groupby automatically work? My understanding is no. If that's the case, we probably need to open a new issue for parallel groupby.

cc @mrocklin

@shoyer
Member Author

shoyer commented Oct 13, 2017

@rabernat Agreed, let's open a new issue for that.


4 participants