Skip to content

Commit

Permalink
Rolling window with as_strided (#1837)
Browse files Browse the repository at this point in the history
* Rolling_window for np.ndarray

* Add pad method to Variable

* Added rolling_window to DataArray and Dataset

* remove pad_value option. Support dask.rolling_window

* Refactor rolling.reduce

* add as_strided to npcompat. Tests added for reduce(np.nanmean)

* Support boolean in maybe_promote

* move rolling_window into duck_array_op. Make DataArray.rolling_window public.

* Added to_dataarray and to_dataset to rolling object.

* Use pad in rolling to make compatible to pandas. Expose pad_with_fill_value to public.

* Refactor rolling

* flake8

* Added a comment for dask's pad.

* Use fastpath in rolling.to_dataarray

* Doc added.

* Revert not to use fastpath

* Remove maybe_prompt for Boolean. Some improvements based on @shoyer's review.

* Update test.

* Bug fix in test_rolling_count_correct

* fill_value for boolean array

* rolling_window(array, axis, window) -> rolling_window(array, window, axis)

* support stride in rolling.to_dataarray

* flake8

* Improve doc. Add DataArrayRolling to api.rst

* Improve docs in common.rolling.

* Expose groupby docs to public

* Default fill_value=dtypes.NA, stride=1. Add comment for DataArrayRollig.

* Default fill_value=dtypes.NA, stride=1. Add comment for DataArrayRollig.

* Add fill_value option to rolling.to_dataarray

* Convert non-numeric array in reduce.

* Fill_value = False for boolean array in rolling.reduce

* Support old numpy plus bottleneck combination. Suppress warning for all-nan slice reduce.

* flake8

* Add benchmark

* Dataset.count. Benchmark

* Classize benchmark

* Decoratorize for asv benchmark

* Classize benchmarks/indexing.py

* Working with nanreduce

* Support .sum for object dtype.

* Remove unused if-statements.

* Default skipna for rolling.reduce

* Pass tests. Test added to make sure the consistency to pandas' behavior.

* Delete duplicate file. flake8

* flake8 again

* Working with numpy<1.13

* Revert "Classize benchmarks/indexing.py"

This reverts commit 4189d71.

* rolling_window with dask.ghost

* Optimize rolling.count.

* Fixing style errors.

* Remove unused npcompat.nansum etc

* flake8

* require_dask -> has_dask

* npcompat -> np

* flake8

* Skip tests for old numpy.

* Improve doc. Optmize missing._get_valid_fill_mask

* to_dataarray -> construct

* remove assert_allclose_with_nan

* Fixing style errors.

* typo

* `to_dataset` -> `construct`

* Update doc

* Change boundary and add comments for dask_rolling_window.

* Refactor dask_array_ops.rolling_window and np_utils.rolling_window

* flake8

* Simplify tests

* flake8 again.

* cleanup roling_window for dask.

* remove duplicates

* remvove duplicate

* flake8

* delete unnecessary file.
  • Loading branch information
fujiisoup authored Mar 1, 2018
1 parent f3bbb3e commit dc3eebf
Show file tree
Hide file tree
Showing 19 changed files with 824 additions and 159 deletions.
8 changes: 8 additions & 0 deletions asv_bench/benchmarks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
_counter = itertools.count()


def parameterized(names, params):
def decorator(func):
func.param_names = names
func.params = params
return func
return decorator


def requires_dask():
try:
import dask # noqa
Expand Down
50 changes: 50 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas as pd
import xarray as xr

from . import parameterized, randn, requires_dask

nx = 3000
ny = 2000
nt = 1000
window = 20


class Rolling(object):
def setup(self, *args, **kwargs):
self.ds = xr.Dataset(
{'var1': (('x', 'y'), randn((nx, ny), frac_nan=0.1)),
'var2': (('x', 't'), randn((nx, nt))),
'var3': (('t', ), randn(nt))},
coords={'x': np.arange(nx),
'y': np.linspace(0, 1, ny),
't': pd.date_range('1970-01-01', periods=nt, freq='D'),
'x_coords': ('x', np.linspace(1.1, 2.1, nx))})

@parameterized(['func', 'center'],
(['mean', 'count'], [True, False]))
def time_rolling(self, func, center):
getattr(self.ds.rolling(x=window, center=center), func)()

@parameterized(['window_', 'min_periods'],
([20, 40], [5, None]))
def time_rolling_np(self, window_, min_periods):
self.ds.rolling(x=window_, center=False,
min_periods=min_periods).reduce(getattr(np, 'nanmean'))

@parameterized(['center', 'stride'],
([True, False], [1, 200]))
def time_rolling_construct(self, center, stride):
self.ds.rolling(x=window, center=center).construct(
'window_dim', stride=stride).mean(dim='window_dim')


class RollingDask(Rolling):
def setup(self, *args, **kwargs):
requires_dask()
super(RollingDask, self).setup(**kwargs)
self.ds = self.ds.chunk({'x': 100, 'y': 50, 't': 50})
26 changes: 26 additions & 0 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,32 @@ DataArray methods
DataArray.load
DataArray.chunk

Rolling objects
===============

.. autosummary::
:toctree: generated/

core.rolling.DataArrayRolling
core.rolling.DataArrayRolling.construct
core.rolling.DataArrayRolling.reduce
core.rolling.DatasetRolling
core.rolling.DatasetRolling.construct
core.rolling.DatasetRolling.reduce

GroupByObjects
==============

.. autosummary::
:toctree: generated/

core.groupby.DataArrayGroupBy
core.groupby.DataArrayGroupBy.apply
core.groupby.DataArrayGroupBy.reduce
core.groupby.DatasetGroupBy
core.groupby.DatasetGroupBy.apply
core.groupby.DatasetGroupBy.reduce

Plotting
========

Expand Down
30 changes: 26 additions & 4 deletions doc/computation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,42 @@ Aggregation and summary methods can be applied directly to the ``Rolling`` objec
r.mean()
r.reduce(np.std)
Note that rolling window aggregations are much faster (both asymptotically and
because they avoid a loop in Python) when bottleneck_ is installed. Otherwise,
we fall back to a slower, pure Python implementation.
Note that rolling window aggregations are faster when bottleneck_ is installed.

.. _bottleneck: https://github.com/kwgoodman/bottleneck/

Finally, we can manually iterate through ``Rolling`` objects:
We can also manually iterate through ``Rolling`` objects:

.. ipython:: python
@verbatim
for label, arr_window in r:
# arr_window is a view of x
Finally, the rolling object has ``construct`` method, which gives a
view of the original ``DataArray`` with the windowed dimension attached to
the last position.
You can use this for more advanced rolling operations, such as strided rolling,
windowed rolling, convolution, short-time FFT, etc.

.. ipython:: python
# rolling with 2-point stride
rolling_da = r.construct('window_dim', stride=2)
rolling_da
rolling_da.mean('window_dim', skipna=False)
Because the ``DataArray`` given by ``r.construct('window_dim')`` is a view
of the original array, it is memory efficient.

.. note::
numpy's Nan-aggregation functions such as ``nansum`` copy the original array.
In xarray, we internally use these functions in our aggregation methods
(such as ``.sum()``) if ``skipna`` argument is not specified or set to True.
This means ``rolling_da.mean('window_dim')`` is memory inefficient.
To avoid this, use ``skipna=False`` as the above example.


.. _compute.broadcasting:

Broadcasting by dimension name
Expand Down
14 changes: 13 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ Documentation
Enhancements
~~~~~~~~~~~~

- Improve :py:func:`~xarray.DataArray.rolling` logic.
:py:func:`~xarray.DataArrayRolling` object now supports
:py:func:`~xarray.DataArrayRolling.construct` method that returns a view
of the DataArray / Dataset object with the rolling-window dimension added
to the last axis. This enables more flexible operation, such as strided
rolling, windowed rolling, ND-rolling, short-time FFT and convolution.
(:issue:`1831`, :issue:`1142`, :issue:`819`)
By `Keisuke Fujii <https://github.com/fujiisoup>`_.

Bug fixes
~~~~~~~~~

Expand All @@ -64,7 +73,6 @@ Documentation

Enhancements
~~~~~~~~~~~~

**New functions and methods**:

- Added :py:meth:`DataArray.to_iris` and
Expand Down Expand Up @@ -140,6 +148,10 @@ Enhancements

Bug fixes
~~~~~~~~~
- Rolling aggregation with ``center=True`` option now gives the same result
with pandas including the last element (:issue:`1046`).
By `Keisuke Fujii <https://github.com/fujiisoup>`_.

- Support indexing with a 0d-np.ndarray (:issue:`1921`).
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- Added warning in api.py of a netCDF4 bug that occurs when
Expand Down
16 changes: 12 additions & 4 deletions xarray/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ def groupby(self, group, squeeze=True):
grouped : GroupBy
A `GroupBy` object patterned after `pandas.GroupBy` that can be
iterated over in the form of `(unique_value, grouped_array)` pairs.
See Also
--------
core.groupby.DataArrayGroupBy
core.groupby.DatasetGroupBy
"""
return self._groupby_cls(self, group, squeeze=squeeze)

Expand Down Expand Up @@ -483,9 +488,6 @@ def rolling(self, min_periods=None, center=False, **windows):
"""
Rolling window object.
Rolling window aggregations are much faster when bottleneck is
installed.
Parameters
----------
min_periods : int, default None
Expand All @@ -503,7 +505,8 @@ def rolling(self, min_periods=None, center=False, **windows):
Returns
-------
rolling : type of input argument
Rolling object (core.rolling.DataArrayRolling for DataArray,
core.rolling.DatasetRolling for Dataset.)
Examples
--------
Expand Down Expand Up @@ -531,6 +534,11 @@ def rolling(self, min_periods=None, center=False, **windows):
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])
Coordinates:
* time (time) datetime64[ns] 2000-02-15 2000-03-15 2000-04-15 ...
See Also
--------
core.rolling.DataArrayRolling
core.rolling.DatasetRolling
"""

return self._rolling_cls(self, min_periods=min_periods,
Expand Down
74 changes: 72 additions & 2 deletions xarray/core/dask_array_ops.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Define core operations for xarray objects.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
from . import nputils

try:
import dask.array as da
Expand All @@ -24,3 +27,70 @@ def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
# trim array
result = da.ghost.trim_internal(out, depth)
return result


def rolling_window(a, axis, window, center, fill_value):
""" Dask's equivalence to np.utils.rolling_window """
orig_shape = a.shape
# inputs for ghost
if axis < 0:
axis = a.ndim + axis
depth = {d: 0 for d in range(a.ndim)}
depth[axis] = int(window / 2)
# For evenly sized window, we need to crop the first point of each block.
offset = 1 if window % 2 == 0 else 0

