
Automatic parallelization for dask arrays in apply_ufunc #1517

Merged
merged 7 commits into pydata:master on Oct 9, 2017

Conversation

@shoyer
Member

shoyer commented Aug 23, 2017

This lets you parallelize a function designed for numpy inputs just by adding a few keyword arguments to apply_ufunc.

Example usage, to calculate rank correlation between two variables (this will probably turn into an example for the docs):

import numpy as np
import xarray as xr
import bottleneck

def covariance_gufunc(x, y):
    return ((x - x.mean(axis=-1, keepdims=True))
            * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)

def correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))

def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return correlation_gufunc(x_ranks, y_ranks)

def spearman_correlation(x, y, dim):
    return xr.core.computation.apply_ufunc(
        spearman_correlation_gufunc, x, y,
        input_core_dims=[[dim], [dim]],
        dask='parallelized',
        output_dtypes=[float])

We get a nice 5x speedup for this compute-bound task:

In [56]: rs = np.random.RandomState(0)

In [57]: array1 = xr.DataArray(rs.randn(1000, 100000), dims=['place', 'time'])  # 800MB

In [58]: array2 = array1 + 0.5 * rs.randn(1000, 100000)

# using one core, on numpy arrays
In [61]: %time _ = spearman_correlation(array1, array2, 'time')
CPU times: user 21.6 s, sys: 2.84 s, total: 24.5 s
Wall time: 24.9 s

# using all my laptop's cores, with dask
In [63]: r = spearman_correlation(array1.chunk({'place': 10}), array2.chunk({'place': 10}), 'time')

In [64]: %time _ = r.compute()
CPU times: user 30.9 s, sys: 1.74 s, total: 32.6 s
Wall time: 4.59 s

Still needs examples in the documentation.

The next step is to finally expose apply_ufunc as public API :).

cc @mrocklin

@clarkfitzg
Member

Wow, this is great stuff!

What's rs.randn()?

When this makes it into the public facing API it would be nice to include some guidance on how the chunking scheme affects the run time. Imagine a plot with run time plotted as a function of chunk size or number of chunks. Of course it also depends on the data size and the number of cores available.

To say it in a different way, array1.chunk({'place': 10}) is a performance tuning parameter, semantically no different than array1.

More ambitiously I could imagine an API such as array1.chunk('place') or array1.chunk('auto') meaning to figure out a reasonable chunking scheme only once .compute() is called so that all the compute steps are known. Maybe this is more specific to dask than xarray. I believe it would also be difficult.

@shoyer
Member Author

shoyer commented Aug 24, 2017

What's rs.randn()?

Oops, fixed.

When this makes it into the public facing API it would be nice to include some guidance on how the chunking scheme affects the run time.

We already have some tips here:
http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance

More ambitiously I could imagine an API such as array1.chunk('place') or array1.chunk('auto') meaning to figure out a reasonable chunking scheme only once .compute() is called so that all the compute steps are known.

Yes, this would be great.

Maybe this is more specific to dask than xarray. I believe it would also be difficult.

I agree with both!

@mrocklin
Contributor

I'm curious, how long does this line take:

r = spearman_correlation(array1.chunk({'place': 10}), array2.chunk({'place': 10}), 'time')

Have you considered setting name=False in your from_array call by default when doing this? I often avoid creating deterministic names when going back and forth rapidly between dask.array and numpy.

@shoyer
Member Author

shoyer commented Aug 24, 2017

@mrocklin Yes, that took a few seconds (due to hashing the array contents). Would you suggest setting name=False by default for xarray's chunk() method?

@mrocklin
Contributor

Yes if you don't care strongly about deduplication. The following will be slower:

b = (a.chunk(...) + 1) + (a.chunk(...) + 1)

As things currently work, this will be optimized to

tmp = a.chunk(...) + 1
b = tmp + tmp

So you'll lose that, but I suspect that in your case chunking the same dataset many times is somewhat rare.
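
To make the trade-off concrete, here is a minimal sketch (assuming plain dask.array usage; the array and chunk sizes are just placeholders):

import numpy as np
import dask.array as da

x = np.random.randn(1000, 1000)

# Default behaviour: the name is a deterministic hash of the contents, which
# costs time up front but lets identical chunkings share tasks in the graph.
a1 = da.from_array(x, chunks=(100, 100))
a2 = da.from_array(x, chunks=(100, 100))
assert a1.name == a2.name  # same data + same chunks -> same tasks

# name=False: skip the hashing (cheap to build), but the two arrays are now
# treated as unrelated, so the deduplication above is lost.
b1 = da.from_array(x, chunks=(100, 100), name=False)
b2 = da.from_array(x, chunks=(100, 100), name=False)
assert b1.name != b2.name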

@shoyer
Member Author

shoyer commented Aug 24, 2017

@mrocklin I split that discussion off to #1525.

@nbren12
Contributor

nbren12 commented Sep 9, 2017

This looks great!

I am not sure if this is the right place to bring this up, but is there any way to add ghost cell functionality to apply_ufunc, or perhaps to create an apply_ufunc_ghosted function? This would be very handy for performing finite differences and filters. I have successfully used dask.ghost.map_blocks as well as dask.array.ghost.ghost + dask.array.core.atop for this, so I doubt this would be too hard.

@shoyer
Member Author

shoyer commented Sep 9, 2017

@nbren12 Probably the best way to do ghosting with the current interface is to write a function that acts on dask array objects to apply the ghosting, and then apply it using apply_ufunc. I don't see an easy way to incorporate it into the current interface, which is already getting pretty complicated.

@nbren12
Contributor

nbren12 commented Sep 10, 2017

OK, thanks. Just so I understand you correctly, are you recommending something like this:

xarr_ghosted = apply_ufunc(partial(da.ghost.ghost, ...), xarr, dask='allowed', ...)
fd_ghosted = apply_ufunc(finite_difference_func, xarr_ghosted, dask='parallelized', ...)
fd = apply_ufunc(partial(da.ghost.trim_internal, ...), fd_ghosted, dask='allowed')

Wouldn't xarray complain because the ghosted axes' data would have a different size than the corresponding coordinates?

@spencerkclark
Member

@nbren12 for similar use cases I've had success writing a single function that does the ghosting, applies a function with map_blocks, and trims the edges. Then I apply that single function on a DataArray with apply_ufunc (so a single call to apply_ufunc rather than three). As an example, a simple centered difference on an array with periodic boundaries might be accomplished with:

import numpy as np
import dask.array as darray
from xarray.core import computation

def centered_diff_numpy(arr, axis=-1, spacing=1.):
    return (np.roll(arr, -1, axis=axis) - np.roll(arr, 1, axis=axis)) / (2. * spacing)

def centered_diff(da, dim, spacing=1.):
    def apply_centered_diff(arr, spacing=1.):
        if isinstance(arr, np.ndarray):
            return centered_diff_numpy(arr, spacing=spacing)
        else:
            # ghost the core (last) axis, apply the numpy function block-wise,
            # then trim the added boundary points back off
            axis = len(arr.shape) - 1
            g = darray.ghost.ghost(arr, depth={axis: 1}, boundary={axis: 'periodic'})
            result = darray.map_blocks(centered_diff_numpy, g, spacing=spacing)
            return darray.ghost.trim_internal(result, {axis: 1})

    return computation.apply_ufunc(
        apply_centered_diff, da, input_core_dims=[[dim]],
        output_core_dims=[[dim]], dask_array='allowed', kwargs={'spacing': spacing})

Depending on your use case, you might also consider dask.ghost.map_overlap to do all of those three steps in one line, i.e. replace apply_centered_diff with the following:

def apply_centered_diff(arr, spacing=1.):
    if isinstance(arr, np.ndarray):
        return centered_diff_numpy(arr, spacing=spacing)
    else:
        axis = len(arr.shape) - 1
        return darray.ghost.map_overlap(
            arr, centered_diff_numpy, depth={axis: 1}, boundary={axis: 'periodic'},
            spacing=spacing)

(Not sure if this is what @shoyer had in mind, but just offering an example)

@nbren12
Contributor

nbren12 commented Sep 10, 2017

Hey Spencer! Thanks. That makes much more sense. I have written nearly identical code for centered differencing, but did not know about apply_ufunc, so I had a couple of manual calls to transpose and xr.DataArray. The thing I was hoping for was an official way to pass centered_diff_numpy directly to apply_ufunc, which would avoid some of the boilerplate in your centered_diff function, which basically rewrites much of the code in this PR's version of apply_ufunc.

@nbren12
Contributor

nbren12 commented Sep 10, 2017

I guess the key issue here is that some computations (e.g. finite differences) cannot be boiled down to passing one numpy function to atop. Instead, these calculations consist of three steps: 1) some dask calls, 2) a call to atop, map_blocks, etc., and 3) some more dask calls. In Spencer's example, step 1 would be the call to da.ghost.ghost and step 3 would be the call to da.ghost.trim_internal. While many dask functions have analogies in xarray, there are others that do not, such as da.ghost.ghost, so it won't always be possible to support this three-step class of calculations by replacing the dask calls before and after atop with xarray calls.

@shoyer Would it be unreasonably complicated to add some sort of pre_dask_atop and post_dask_atop arguments to apply_ufunc? Alternatively, apply_ufunc could see if the func object has a pre_dask_atop method, and apply it if it does.
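
Purely as an illustration of that second idea (the hook name and helper here are hypothetical, not anything apply_ufunc actually supports), the dask path could do something like:

def _maybe_apply_pre_hook(func, *dask_arrays):
    # Hypothetical: if func carries a `pre_dask_atop` callable, run it on each
    # dask input before the atop step; otherwise pass the inputs through unchanged.
    pre = getattr(func, 'pre_dask_atop', None)
    if pre is None:
        return dask_arrays
    return tuple(pre(arr) for arr in dask_arrays)

# A user-defined function could then attach its own ghosting step, e.g.
# (with import dask.array as da and functools.partial):
# finite_difference_func.pre_dask_atop = partial(
#     da.ghost.ghost, depth={0: 1}, boundary={0: 'periodic'})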

@shoyer
Member Author

shoyer commented Sep 17, 2017

Alternatively apply_ufunc could see if the func object has a pre_dask_atop method, and apply it if it does.

This seems like a reasonable option to me. Once we get this merged, want to make a PR?

@jhamman could you give this a review? I have not included extensive documentation yet, but I am also reluctant to squeeze that into this PR before we make it public API. (Which I'd like to save for another one.)

@nbren12
Contributor

nbren12 commented Sep 17, 2017 via email

Member

@jhamman left a comment

@shoyer - this is looking quite nice/clean. I had one comment that really just comes down to making sure the inputs are well defined and that the errors are clear to the user.

I haven't used dask's atop function so maybe @spencerkclark or @nbren12 could review that section to add one more set of eyes to this before we merge.

if output_dtypes is None:
    raise ValueError('output dtypes (output_dtypes) must be supplied to '
                     "apply_func when using dask='parallelized'")
if len(output_dtypes) != signature.num_outputs:

I think we need to make sure output_dtypes is an iterable before calling len.
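
For instance, that block could guard against a non-sequence before the length check, along these lines (just a sketch, not the final code):

if output_dtypes is None:
    raise ValueError('output dtypes (output_dtypes) must be supplied to '
                     "apply_ufunc when using dask='parallelized'")
if not isinstance(output_dtypes, (list, tuple)):
    raise ValueError('output_dtypes must be a list or tuple of dtypes, '
                     'got %r' % (output_dtypes,))
if len(output_dtypes) != signature.num_outputs:
    raise ValueError('apply_ufunc expected %d output dtypes, got %d'
                     % (signature.num_outputs, len(output_dtypes)))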

Contributor

@nbren12 left a comment

FWIW, I took a look and the atop call looks pretty similar to what I am used to. I was having a little trouble parsing the docstring on apply_ufunc though. I might submit a separate issue about that.

array1 = da.from_array(rs.randn(4, 4), chunks=(2, 2))
array2 = da.from_array(rs.randn(4, 4), chunks=(2, 2))
data_array_1 = xr.DataArray(array1, dims=('x', 'y'))
data_array_2 = xr.DataArray(array2, dims=('x', 'y'))

It might be worthwhile to add a test which tries apply_ufunc on DataArrays which only share one dimension.
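
Something roughly like this, for example (a sketch only; the test name and assertions are placeholders, not the PR's actual tests):

import operator

import dask.array as da
import numpy as np
import xarray as xr
from xarray.core import computation

def test_apply_dask_partially_shared_dims():
    a = xr.DataArray(da.from_array(np.ones((4, 4)), chunks=(2, 2)), dims=('x', 'y'))
    b = xr.DataArray(da.from_array(np.ones((4, 4)), chunks=(2, 2)), dims=('y', 'z'))
    actual = computation.apply_ufunc(
        operator.add, a, b, dask='parallelized', output_dtypes=[float])
    # broadcasting over the non-shared dimensions should give all three dims
    assert actual.dims == ('x', 'y', 'z')
    assert bool((actual.compute() == 2).all())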

@spencerkclark
Member

I was not aware of dask's atop function before reading this PR (it looks pretty cool), so I defer to @nbren12 there.

@shoyer
Member Author

shoyer commented Sep 19, 2017

I have a design question here: how should we handle cases where a core dimension exists in multiple chunks? For example, suppose you are applying a function that needs access to every point along the "time" axis at once (e.g., an auto-correlation function).

Should we:

  1. Automatically rechunk along "time" into a single chunk, or
  2. Raise an error, and require the user to rechunk manually (xref dask/dask#2689, "Short-cut for rechunking an array into a single chunk along an axis?", for API on this)

Currently we do behavior 1, but behavior 2 might be more user-friendly. Otherwise it could be pretty easy to inadvertently pass in a dask array (e.g., in small chunks along time) that apply_ufunc would load into memory by putting it in a single chunk.

dask.array has some heuristics to protect against this in rechunk() but I'm not sure they are effective enough to catch this. (@mrocklin?)
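
To make the two options concrete with the Spearman example from the top of this PR (a sketch; the chunk sizes are arbitrary, and option 2's rechunking call is just what the user would have to write themselves):

# Option 1 (current behaviour): even if 'time' arrives in many small chunks,
# apply_ufunc silently rechunks it into a single chunk before applying the
# function, which can pull a lot of data into memory at once.
r = spearman_correlation(array1.chunk({'place': 10, 'time': 1000}),
                         array2.chunk({'place': 10, 'time': 1000}), 'time')

# Option 2: raise an error instead, and require the user to make the core
# dimension a single chunk explicitly before calling apply_ufunc:
rechunked1 = array1.chunk({'place': 10, 'time': array1.sizes['time']})
rechunked2 = array2.chunk({'place': 10, 'time': array2.sizes['time']})
r = spearman_correlation(rechunked1, rechunked2, 'time')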

@mrocklin
Contributor

The heuristics we have are I think just of the form "did you make way more chunks than you had previously". I can imagine other heuristics of the form "some of your new chunks are several times larger than your previous chunks". In general these heuristics might be useful in several places. It might make sense to build them in a dask/array/utils.py file.
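
Something like this standalone check is the kind of heuristic I mean (illustrative only, not dask's actual code):

import warnings

def warn_on_chunk_growth(old_chunks, new_chunks, factor=10):
    # Compare the largest block length along any axis before and after
    # rechunking, and warn if the new blocks are dramatically bigger.
    old_max = max(max(sizes) for sizes in old_chunks)
    new_max = max(max(sizes) for sizes in new_chunks)
    if new_max > factor * old_max:
        warnings.warn('rechunking grows the largest chunk along an axis '
                      'from %d to %d' % (old_max, new_max))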

@jhamman
Member

jhamman commented Sep 20, 2017

@shoyer - My vote is for something closer to option 2.

Your example scenario is something I run into frequently. In cases like this, I think it's better to tell the user that they are not providing an appropriate input rather than attempting to rechunk a dataset.

(This is somewhat related to #1440)

@jhamman
Member

jhamman commented Oct 9, 2017

@shoyer - anything left to do here?

@shoyer
Member Author

shoyer commented Oct 9, 2017

I think this is ready.

@jhamman
Member

jhamman commented Oct 9, 2017

Great. Go ahead and merge it then. I'm very excited about this feature.

@shoyer shoyer merged commit b46fcd6 into pydata:master Oct 9, 2017
@shoyer
Member Author

shoyer commented Oct 9, 2017

I'll start on my PR to expose this as public API -- hopefully will make some progress on my flight from NY to SF tonight.
