Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix safe chunks validation #9513

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ Bug fixes
<https://github.com/spencerkclark>`_.
- Fix a few bugs affecting groupby reductions with `flox`. (:issue:`8090`, :issue:`9398`).
By `Deepak Cherian <https://github.com/dcherian>`_.

- Fix the safe_chunks validation option on the to_zarr method
(:issue:`5511`, :pull:`9513`). By `Joseph Nowak
<https://github.com/josephnowak>`_.

Documentation
~~~~~~~~~~~~~
Expand Down
118 changes: 74 additions & 44 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __getitem__(self, key):
# could possibly have a work-around for 0d data here


def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region):
"""
Given encoding chunks (possibly None or []) and variable chunks
(possibly None or []).
Expand Down Expand Up @@ -163,7 +163,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):

if len(enc_chunks_tuple) != ndim:
# throw away encoding chunks, start over
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks)
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region)

for x in enc_chunks_tuple:
if not isinstance(x, int):
Expand All @@ -189,20 +189,38 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
# TODO: incorporate synchronizer to allow writes from multiple dask
# threads
if var_chunks and enc_chunks_tuple:
for zchunk, dchunks in zip(enc_chunks_tuple, var_chunks, strict=True):
for dchunk in dchunks[:-1]:
base_error = (
f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
f"Writing this array in parallel with dask could lead to corrupted data."
f"Consider either rechunking using `chunk()`, deleting "
f"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
)

for zchunk, dchunks, interval in zip(
enc_chunks_tuple, var_chunks, region, strict=True
):
if not safe_chunks or len(dchunks) <= 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re "so would this PR be ready then" — am I right in thinking the proposed code changes the behavior such that a single partial chunk can be written on mode="r+"? I don't think we want that yet (though am not confident, it seems unlikely to be a big problem, but still possibly confusing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the code is going to allow writing on a single partial chunk on mode="r+".
From my perspective and based on the docs of the to_zarr method, the new behavior would not be confusing, "“r+” means modify existing array values only (raise an error if any metadata or shapes would change)", and writing in partial chunk means modify the existing array values.

Copy link
Collaborator

@max-sixty max-sixty Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it breaks the rule we discussed above, re it being safe writing to different locations from distributed processes.

It's unlikely, because it's rare to write to a single partial chunk from distributed processes. But is it worth the complication, relative to forcing people to pass mode="a"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@max-sixty I sent the following PR #9527 which contains all the changes requested, if you want I can add everything here or close this one, whatever you prefer.

# It is not necessary to perform any additional validation if the
# safe_chunks is False, or there are less than two dchunks
continue

start = 0
if interval.start:
# If the start of the interval is not None or 0, it means that the data
# is being appended or updated, and in both cases it is mandatory that
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we don't need to distinguish between append mode vs. others? If we have an array of (5,5,5) and we write to location (4,10) in chunks of [(4,5),(6,10)], then that's only OK if we're appending — otherwise we could be writing (1,4) in another process?

(To the extent the proposed code dominates the existing code — i.e. the existing code still has this problem — I would still vote to merge even if we can't solve that on this iteration, albeit with a TODO)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you specify that case using Xarray? I'm not able to understand correctly the chunk sizes that you are indicating, but I think that it is not necessary to distinguish between append and others because Xarray internally represents all as a region write, and for the update region it is always going to apply the logic of check if the first chunk is aligned with the border_size + N * zarr_chunk_size, for any integer N >= 0 and border_size being the amount of data between interval.start and the corresponding chunk end.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take the example above:

image

This seems safe or unsafe depending on whether we're appending:

  • If we're appending (mode="a"), then this is safe, because we can only append in a single process, so can only be writing the blue box
  • But if we're doing a region write without append (mode="r+"), then we can be writing to the yellow and blue outlined box in two separate processes, which would cause corruption. So when mode="r+", writing half a chunk isn't safe.

(tbc, you seem to understand this overall better than I do, so I suspect I'm wrong, but also that this is important enough that it's worth clarifying)

Copy link
Contributor Author

@josephnowak josephnowak Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that case is already covered by the validation, if the first chunk of the array (the blue box) is not of the size of the region that needs to be updated it is also going to raise an error.

For example, if you see the test on the image (test_zarr_region_chunk_partial_offset) the first chunk of the array is going to be written from position 5 until position 15, which means that it is going to touch only half of the first and second Zarr chunk, this would be completely valid if there is no other chunk in the array, but as there is another one it means that the second Zarr chunk and, only the second one is going to suffer the issue of more than one Dask chunk being written to the same Zarr chunk in parallel.
image

border_size = zchunk - interval.start % zchunk
if dchunks[0] % zchunk != border_size:

the translation of that condition is that the first chunk of Dask must be always greater or equal to the portion of the data that needs to be updated on the first Zarr chunk (the border size), and if it is greater then it must be exactly aligned with the other Zarr chunks unless there is no other Dask chunk

Copy link
Contributor Author

@josephnowak josephnowak Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope these examples help to better understand my explanation.

import xarray as xr

path = "C:/Users/josep/Desktop/test_zarr3"
arr = xr.DataArray(list(range(10)), dims=["a"], coords={"a": list(range(10))}, name="foo").chunk(a=3)
arr.to_zarr(path, mode="w")

try:
    arr.isel(a=slice(0, 3)).chunk(a=(2, 1)).to_zarr(path, region="auto")
except ValueError:
    print("Chunk error, the first chunk is not completely covering the border size")

# This is valid the border size is covered by the first chunk of Dask
arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(path, region="auto")


try:
    arr.isel(a=slice(1, 5)).chunk(a=(3, 1)).to_zarr(path, region="auto")
except ValueError:
    print("Chunk error, the first chunk is covering the border size but it is not completely covering the second chunk")

# This is valid because there is a single dask chunk, so it is not possible to write multiple chunks in parallel 
arr.isel(a=slice(1, 5)).chunk(a=(4, )).to_zarr(path, region="auto")

try:
    arr.isel(a=slice(0, 5)).chunk(a=(3, 1, 1)).to_zarr(path, region="auto")
except ValueError:
    print("Chunk error, the first chunk is correct but the other two chunks are overlapping the same Zarr chunk")

# This is the simplest case, the first chunk is completely covering the first Zarr chunk
arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(path, region="auto")


# The first chunk is covering the border size (2 elements) and also the second chunk (3 elements), so it is valid
arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(path, region="auto")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok, you are right it is hard to detect that alignment problem, and if that would be the meaning of the mode="a" it has sense, just that right now if I try to write on a region using that mode it raises the following error:

arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(path, region="auto", mode="a")

ValueError: ``mode`` must be 'r+' when using ``region='auto'``, got 'a'

Do you think that we should allow to write on region using the "a" mode? if yes then I could proceed to change the validation code so it satisfy the condition that you specified before (partial writes for "a" and non-partial writes for "r+")

Copy link
Collaborator

@max-sixty max-sixty Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be a nice addition, yes! Though would need to refresh my memory to be confident, and others may have views.

I would encourage landing this PR with the improvements, and then another PR with that change, unless that's difficult!

Thank you!

Copy link
Contributor Author

@josephnowak josephnowak Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine, so would this PR be ready then? and just to clarify would you like that in the other PR I add the following features?

  1. If the mode is "r+" then only allows full chunks.
  2. If the mode is "a" allows writing using region.
  3. If the mode is "a" allows partial writes.
  4. Probably it would be good to also update the docs of the to_zarr method, honestly they do not indicate any of those behaviors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine, so would this PR be ready then?

I added one comment above, otherwise I think so!

Copy link
Collaborator

@max-sixty max-sixty Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1. If the mode is "r+" then only allows full chunks.

2. If the mode is "a" allows writing using region.

r+ can write using region — that's the default — as long as they are full chunks. (I think the 1&3 cover things, bullet 2 isn't needed)

3. If the mode is "a" allows partial writes.

4. Probably it would be good to also update the docs of the to_zarr method, honestly they do not indicate any of those behaviors.

That would be really good, we've fallen behind a bit on docs. Possibly an LLM can help if that makes improving the docs a bit more appealing :)

# the residue of the division between the first dchunk and the zchunk
# being equal to the border size
border_size = zchunk - interval.start % zchunk
if dchunks[0] % zchunk != border_size:
raise ValueError(base_error)
# Avoid validating the first chunk inside the loop
start = 1

for dchunk in dchunks[start:-1]:
if dchunk % zchunk:
base_error = (
f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
f"Writing this array in parallel with dask could lead to corrupted data."
)
if safe_chunks:
raise ValueError(
base_error
+ " Consider either rechunking using `chunk()`, deleting "
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
)
raise ValueError(base_error)

return enc_chunks_tuple

raise AssertionError("We should never get here. Function logic must be wrong.")
Expand Down Expand Up @@ -243,14 +261,15 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key, try_nczarr):


def extract_zarr_variable_encoding(
variable, raise_on_invalid=False, name=None, safe_chunks=True
variable, region, raise_on_invalid=False, name=None, safe_chunks=True
):
"""
Extract zarr encoding dictionary from xarray Variable