if depth[axis] > min(a.chunks[axis]):
raise ValueError(
"For window size %d, every chunk should be larger than %d, "
"but the smallest chunk size is %d. Rechunk your array\n"
"with a larger chunk size or a chunk size that\n"
"more evenly divides the shape of your array." %
(window, depth[axis], min(a.chunks[axis])))

# Although dask.ghost pads values to boundaries of the array,
# the size of the generated array is smaller than what we want
# if center == False.
if center:
start = int(window / 2) # 10 -> 5, 9 -> 4
end = window - 1 - start
else:
start, end = window - 1, 0
pad_size = max(start, end) + offset - depth[axis]
drop_size = 0
# pad_size becomes more than 0 when the ghosted array is smaller than
# needed. In this case, we need to enlarge the original array by padding
# before ghosting.
if pad_size > 0:
if pad_size < depth[axis]:
# Ghosting requires each chunk larger than depth. If pad_size is
# smaller than the depth, we enlarge this and truncate it later.
drop_size = depth[axis] - pad_size
pad_size = depth[axis]
shape = list(a.shape)
shape[axis] = pad_size
chunks = list(a.chunks)
chunks[axis] = (pad_size, )
fill_array = da.full(shape, fill_value, dtype=a.dtype, chunks=chunks)
a = da.concatenate([fill_array, a], axis=axis)

