fix safe chunks validation #9513
```diff
@@ -112,7 +112,7 @@ def __getitem__(self, key):
         # could possibly have a work-around for 0d data here


-def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
+def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks, region):
     """
     Given encoding chunks (possibly None or []) and variable chunks
     (possibly None or []).
```
```diff
@@ -163,7 +163,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):

     if len(enc_chunks_tuple) != ndim:
         # throw away encoding chunks, start over
-        return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks)
+        return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks, region)

     for x in enc_chunks_tuple:
         if not isinstance(x, int):
```
```diff
@@ -189,20 +189,38 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
     # TODO: incorporate synchronizer to allow writes from multiple dask
     # threads
     if var_chunks and enc_chunks_tuple:
-        for zchunk, dchunks in zip(enc_chunks_tuple, var_chunks, strict=True):
-            for dchunk in dchunks[:-1]:
+        base_error = (
+            f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
+            f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
+            f"Writing this array in parallel with dask could lead to corrupted data. "
+            f"Consider either rechunking using `chunk()`, deleting "
+            f"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
+        )
+
+        for zchunk, dchunks, interval in zip(
+            enc_chunks_tuple, var_chunks, region, strict=True
+        ):
+            if not safe_chunks or len(dchunks) <= 1:
+                # No additional validation is needed if safe_chunks is False
+                # or there are fewer than two dchunks
+                continue
+
+            start = 0
+            if interval.start:
+                # If the start of the interval is not None or 0, the data is
+                # being appended or updated, and in both cases it is mandatory that
```
Reviewer: Are we sure we don't need to distinguish between append mode vs. others? If we have an array of (5,5,5) and we write to location (4,10) in chunks of [(4,5),(6,10)], then that's only OK if we're appending — otherwise we could be writing (1,4) in another process? (To the extent the proposed code dominates the existing code — i.e. the existing code still has this problem — I would still vote to merge even if we can't solve that on this iteration, albeit with a TODO.)

Author: Could you specify that case using Xarray? I'm not able to correctly understand the chunk sizes you are indicating, but I think it is not necessary to distinguish between append and others, because Xarray internally represents everything as a region write, and for the updated region it always applies the check that the first chunk is aligned with border_size + N * zarr_chunk_size, for any integer N >= 0, where border_size is the amount of data between interval.start and the end of the corresponding chunk.

Reviewer: Take the example above: this seems safe or unsafe depending on whether we're appending:
Reviewer: (tbc, you seem to understand this overall better than I do, so I suspect I'm wrong, but also that this is important enough that it's worth clarifying.)

Author: I think that case is already covered by the validation: if the first chunk of the array (the blue box) is not the size of the region that needs to be updated, it is also going to raise an error. For example, in the test in the image (test_zarr_region_chunk_partial_offset), the first chunk of the array is going to be written from position 5 until position 15, which means it will touch only half of the first and second Zarr chunks. This would be completely valid if there were no other chunk in the array, but since there is another one, the second Zarr chunk — and only the second one — would suffer the issue of more than one Dask chunk being written to the same Zarr chunk in parallel.

```python
border_size = zchunk - interval.start % zchunk
if dchunks[0] % zchunk != border_size:
```

The translation of that condition is that the first Dask chunk must always be greater than or equal to the portion of the data that needs to be updated on the first Zarr chunk (the border size), and if it is greater, then it must be exactly aligned with the other Zarr chunks, unless there is no other Dask chunk.

Author: I hope these examples help to better understand my explanation.

```python
import xarray as xr

path = "C:/Users/josep/Desktop/test_zarr3"
arr = xr.DataArray(
    list(range(10)), dims=["a"], coords={"a": list(range(10))}, name="foo"
).chunk(a=3)
arr.to_zarr(path, mode="w")

try:
    arr.isel(a=slice(0, 3)).chunk(a=(2, 1)).to_zarr(path, region="auto")
except ValueError:
    print("Chunk error, the first chunk is not completely covering the border size")

# This is valid, the border size is covered by the first chunk of Dask
arr.isel(a=slice(1, 4)).chunk(a=(2, 1)).to_zarr(path, region="auto")

try:
    arr.isel(a=slice(1, 5)).chunk(a=(3, 1)).to_zarr(path, region="auto")
except ValueError:
    print(
        "Chunk error, the first chunk is covering the border size but it is "
        "not completely covering the second chunk"
    )

# This is valid because there is a single dask chunk, so it is not possible
# to write multiple chunks in parallel
arr.isel(a=slice(1, 5)).chunk(a=(4,)).to_zarr(path, region="auto")

try:
    arr.isel(a=slice(0, 5)).chunk(a=(3, 1, 1)).to_zarr(path, region="auto")
except ValueError:
    print(
        "Chunk error, the first chunk is correct but the other two chunks are "
        "overlapping the same Zarr chunk"
    )

# This is the simplest case, the first chunk completely covers the first Zarr chunk
arr.isel(a=slice(0, 5)).chunk(a=(3, 2)).to_zarr(path, region="auto")

# The first chunk covers the border size (2 elements) and also the second
# Zarr chunk (3 elements), so it is valid
arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(path, region="auto")
```

Author: Oh ok, you are right, it is hard to detect that alignment problem, and if that is the meaning of mode="a" it makes sense; it's just that right now, if I try to write on a region using that mode, it raises the following error:

```python
arr.isel(a=slice(1, 8)).chunk(a=(5, 2)).to_zarr(path, region="auto", mode="a")
```
```
ValueError: ``mode`` must be 'r+' when using ``region='auto'``, got 'a'
```

Do you think we should allow writing on a region using the "a" mode? If yes, then I could proceed to change the validation code so it satisfies the condition you specified before (partial writes for "a" and non-partial writes for "r+").

Reviewer: I think that would be a nice addition, yes! Though I would need to refresh my memory to be confident, and others may have views. I would encourage landing this PR with the improvements, and then another PR with that change, unless that's difficult! Thank you!

Author: Sounds fine, so would this PR be ready then? And just to clarify, would you like that in the other PR I add the following features?

Reviewer: I added one comment above, otherwise I think so!

Reviewer: That would be really good, we've fallen behind a bit on docs. Possibly an LLM can help if that makes improving the docs a bit more appealing :)
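To make the (5,5,5) overlap example at the top of this thread concrete, here is a small standalone sketch (the helper name and numbers are illustrative, not code from the PR) of which zarr chunks a region write touches and whether the edge chunks are only partially covered:

```python
# Hypothetical helper: which zarr chunks does a write to [start, stop) touch,
# and are the first/last touched chunks only partially covered?
def touched_chunks(start: int, stop: int, zchunk: int):
    first, last = start // zchunk, (stop - 1) // zchunk
    partial_first = start % zchunk != 0
    partial_last = stop % zchunk != 0
    return list(range(first, last + 1)), partial_first, partial_last

# Writing positions 4..10 into an array with zarr chunks of size 5 touches
# chunks 0 and 1, and only partially covers chunk 0; positions 0..4 of that
# chunk may belong to data owned by another writer.
assert touched_chunks(4, 10, 5) == ([0, 1], True, False)
# An aligned write of 0..10 covers both chunks entirely.
assert touched_chunks(0, 10, 5) == ([0, 1], False, False)
```

A partially covered edge chunk is only safe if no other process can own the rest of that chunk, which is what the append-vs-region distinction in this thread is about.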
```diff
+                # the residue of the division between the first dchunk and the
+                # zchunk is equal to the border size
+                border_size = zchunk - interval.start % zchunk
+                if dchunks[0] % zchunk != border_size:
+                    raise ValueError(base_error)
+                # Avoid validating the first chunk inside the loop
+                start = 1
+
+            for dchunk in dchunks[start:-1]:
                 if dchunk % zchunk:
-                    base_error = (
-                        f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
-                        f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
-                        f"Writing this array in parallel with dask could lead to corrupted data."
-                    )
-                    if safe_chunks:
-                        raise ValueError(
-                            base_error
-                            + " Consider either rechunking using `chunk()`, deleting "
-                            "or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
-                        )
+                    raise ValueError(base_error)

         return enc_chunks_tuple

     raise AssertionError("We should never get here. Function logic must be wrong.")
```
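Putting the pieces of the hunk above together, the validation can be read as a standalone predicate. This is a sketch, not the PR's exact function: it assumes `region` is a tuple of slices, one per dimension, and returns a boolean instead of raising:

```python
def chunks_are_safe(enc_chunks, var_chunks, region) -> bool:
    # Sketch of the safe_chunks validation described in the diff above.
    for zchunk, dchunks, interval in zip(enc_chunks, var_chunks, region):
        if len(dchunks) <= 1:
            continue  # a single dask chunk cannot be written in parallel
        start = 0
        if interval.start:
            # The first dask chunk must line up with border_size + N * zchunk
            border_size = zchunk - interval.start % zchunk
            if dchunks[0] % zchunk != border_size:
                return False
            start = 1  # the first chunk has been validated already
        # Every interior dask chunk must be a multiple of the zarr chunk
        for dchunk in dchunks[start:-1]:
            if dchunk % zchunk:
                return False
    return True

# Mirrors the examples discussed in the review thread (zarr chunks of 3):
assert chunks_are_safe((3,), ((2, 1),), (slice(1, 4),))      # border covered
assert not chunks_are_safe((3,), ((3, 1),), (slice(1, 5),))  # straddles chunk 2
assert chunks_are_safe((3,), ((3, 2),), (slice(0, 5),))      # aligned at 0
assert not chunks_are_safe((3,), ((3, 1, 1),), (slice(0, 5),))
```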
```diff
@@ -243,14 +261,15 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key, try_nczarr):


 def extract_zarr_variable_encoding(
-    variable, raise_on_invalid=False, name=None, safe_chunks=True
+    variable, region, raise_on_invalid=False, name=None, safe_chunks=True
 ):
     """
     Extract zarr encoding dictionary from xarray Variable

     Parameters
     ----------
     variable : Variable
+    region: tuple[slice]
     raise_on_invalid : bool, optional

     Returns

@@ -285,7 +304,12 @@ def extract_zarr_variable_encoding(
             del encoding[k]

     chunks = _determine_zarr_chunks(
-        encoding.get("chunks"), variable.chunks, variable.ndim, name, safe_chunks
+        encoding.get("chunks"),
+        variable.chunks,
+        variable.ndim,
+        name,
+        safe_chunks,
+        region,
     )
     encoding["chunks"] = chunks
     return encoding
```
```diff
@@ -762,16 +786,9 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
         if v.encoding == {"_FillValue": None} and fill_value is None:
             v.encoding = {}

-        # We need to do this for both new and existing variables to ensure we're not
-        # writing to a partial chunk, even though we don't use the `encoding` value
-        # when writing to an existing variable. See
-        # https://github.com/pydata/xarray/issues/8371 for details.
-        encoding = extract_zarr_variable_encoding(
-            v,
-            raise_on_invalid=vn in check_encoding_set,
-            name=vn,
-            safe_chunks=self._safe_chunks,
-        )
         zarr_array = None
+        write_region = self._write_region if self._write_region is not None else {}
+        write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}

         if name in existing_keys:
             # existing variable

@@ -801,7 +818,36 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
                 )
             else:
                 zarr_array = self.zarr_group[name]
-        else:
+
+            if self._append_dim is not None and self._append_dim in dims:
+                # resize existing variable
+                append_axis = dims.index(self._append_dim)
+                assert write_region[self._append_dim] == slice(None)
+                write_region[self._append_dim] = slice(
+                    zarr_array.shape[append_axis], None
+                )
+
+                new_shape = list(zarr_array.shape)
+                new_shape[append_axis] += v.shape[append_axis]
+                zarr_array.resize(new_shape)
+
+        region = tuple(write_region[dim] for dim in dims)
+
+        # We need to do this for both new and existing variables to ensure we're not
+        # writing to a partial chunk, even though we don't use the `encoding` value
+        # when writing to an existing variable. See
+        # https://github.com/pydata/xarray/issues/8371 for details.
+        # Note: Ideally there should be two functions, one for validating the chunks and
+        # another one for extracting the encoding.
+        encoding = extract_zarr_variable_encoding(
+            v,
+            region=region,
+            raise_on_invalid=vn in check_encoding_set,
+            name=vn,
+            safe_chunks=self._safe_chunks,
+        )
+
+        if name not in existing_keys:
             # new variable
             encoded_attrs = {}
             # the magic for storing the hidden dimension data

@@ -833,22 +879,6 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=None):
             )
             zarr_array = _put_attrs(zarr_array, encoded_attrs)

-        write_region = self._write_region if self._write_region is not None else {}
-        write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}
-
-        if self._append_dim is not None and self._append_dim in dims:
-            # resize existing variable
-            append_axis = dims.index(self._append_dim)
-            assert write_region[self._append_dim] == slice(None)
-            write_region[self._append_dim] = slice(
-                zarr_array.shape[append_axis], None
-            )
-
-            new_shape = list(zarr_array.shape)
-            new_shape[append_axis] += v.shape[append_axis]
-            zarr_array.resize(new_shape)
-
-        region = tuple(write_region[dim] for dim in dims)
         writer.add(v.data, zarr_array, region)

     def close(self) -> None:
```
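As a quick illustration of how the moved-up region computation in the hunk above behaves (the dimension names here are made up for the example):

```python
# The write region defaults every unspecified dimension to slice(None),
# then is frozen into a tuple ordered like the variable's dims.
dims = ("time", "x")
write_region = {"time": slice(10, None)}  # e.g. set by an append on "time"
write_region = {dim: write_region.get(dim, slice(None)) for dim in dims}
region = tuple(write_region[dim] for dim in dims)
print(region)  # (slice(10, None, None), slice(None, None, None))
```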
```diff
@@ -5496,24 +5496,26 @@ def test_encode_zarr_attr_value() -> None:

 @requires_zarr
 def test_extract_zarr_variable_encoding() -> None:
+    # The region is not useful in these cases, but I still think that it must be mandatory
+    # because the validation of the chunks is in the same function
     var = xr.Variable("x", [1, 2])
-    actual = backends.zarr.extract_zarr_variable_encoding(var)
+    actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())
     assert "chunks" in actual
     assert actual["chunks"] is None

     var = xr.Variable("x", [1, 2], encoding={"chunks": (1,)})
-    actual = backends.zarr.extract_zarr_variable_encoding(var)
+    actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())
     assert actual["chunks"] == (1,)

     # does not raise on invalid
     var = xr.Variable("x", [1, 2], encoding={"foo": (1,)})
-    actual = backends.zarr.extract_zarr_variable_encoding(var)
+    actual = backends.zarr.extract_zarr_variable_encoding(var, region=tuple())

     # raises on invalid
     var = xr.Variable("x", [1, 2], encoding={"foo": (1,)})
     with pytest.raises(ValueError, match=r"unexpected encoding parameters"):
         actual = backends.zarr.extract_zarr_variable_encoding(
-            var, raise_on_invalid=True
+            var, raise_on_invalid=True, region=tuple()
         )
```

Author: I think that, ideally, the validation should live outside the extraction of the encoding, which would avoid the use of the region inside the extract_zarr_variable_encoding function. I added a comment with that option in the zarr.py file, but I tried to reduce the amount of modifications to the code, so I decided not to apply it.

Author: If needed, I can add a TODO indicating that it would be good to apply that modification.
```diff
@@ -6096,6 +6098,56 @@ def test_zarr_region_chunk_partial_offset(tmp_path):
             store, safe_chunks=False, region="auto"
         )

-    # This write is unsafe, and should raise an error, but does not.
-    # with pytest.raises(ValueError):
-    #     da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(store, region="auto")
+    with pytest.raises(ValueError):
+        da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(store, region="auto")
```
```diff
+@requires_zarr
+@requires_dask
+def test_zarr_safe_chunk_append_dim(tmp_path):
+    # https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545
+    store = tmp_path / "foo.zarr"
+    data = np.ones((20,))
+    da = xr.DataArray(data, dims=["x"], coords={"x": range(20)}, name="foo").chunk(x=5)
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    with pytest.raises(ValueError):
+        # If the first chunk is smaller than the border size then raise an error
+        da.isel(x=slice(7, 11)).chunk(x=(2, 2)).to_zarr(
+            store, append_dim="x", safe_chunks=True
+        )
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    # If the first chunk is of the size of the border size then it is valid
+    da.isel(x=slice(7, 11)).chunk(x=(3, 1)).to_zarr(
+        store, safe_chunks=True, append_dim="x"
+    )
+    assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 11)))
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    # If the first chunk is of the size of the border size + N * zchunk then it is valid
+    da.isel(x=slice(7, 17)).chunk(x=(8, 2)).to_zarr(
+        store, safe_chunks=True, append_dim="x"
+    )
+    assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 17)))
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    with pytest.raises(ValueError):
+        # If the first chunk is valid but the others are not then raise an error
+        da.isel(x=slice(7, 14)).chunk(x=(3, 3, 1)).to_zarr(
+            store, append_dim="x", safe_chunks=True
+        )
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    with pytest.raises(ValueError):
+        # If the first chunk has a size bigger than the border size but not enough
+        # to complete the size of the next chunk then an error must be raised
+        da.isel(x=slice(7, 14)).chunk(x=(4, 3)).to_zarr(
+            store, append_dim="x", safe_chunks=True
+        )
+
+    da.isel(x=slice(0, 7)).to_zarr(store, safe_chunks=True, mode="w")
+    # Appending with a single chunk is totally valid,
+    # regardless of the size of the chunk
+    da.isel(x=slice(7, 19)).chunk(x=-1).to_zarr(store, append_dim="x", safe_chunks=True)
+    assert xr.open_zarr(store)["foo"].equals(da.isel(x=slice(0, 19)))
```
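The border arithmetic exercised by test_zarr_safe_chunk_append_dim above can be checked by hand. A minimal sketch, with hypothetical helper names mirroring the condition in the backend diff (and assuming the append offset is a non-negative int that is not a multiple of the zarr chunk):

```python
def border_size(zchunk: int, start: int) -> int:
    # Data between the append offset and the end of the zarr chunk containing it
    return zchunk - start % zchunk

def first_chunk_ok(zchunk: int, start: int, first_dchunk: int) -> bool:
    # Mirrors the backend condition: dchunks[0] % zchunk must equal the border
    return first_dchunk % zchunk == border_size(zchunk, start)

# Appending after 7 elements with zarr chunks of 5 leaves a border of 3
assert border_size(5, 7) == 3
assert not first_chunk_ok(5, 7, 2)  # smaller than the border: rejected
assert first_chunk_ok(5, 7, 3)      # exactly the border: accepted
assert first_chunk_ok(5, 7, 8)      # border + one zarr chunk: accepted
assert not first_chunk_ok(5, 7, 4)  # past the border but misaligned: rejected
```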
Reviewer: Re "so would this PR be ready then" — am I right in thinking the proposed code changes the behavior such that a single partial chunk can be written on mode="r+"? I don't think we want that yet (though I am not confident; it seems unlikely to be a big problem, but still possibly confusing).

Author: Yes, the code is going to allow writing on a single partial chunk on mode="r+". From my perspective, and based on the docs of the to_zarr method, the new behavior would not be confusing: "'r+' means modify existing array values only (raise an error if any metadata or shapes would change)", and writing in a partial chunk means modifying the existing array values.

Reviewer: But it breaks the rule we discussed above, re it being safe to write to different locations from distributed processes. It's unlikely, because it's rare to write to a single partial chunk from distributed processes. But is it worth the complication, relative to forcing people to pass mode="a"?

Author: @max-sixty I sent the following PR #9527, which contains all the changes requested. If you want, I can add everything here or close this one, whatever you prefer.