Parameters
----------
variable : Variable
region: tuple[slice]
raise_on_invalid : bool, optional

Returns
Expand Down Expand Up @@ -285,7 +304,12 @@ def extract_zarr_variable_encoding(
del encoding[k]

chunks = _determine_zarr_chunks(
encoding.get("chunks"), variable.chunks, variable.ndim, name, safe_chunks
encoding.get("chunks"),
variable.chunks,
variable.ndim,
name,
safe_chunks,
region,
)
encoding["chunks"] = chunks
return encoding
Expand Down Expand Up @@ -762,16 +786,9 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
if v.encoding == {"_FillValue": None} and fill_value is None:
v.encoding = {}

# We need to do this for both new and existing variables to ensure we're not
# writing to a partial chunk, even though we don't use the `encoding` value
# when writing to an existing variable. See
# https://github.com/pydata/xarray/issues/8371 for details.
encoding = extract_zarr_variable_encoding(
v,
raise_on_invalid=vn in check_encoding_set,
name=vn,
safe_chunks=self._safe_chunks,
)
zarr_array = None
write_region = self._write_region if self._write_region is not None else {}
write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}

if name in existing_keys:
# existing variable
Expand Down Expand Up @@ -801,7 +818,36 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
)
else:
zarr_array = self.zarr_group[name]
else:

