Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

__dask_tokenize__ #3446

Merged
merged 6 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ v0.14.1 (unreleased)
Breaking changes
~~~~~~~~~~~~~~~~

- Minimum cftime version is now 1.0.3. By `Deepak Cherian <https://github.com/dcherian>`_.
- Broken compatibility with cftime < 1.0.3.
By `Deepak Cherian <https://github.com/dcherian>`_.

.. note::

cftime version 1.0.4 is broken (`cftime/126 <https://github.com/Unidata/cftime/issues/126>`_), use version 1.0.4.2 instead.
cftime version 1.0.4 is broken
(`cftime/126 <https://github.com/Unidata/cftime/issues/126>`_);
please use version 1.0.4.2 instead.

- All leftover support for dates from non-standard calendars through netcdftime, the
module included in versions of netCDF4 prior to 1.4 that eventually became the
cftime package, has been removed in favor of relying solely on the standalone
cftime package (:pull:`3450`). By `Spencer Clark
<https://github.com/spencerkclark>`_.
cftime package (:pull:`3450`).
By `Spencer Clark <https://github.com/spencerkclark>`_.

New Features
~~~~~~~~~~~~
Expand All @@ -52,6 +55,14 @@ New Features
for now. Enable it with :py:meth:`xarray.set_options(display_style="html")`.
(:pull:`3425`) by `Benoit Bovy <https://github.com/benbovy>`_ and
`Julia Signell <https://github.com/jsignell>`_.
- Implement `dask deterministic hashing
<https://docs.dask.org/en/latest/custom-collections.html#deterministic-hashing>`_
for xarray objects. Note that xarray objects with a dask.array backend already used
deterministic hashing in previous releases; this change implements it when whole
xarray objects are embedded in a dask graph, e.g. when :meth:`DataArray.map` is
invoked. (:issue:`3378`, :pull:`3446`)
By `Deepak Cherian <https://github.com/dcherian>`_ and
`Guido Imperiale <https://github.com/crusaderky>`_.

Bug fixes
~~~~~~~~~
Expand Down Expand Up @@ -96,6 +107,7 @@ Internal Changes
- Use Python 3.6 idioms throughout the codebase. (:pull:3419)
By `Maximilian Roos <https://github.com/max-sixty>`_


.. _whats-new.0.14.0:

v0.14.0 (14 Oct 2019)
Expand Down
3 changes: 3 additions & 0 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@ def reset_coords(
dataset[self.name] = self.variable
return dataset

def __dask_tokenize__(self):
return (type(self), self._variable, self._coords, self._name)

def __dask_graph__(self):
return self._to_temp_dataset().__dask_graph__()

Expand Down
3 changes: 3 additions & 0 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ def load(self, **kwargs) -> "Dataset":

return self

def __dask_tokenize__(self):
return (type(self), self._variables, self._coord_names, self._attrs)

def __dask_graph__(self):
graphs = {k: v.__dask_graph__() for k, v in self.variables.items()}
graphs = {k: v for k, v in graphs.items() if v is not None}
Expand Down
9 changes: 9 additions & 0 deletions xarray/core/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ def compute(self, **kwargs):
new = self.copy(deep=False)
return new.load(**kwargs)

def __dask_tokenize__(self):
# Use v.data, instead of v._data, in order to cope with the wrappers
# around NetCDF and the like
return type(self), self._dims, self.data, self._attrs

def __dask_graph__(self):
if isinstance(self._data, dask_array_type):
return self._data.__dask_graph__()
Expand Down Expand Up @@ -1963,6 +1968,10 @@ def __init__(self, dims, data, attrs=None, encoding=None, fastpath=False):
if not isinstance(self._data, PandasIndexAdapter):
self._data = PandasIndexAdapter(self._data)

def __dask_tokenize__(self):
# Don't waste time converting pd.Index to np.ndarray
return (type(self), self._dims, self._data.array, self._attrs)

def load(self):
# data is already loaded into memory for IndexVariable
return self
Expand Down
94 changes: 94 additions & 0 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import operator
import pickle
import sys
from contextlib import suppress
from distutils.version import LooseVersion
from textwrap import dedent
Expand All @@ -21,12 +22,16 @@
assert_frame_equal,
assert_identical,
raises_regex,
requires_scipy_or_netCDF4,
)
from .test_backends import create_tmp_file

dask = pytest.importorskip("dask")
da = pytest.importorskip("dask.array")
dd = pytest.importorskip("dask.dataframe")

