Implement FilePattern.file_type #322

Merged · 11 commits · Mar 11, 2022
6 changes: 6 additions & 0 deletions docs/development/release_notes.md
@@ -1,5 +1,11 @@
# Release Notes

## v0.8.3 -

- Added `.file_type` attribute to {class}`pangeo_forge_recipes.patterns.FilePattern`. This attribute will eventually supersede
`.is_opendap`, which will be deprecated in `0.9.0`. Until then, `FilePattern(..., is_opendap=True)` is supported as equivalent
to `FilePattern(..., file_type="opendap")`. {pull}`322`

## v0.8.2 - 2022-02-23

- Removed click from dependencies and removed cli entrypoint.
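As a usage sketch of the equivalence described in the release note above: constructing a `FilePattern` with the new `file_type` argument versus the soon-to-be-deprecated `is_opendap` flag. The URL template and concat dimension below are illustrative only, not part of this PR.

```python
# Minimal sketch, assuming pangeo-forge-recipes with this PR applied.
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

def make_url(time):
    # Hypothetical URL template, for illustration only.
    return f"https://data.example.com/file_{time}.nc"

time_dim = ConcatDim("time", keys=list(range(3)))

# Preferred going forward: declare the source file format explicitly.
pattern = FilePattern(make_url, time_dim, file_type="opendap")

# Still accepted until v0.9.0, but emits a DeprecationWarning and
# overrides `file_type` to "opendap".
legacy = FilePattern(make_url, time_dim, is_opendap=True)

assert legacy.file_type == pattern.file_type  # both FileType.opendap
```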
381 changes: 168 additions & 213 deletions docs/tutorials/hdf_reference/reference_cmip6.ipynb

Large diffs are not rendered by default.

4,526 changes: 3,795 additions & 731 deletions docs/tutorials/xarray_zarr/netcdf_zarr_sequential.ipynb

Large diffs are not rendered by default.

2,120 changes: 1,064 additions & 1,056 deletions docs/tutorials/xarray_zarr/opendap_subset_recipe.ipynb

Large diffs are not rendered by default.

12,899 changes: 10,386 additions & 2,513 deletions docs/tutorials/xarray_zarr/terraclimate.ipynb

Large diffs are not rendered by default.

46 changes: 40 additions & 6 deletions pangeo_forge_recipes/patterns.py
@@ -2,8 +2,9 @@
Filename / URL patterns.
"""
import inspect
import warnings
from dataclasses import dataclass, field, replace
from enum import Enum
from enum import Enum, auto
from itertools import product
from typing import (
Any,
@@ -99,6 +100,21 @@ class Index(FrozenSet[DimIndex]):
FilePatternIndex = Index


class AutoName(Enum):
# Recommended by official Python docs for auto naming:
# https://docs.python.org/3/library/enum.html#using-automatic-values
def _generate_next_value_(name, start, count, last_values):
return name


class FileType(AutoName):
unknown = auto()
netcdf3 = auto()
netcdf4 = auto()
grib = auto()
opendap = auto()


class FilePattern:
"""Represents an n-dimensional matrix of individual files to be combined
through a combination of merge and concat operations. Each operation generates
@@ -114,8 +130,9 @@ class FilePattern:
May include ``block_size``, ``username``, ``password``, etc.
:param query_string_secrets: If provided, these key/value pairs are appended to
the query string of each ``file_pattern`` url at runtime.
:param is_opendap: If True, assume all input fnames represent opendap endpoints.
Cannot be used with caching.
Comment on lines -117 to -118 (Contributor):
If we wanted to be nice to our users, we would not just remove this but deprecate it. Now that we have a few users, do we want to be more conservative about breaking changes? Or do we just want to move fast and not worry about that?

:param file_type: The file format of the source files for this pattern. Must be one of
the options defined by ``pangeo_forge_recipes.patterns.FileType``.
Note: ``FileType.opendap`` cannot be used with caching.
"""

def __init__(
@@ -124,17 +141,34 @@ def __init__(
*combine_dims: CombineDim,
fsspec_open_kwargs: Optional[Dict[str, Any]] = None,
query_string_secrets: Optional[Dict[str, str]] = None,
is_opendap: bool = False,
file_type: str = "netcdf4",
is_opendap: Optional[bool] = None,
):
self.format_function = format_function
self.combine_dims = combine_dims
self.fsspec_open_kwargs = fsspec_open_kwargs if fsspec_open_kwargs else {}
self.query_string_secrets = query_string_secrets if query_string_secrets else {}
self.file_type = FileType(file_type)

self.is_opendap = is_opendap
if self.fsspec_open_kwargs and self.is_opendap:
if self.is_opendap:
_deprecation_message = (
"`FilePattern(..., is_opendap=True)` will be deprecated in v0.9.0. "
"Please use `FilePattern(..., file_type='opendap')` instead."
)
warnings.warn(_deprecation_message, DeprecationWarning)
_maybe_default = "default" if self.file_type.value == "netcdf4" else ""
_override_warning = (
f"`is_opendap` passed as `True`, overriding {_maybe_default} "
f"`file_type.value == '{self.file_type.value}'` with `'opendap'`."
)
warnings.warn(_override_warning)
self.file_type = FileType("opendap")

if self.fsspec_open_kwargs and self.file_type == FileType.opendap:
raise ValueError(
"OPeNDAP inputs are not opened with `fsspec`. "
"`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
"When passing `fsspec_open_kwargs`, `file_type` cannot be `opendap`."
)

def __repr__(self):
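For context on the `AutoName` pattern added to `patterns.py` above (taken from the Python `enum` docs it links to): `auto()` here yields the member's own name as its value, which is why `FileType("netcdf4")` round-trips by string. A standalone sketch:

```python
from enum import Enum, auto

class AutoName(Enum):
    # auto() returns the member name instead of an integer.
    def _generate_next_value_(name, start, count, last_values):
        return name

class FileType(AutoName):
    unknown = auto()
    netcdf3 = auto()
    netcdf4 = auto()
    grib = auto()
    opendap = auto()

assert FileType.netcdf4.value == "netcdf4"
assert FileType("opendap") is FileType.opendap  # lookup by value == name
# FileType("zarr") -> ValueError: 'zarr' is not a valid FileType
```

This is what lets `FilePattern.__init__` accept a plain string (`file_type="netcdf4"`) and normalize it with `FileType(file_type)`, raising a clear `ValueError` for unsupported values.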
99 changes: 78 additions & 21 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
@@ -21,7 +21,7 @@

from ..chunk_grid import ChunkGrid
from ..executors.base import Pipeline, Stage
from ..patterns import CombineOp, DimIndex, FilePattern, Index
from ..patterns import CombineOp, DimIndex, FilePattern, FileType, Index
from ..reference import create_hdf5_reference, unstrip_protocol
from ..storage import FSSpecTarget, MetadataTarget, file_opener
from ..utils import calc_subsets, fix_scalar_attr_encoding, lock_for_conflicts
@@ -36,6 +36,10 @@
if os.getenv("PANGEO_FORGE_MAX_MEMORY")
else 500_000_000
)
OPENER_MAP = {
FileType.netcdf3: dict(engine="scipy"),
FileType.netcdf4: dict(engine="h5netcdf"),
}

logger = logging.getLogger(__name__)

@@ -142,7 +146,7 @@ def chunk_position(chunk_key: ChunkKey) -> int:

def cache_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> None:
if config.cache_inputs:
if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
raise ValueError("Can't cache opendap inputs")
if config.storage_config.cache is None:
raise ValueError("input_cache is not set.")
@@ -167,7 +171,7 @@ def cache_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> None:
logger.info(f"Metadata already ached for input '{input_key!s}'")

if config.open_input_with_kerchunk:
if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
raise ValueError("Can't make references for opendap inputs")
if config.storage_config.metadata is None:
raise ValueError("Can't make references; no metadata cache assigned")
@@ -248,7 +252,7 @@ def open_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> xr.Dataset:
fname = config.file_pattern[input_key]
logger.info(f"Opening input with Xarray {input_key!s}: '{fname}'")

if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
if config.cache_inputs:
raise ValueError("Can't cache opendap inputs")
if config.copy_input_to_local_file:
@@ -286,33 +290,86 @@ def open_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> xr.Dataset:
else:

cache = config.storage_config.cache if config.cache_inputs else None
bypass_open = True if config.file_pattern.file_type == FileType.opendap else False

with file_opener(
fname,
cache=cache,
copy_to_local=config.copy_input_to_local_file,
bypass_open=config.file_pattern.is_opendap,
bypass_open=bypass_open,
secrets=config.file_pattern.query_string_secrets,
**config.file_pattern.fsspec_open_kwargs,
) as f:
with dask.config.set(scheduler="single-threaded"): # make sure we don't use a scheduler
kw = config.xarray_open_kwargs.copy()
if "engine" not in kw:
kw["engine"] = "h5netcdf"
logger.debug(f"about to enter xr.open_dataset context on {f}")
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)

if config.delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}

if config.process_input is not None:
ds = config.process_input(ds, str(fname))
file_type = config.file_pattern.file_type
if file_type in OPENER_MAP:
if "engine" in kw:
engine_message_base = (
"pangeo-forge-recipes will automatically set the xarray backend for "
f"files of type '{file_type.value}' to '{OPENER_MAP[file_type]}', "
)
warn_matching_msg = engine_message_base + (
"which is the same value you have passed via `xarray_open_kwargs`. "
f"If this input file is actually of type '{file_type.value}', you can "
f"remove `{{'engine': '{kw['engine']}'}}` from `xarray_open_kwargs`. "
)
error_mismatched_msg = engine_message_base + (
f"which is different from the value you have passed via "
"`xarray_open_kwargs`. If this input file is actually of type "
f"'{file_type.value}', please remove `{{'engine': '{kw['engine']}'}}` "
"from `xarray_open_kwargs`. "
)
engine_message_tail = (
f"If this input file is not of type '{file_type.value}', please update"
" this recipe by passing a different value to `FilePattern.file_type`."
)
warn_matching_msg += engine_message_tail
error_mismatched_msg += engine_message_tail

logger.debug(f"{ds}")
yield ds
if kw["engine"] == OPENER_MAP[file_type]["engine"]:
warnings.warn(warn_matching_msg)
elif kw["engine"] != OPENER_MAP[file_type]["engine"]:
raise ValueError(error_mismatched_msg)
else:
kw.update(OPENER_MAP[file_type])
logger.debug(f"about to enter xr.open_dataset context on {f}")
try:
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)

if config.delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}

if config.process_input is not None:
ds = config.process_input(ds, str(fname))

logger.debug(f"{ds}")
yield ds
except OSError as e:
if "unable to open file (file signature not found)" in str(e).lower():
oserror_message_base = (
f"Unable to open file {f.path} " # type: ignore
f"with `{{engine: {kw.get('engine')}}}`, "
)
if "engine" in config.xarray_open_kwargs:
oserror_message = oserror_message_base + (
"which was set explicitly via `xarray_open_kwargs`. Please remove "
f"`{{engine: {kw.get('engine')}}}` from `xarray_open_kwargs`."
)
elif file_type == FileType.netcdf4:
oserror_message = oserror_message_base + (
"which was set automatically based on the fact that "
"`FilePattern.file_type` is using the default value of 'netcdf4'. "
"It seems likely that this input file is in NetCDF3 format. If "
"that is the case, please re-instantiate your `FilePattern` with "
'`FilePattern(..., file_type="netcdf3")`.'
)
raise OSError(oserror_message) from e
else:
raise e


def subset_dataset(ds: xr.Dataset, subset_spec: DimIndex) -> xr.Dataset:
@@ -743,7 +800,7 @@ def __post_init__(self):
)
self.nitems_per_input = self.file_pattern.nitems_per_input[self.concat_dim]

if self.file_pattern.is_opendap:
if self.file_pattern.file_type == FileType.opendap:
if self.cache_inputs:
raise ValueError("Can't cache opendap inputs.")
else:
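To make the engine-selection branch added to `open_input` above easier to follow, here is a condensed, standalone sketch of the same logic; the function name and simplified messages are illustrative, not the recipe's actual API.

```python
import warnings
from pangeo_forge_recipes.patterns import FileType

OPENER_MAP = {
    FileType.netcdf3: dict(engine="scipy"),
    FileType.netcdf4: dict(engine="h5netcdf"),
}

def resolve_xarray_kwargs(file_type: FileType, xarray_open_kwargs: dict) -> dict:
    """Sketch of how `file_type` determines the xarray backend."""
    kw = xarray_open_kwargs.copy()
    if file_type not in OPENER_MAP:
        return kw  # e.g. opendap / grib / unknown: leave kwargs untouched
    auto_engine = OPENER_MAP[file_type]["engine"]
    if "engine" not in kw:
        kw.update(OPENER_MAP[file_type])  # normal path: set engine automatically
    elif kw["engine"] == auto_engine:
        warnings.warn("`engine` matches the automatic choice; it can be dropped.")
    else:
        raise ValueError("`engine` conflicts with the automatic choice for this file_type.")
    return kw

# resolve_xarray_kwargs(FileType.netcdf3, {})  -> {"engine": "scipy"}
# resolve_xarray_kwargs(FileType.netcdf4, {})  -> {"engine": "h5netcdf"}
# resolve_xarray_kwargs(FileType.netcdf4, {"engine": "scipy"}) raises ValueError
```

The surrounding `try`/`except OSError` block in the diff then upgrades the cryptic HDF5 "file signature not found" error into guidance about either removing `engine` from `xarray_open_kwargs` or re-instantiating the pattern with `file_type="netcdf3"`.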
2 changes: 1 addition & 1 deletion tests/recipe_tests/test_XarrayZarrRecipe.py
@@ -450,7 +450,7 @@ def calc_sequence_length_fixture(netcdf_local_file_pattern, tmp_metadata_target,
*rest,
fsspec_open_kwargs=netcdf_local_file_pattern.fsspec_open_kwargs,
query_string_secrets=netcdf_local_file_pattern.query_string_secrets,
is_opendap=netcdf_local_file_pattern.is_opendap,
file_type=netcdf_local_file_pattern.file_type,
)

n_inputs = file_pattern.dims[concat_dim.name]
37 changes: 25 additions & 12 deletions tests/test_patterns.py
@@ -6,6 +6,7 @@
CombineOp,
ConcatDim,
FilePattern,
FileType,
MergeDim,
pattern_from_file_sequence,
prune_pattern,
@@ -40,7 +41,7 @@ def concat_merge_pattern():
return make_concat_merge_pattern()


@pytest.fixture(params=[dict(fsspec_open_kwargs={"block_size": "foo"}), dict(is_opendap=True)])
@pytest.fixture(params=[dict(fsspec_open_kwargs={"block_size": "foo"}), dict(file_type="opendap")])
def concat_merge_pattern_with_kwargs(request):
return make_concat_merge_pattern(**request.param)

@@ -89,12 +90,12 @@ def test_file_pattern_concat_merge(runtime_secrets, pickle, concat_merge_pattern

if runtime_secrets:
if "fsspec_open_kwargs" in runtime_secrets.keys():
if not fp.is_opendap:
if not fp.file_type == FileType.opendap:
fp.fsspec_open_kwargs.update(runtime_secrets["fsspec_open_kwargs"])
else:
pytest.skip(
"`fsspec_open_kwargs` should never be used in combination with `is_opendap`. "
"This is checked in `FilePattern.__init__` but not when updating attributes. "
"`fsspec_open_kwargs` should never be used in combination with `opendap`."
" This is checked in `FilePattern.__init__` but not when updating attributes. "
"Proposed changes to secret handling will obviate the need for runtime updates"
" to attributes in favor of encryption. So for now, we'll just skip this."
)
@@ -128,20 +129,19 @@ def test_file_pattern_concat_merge(runtime_secrets, pickle, concat_merge_pattern
assert fp[key] == expected_fname

if "fsspec_open_kwargs" in kwargs.keys():
assert fp.is_opendap is False
assert fp.file_type != FileType.opendap
if "fsspec_open_kwargs" in runtime_secrets.keys():
kwargs["fsspec_open_kwargs"].update(runtime_secrets["fsspec_open_kwargs"])
assert fp.fsspec_open_kwargs == kwargs["fsspec_open_kwargs"]
if "query_string_secrets" in runtime_secrets.keys():
assert fp.query_string_secrets == runtime_secrets["query_string_secrets"]
if "is_opendap" in kwargs.keys():
assert fp.is_opendap == kwargs["is_opendap"]
assert fp.is_opendap is True
if kwargs.get("file_type", None) == "opendap":
assert fp.file_type == FileType.opendap
assert fp.fsspec_open_kwargs == {}


def test_incompatible_kwargs():
kwargs = dict(fsspec_open_kwargs={"block_size": "foo"}, is_opendap=True)
kwargs = dict(fsspec_open_kwargs={"block_size": "foo"}, file_type="opendap")
with pytest.raises(ValueError):
make_concat_merge_pattern(**kwargs)
return
@@ -154,12 +154,12 @@ def test_prune(nkeep, concat_merge_pattern_with_kwargs, runtime_secrets):

if runtime_secrets:
if "fsspec_open_kwargs" in runtime_secrets.keys():
if not fp.is_opendap:
if fp.file_type != FileType.opendap:
fp.fsspec_open_kwargs.update(runtime_secrets["fsspec_open_kwargs"])
else:
pytest.skip(
"`fsspec_open_kwargs` should never be used in combination with `is_opendap`. "
"This is checked in `FilePattern.__init__` but not when updating attributes. "
"`fsspec_open_kwargs` should never be used in combination with `opendap`."
" This is checked in `FilePattern.__init__` but not when updating attributes. "
"Proposed changes to secret handling will obviate the need for runtime updates"
" to attributes in favor of encryption. So for now, we'll just skip this."
)
@@ -180,3 +180,16 @@ def get_kwargs(file_pattern):
return kwargs

assert get_kwargs(fp) == get_kwargs(fp_pruned)


@pytest.mark.parametrize("file_type_value", [ft.value for ft in list(FileType)] + ["unsupported"])
def test_setting_file_types(file_type_value):

file_type_kwargs = {"file_type": file_type_value}

if not file_type_value == "unsupported":
fp = make_concat_merge_pattern(**file_type_kwargs)[0]
assert fp.file_type == FileType(file_type_value)
else:
with pytest.raises(ValueError, match=fr"'{file_type_value}' is not a valid FileType"):
fp = make_concat_merge_pattern(**file_type_kwargs)[0]
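The parametrized test above can be approximated outside pytest as follows; the `make_url` helper and concat dimension are hypothetical stand-ins for the `make_concat_merge_pattern` fixture.

```python
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, FileType

def make_url(time):
    return f"https://data.example.com/file_{time}.nc"  # hypothetical template

dim = ConcatDim("time", keys=[0, 1, 2])

# Every declared FileType value is accepted and normalized to the enum member.
for value in (ft.value for ft in FileType):
    assert FilePattern(make_url, dim, file_type=value).file_type is FileType(value)

# Anything else fails fast with a clear error.
try:
    FilePattern(make_url, dim, file_type="unsupported")
except ValueError as e:
    print(e)  # 'unsupported' is not a valid FileType
```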