if self._append_dim is not None and self._append_dim in dims:
# resize existing variable
append_axis = dims.index(self._append_dim)
assert write_region[self._append_dim] == slice(None)
write_region[self._append_dim] = slice(
zarr_array.shape[append_axis], None
)

new_shape = list(zarr_array.shape)
new_shape[append_axis] += v.shape[append_axis]
zarr_array.resize(new_shape)

region = tuple(write_region[dim] for dim in dims)

# We need to do this for both new and existing variables to ensure we're not
# writing to a partial chunk, even though we don't use the `encoding` value
# when writing to an existing variable. See
# https://github.com/pydata/xarray/issues/8371 for details.
# Note: Ideally there should be two functions, one for validating the chunks and
# another one for extracting the encoding.
encoding = extract_zarr_variable_encoding(
v,
region=region,
raise_on_invalid=vn in check_encoding_set,
name=vn,
safe_chunks=self._safe_chunks,
)

if name not in existing_keys:
# new variable
encoded_attrs = {}
# the magic for storing the hidden dimension data
Expand Down Expand Up @@ -833,22 +879,6 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
)
zarr_array = _put_attrs(zarr_array, encoded_attrs)

write_region = self._write_region if self._write_region is not None else {}
write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}

if self._append_dim is not None and self._append_dim in dims:
# resize existing variable
append_axis = dims.index(self._append_dim)
assert write_region[self._append_dim] == slice(None)
write_region[self._append_dim] = slice(
zarr_array.shape[append_axis], None
)

new_shape = list(zarr_array.shape)
new_shape[append_axis] += v.shape[append_axis]
zarr_array.resize(new_shape)