ON_WINDOWS = sys.platform == "win32"


class CountingScheduler:
""" Simple dask scheduler counting the number of computes.
Expand Down Expand Up @@ -1135,3 +1140,92 @@ def test_make_meta(map_ds):
for variable in map_ds.data_vars:
assert variable in meta.data_vars
assert meta.data_vars[variable].shape == (0,) * meta.data_vars[variable].ndim


@pytest.mark.parametrize(
"obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()]
)
@pytest.mark.parametrize(
"transform",
[
lambda x: x.reset_coords(),
lambda x: x.reset_coords(drop=True),
lambda x: x.isel(x=1),
lambda x: x.attrs.update(new_attrs=1),
lambda x: x.assign_coords(cxy=1),
lambda x: x.rename({"x": "xnew"}),
lambda x: x.rename({"cxy": "cxynew"}),
],
)
def test_token_changes_on_transform(obj, transform):
with raise_if_dask_computes():
assert dask.base.tokenize(obj) != dask.base.tokenize(transform(obj))


@pytest.mark.parametrize(
"obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()]
)
def test_token_changes_when_data_changes(obj):
with raise_if_dask_computes():
t1 = dask.base.tokenize(obj)

# Change data_var
if isinstance(obj, DataArray):
obj *= 2
else:
obj["a"] *= 2
with raise_if_dask_computes():
t2 = dask.base.tokenize(obj)
assert t2 != t1

# Change non-index coord
obj.coords["ndcoord"] *= 2
with raise_if_dask_computes():
t3 = dask.base.tokenize(obj)
assert t3 != t2

# Change IndexVariable
obj.coords["x"] *= 2
with raise_if_dask_computes():
t4 = dask.base.tokenize(obj)
assert t4 != t3


@pytest.mark.parametrize("obj", [make_da().compute(), make_ds().compute()])
def test_token_changes_when_buffer_changes(obj):
with raise_if_dask_computes():
t1 = dask.base.tokenize(obj)

if isinstance(obj, DataArray):
obj[0, 0] = 123
else:
obj["a"][0, 0] = 123
with raise_if_dask_computes():
t2 = dask.base.tokenize(obj)
assert t2 != t1

obj.coords["ndcoord"][0] = 123
with raise_if_dask_computes():
t3 = dask.base.tokenize(obj)
assert t3 != t2


@pytest.mark.parametrize(
"transform",
[lambda x: x, lambda x: x.copy(deep=False), lambda x: x.copy(deep=True)],
)
@pytest.mark.parametrize("obj", [make_da(), make_ds(), make_ds().variables["a"]])
dcherian marked this conversation as resolved.
Show resolved Hide resolved
def test_token_identical(obj, transform):
with raise_if_dask_computes():
assert dask.base.tokenize(obj) == dask.base.tokenize(transform(obj))
assert dask.base.tokenize(obj.compute()) == dask.base.tokenize(
transform(obj.compute())
)


@requires_scipy_or_netCDF4
def test_normalize_token_with_backend(map_ds):
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as tmp_file:
map_ds.to_netcdf(tmp_file)
read = xr.open_dataset(tmp_file)
assert not dask.base.tokenize(map_ds) == dask.base.tokenize(read)
22 changes: 21 additions & 1 deletion xarray/tests/test_sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from xarray.core.npcompat import IS_NEP18_ACTIVE
from xarray.core.pycompat import sparse_array_type

from . import assert_equal, assert_identical
from . import assert_equal, assert_identical, requires_dask

param = pytest.param
xfail = pytest.mark.xfail
Expand Down Expand Up @@ -849,3 +849,23 @@ def test_chunk():
dsc = ds.chunk(2)
assert dsc.chunks == {"dim_0": (2, 2)}
assert_identical(dsc, ds)


@requires_dask
def test_dask_token():
import dask

s = sparse.COO.from_numpy(np.array([0, 0, 1, 2]))
a = DataArray(s)
t1 = dask.base.tokenize(a)
t2 = dask.base.tokenize(a)
t3 = dask.base.tokenize(a + 1)
assert t1 == t2
assert t3 != t2
assert isinstance(a.data, sparse.COO)

ac = a.chunk(2)
t4 = dask.base.tokenize(ac)
t5 = dask.base.tokenize(ac + 1)
assert t4 != t5
assert isinstance(ac.data._meta, sparse.COO)