boundary = {d: fill_value for d in range(a.ndim)}

# create ghosted arrays
ag = da.ghost.ghost(a, depth=depth, boundary=boundary)

# apply rolling func
def func(x, window, axis=-1):
x = np.asarray(x)
rolling = nputils._rolling_window(x, window, axis)
return rolling[(slice(None), ) * axis + (slice(offset, None), )]

chunks = list(a.chunks)
chunks.append(window)
out = ag.map_blocks(func, dtype=a.dtype, new_axis=a.ndim, chunks=chunks,
window=window, axis=axis)

# crop boundary.
index = (slice(None),) * axis + (slice(drop_size,
drop_size + orig_shape[axis]), )
return out[index]
19 changes: 18 additions & 1 deletion xarray/core/duck_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import numpy as np
import pandas as pd

from . import dtypes, npcompat
from . import dask_array_ops, dtypes, npcompat, nputils
from .nputils import nanfirst, nanlast
from .pycompat import dask_array_type

Expand Down Expand Up @@ -278,6 +278,10 @@ def f(values, axis=None, skipna=None, **kwargs):
dtype = kwargs.get('dtype', None)
values = asarray(values)

# dask requires dtype argument for object dtype
if (values.dtype == 'object' and name in ['sum', ]):
kwargs['dtype'] = values.dtype if dtype is None else dtype