region = tuple(write_region[dim] for dim in dims)
writer.add(v.data, zarr_array, region)

def close(self) -> None:
Expand Down
66 changes: 59 additions & 7 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -5496,24 +5496,26 @@ def test_encode_zarr_attr_value() -> None:

@requires_zarr
def test_extract_zarr_variable_encoding() -> None:
# The region is not useful in these cases, but I still think that it must be mandatory
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the region kwarg is required now, should we make it mandatory? Or at least add a TODO in the function if we want to push that off to another PR?

Copy link
Contributor Author

@josephnowak josephnowak Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that ideally, the validation should be outside the extraction of the encoding, and that would avoid the use of the region inside the extract_zarr_variable_encoding function, I added a comment with that option in the zarr.py file but, I tried to reduce the amount of modifications to the code so I decided to not apply it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If needed I can add a TODO indicating that it would be good to apply that modification.

# because the validation of the chunks is in the same function
var = xr.Variable("x", [1, 2])
actual = backends.zarr.extract_zarr_variable_encoding(var)
actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())
assert "chunks" in actual
assert actual["chunks"] is None

var = xr.Variable("x", [1, 2], encoding={"chunks": (1,)})
actual = backends.zarr.extract_zarr_variable_encoding(var)
actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())
assert actual["chunks"] == (1,)

# does not raise on invalid
var = xr.Variable("x", [1, 2], encoding={"foo": (1,)})
actual = backends.zarr.extract_zarr_variable_encoding(var)
actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())

# raises on invalid
var = xr.Variable("x", [1, 2], encoding={"foo": (1,)})
with pytest.raises(ValueError, match=r"unexpected encoding parameters"):
actual = backends.zarr.extract_zarr_variable_encoding(
var, raise_on_invalid=True
var, raise_on_invalid=True, region=tuple()
)


Expand Down Expand Up @@ -6096,6 +6098,56 @@ def test_zarr_region_chunk_partial_offset(tmp_path):
store, safe_chunks=False, region="auto"
)

# This write is unsafe, and should raise an error, but does not.
# with pytest.raises(ValueError):
# da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(store, region="auto")
with pytest.raises(ValueError):
da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(store, region="auto")


@requires_zarr
@requires_dask
def test_zarr_safe_chunk_append_dim(tmp_path):
# https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545
store = tmp_path / "foo.zarr"
data = np.ones((20,))
da = xr.DataArray(data, dims=["x"], coords={"x": range(20)}, name="foo").chunk(x=5)

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
with pytest.raises(ValueError):
# If the first chunk is smaller than the border size then raise an error
da.isel(x=slice(7, 11)).chunk(x=(2, 2)).to_zarr(
store, append_dim="x", safe_chunks=True
)

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
# If the first chunk is of the size of the border size then it is valid
da.isel(x=slice(7, 11)).chunk(x=(3, 1)).to_zarr(
max-sixty marked this conversation as resolved.
Show resolved Hide resolved
store, safe_chunks=True, append_dim="x"
)
assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 11)))

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
# If the first chunk is of the size of the border size + N * zchunk then it is valid
da.isel(x=slice(7, 17)).chunk(x=(8, 2)).to_zarr(
store, safe_chunks=True, append_dim="x"
)
assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 17)))

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
with pytest.raises(ValueError):
# If the first chunk is valid but the other are not then raise an error
da.isel(x=slice(7, 14)).chunk(x=(3, 3, 1)).to_zarr(
store, append_dim="x", safe_chunks=True
)

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
with pytest.raises(ValueError):
# If the first chunk have a size bigger than the border size but not enough
# to complete the size of the next chunk then an error must be raised
da.isel(x=slice(7, 14)).chunk(x=(4, 3)).to_zarr(
store, append_dim="x", safe_chunks=True
)

da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
# Append with a single chunk it's totally valid,
# and it does not matter the size of the chunk
da.isel(x=slice(7, 19)).chunk(x=-1).to_zarr(store, append_dim="x", safe_chunks=True)
assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 19)))
Loading