if coerce_strings and values.dtype.kind in 'SU':
values = values.astype(object)

Expand Down Expand Up @@ -369,3 +373,16 @@ def last(values, axis, skipna=None):
_fail_on_dask_array_input_skipna(values)
return nanlast(values, axis)
return take(values, -1, axis=axis)


def rolling_window(array, axis, window, center, fill_value):
"""
Make an ndarray with a rolling window of axis-th dimension.
The rolling dimension will be placed at the last dimension.
"""
if isinstance(array, dask_array_type):
return dask_array_ops.rolling_window(
array, axis, window, center, fill_value)
else: # np.ndarray
return nputils.rolling_window(
array, axis, window, center, fill_value)
7 changes: 6 additions & 1 deletion xarray/core/missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import pandas as pd

from . import rolling
from .computation import apply_ufunc
from .npcompat import flip
from .pycompat import iteritems
Expand Down Expand Up @@ -326,4 +327,8 @@ def _get_valid_fill_mask(arr, dim, limit):
'''helper function to determine values that can be filled when limit is not
None'''
kw = {dim: limit + 1}
return arr.isnull().rolling(min_periods=1, **kw).sum() <= limit
# we explicitly use construct method to avoid copy.
new_dim = rolling._get_new_dimname(arr.dims, '_window')
return (arr.isnull().rolling(min_periods=1, **kw)
.construct(new_dim, fill_value=False)
.sum(new_dim, skipna=False)) <= limit
11 changes: 11 additions & 0 deletions xarray/core/npcompat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
from __future__ import absolute_import, division, print_function

import numpy as np
from distutils.version import LooseVersion


if LooseVersion(np.__version__) >= LooseVersion('1.12'):
as_strided = np.lib.stride_tricks.as_strided
else:
def as_strided(x, shape=None, strides=None, subok=False, writeable=True):
array = np.lib.stride_tricks.as_strided(x, shape, strides, subok)
array.setflags(write=writeable)
return array


try:
from numpy import nancumsum, nancumprod, flip
Expand Down
Loading

0 comments on commit dc3eebf

Please sign in to comment.