Skip to content

Commit

Permalink
Merge pull request #322 from cisaacstern/file-type
Browse files Browse the repository at this point in the history
Implement `FilePattern.file_type`
  • Loading branch information
cisaacstern authored Mar 11, 2022
2 parents 2a68f2e + ced6ff4 commit 3379754
Show file tree
Hide file tree
Showing 9 changed files with 15,563 additions and 4,553 deletions.
6 changes: 6 additions & 0 deletions docs/development/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes

## v0.8.3 -

- Added `.file_type` attribute to {class}`pangeo_forge_recipes.patterns.FilePattern`. This attribute will eventually supercede
`.is_opendap`, which will be deprecated in `0.9.0`. Until then, `FilePattern(..., is_opendap=True)` is supported as equivalent
to `FilePattern(..., file_type="opendap")`. {pull}`322`

## v0.8.2 - 2022-02-23

- Removed click from dependencies and removed cli entrypoint.
Expand Down
381 changes: 168 additions & 213 deletions docs/tutorials/hdf_reference/reference_cmip6.ipynb

Large diffs are not rendered by default.

4,526 changes: 3,795 additions & 731 deletions docs/tutorials/xarray_zarr/netcdf_zarr_sequential.ipynb

Large diffs are not rendered by default.

2,120 changes: 1,064 additions & 1,056 deletions docs/tutorials/xarray_zarr/opendap_subset_recipe.ipynb

Large diffs are not rendered by default.

12,899 changes: 10,386 additions & 2,513 deletions docs/tutorials/xarray_zarr/terraclimate.ipynb

Large diffs are not rendered by default.

46 changes: 40 additions & 6 deletions pangeo_forge_recipes/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
Filename / URL patterns.
"""
import inspect
import warnings
from dataclasses import dataclass, field, replace
from enum import Enum
from enum import Enum, auto
from itertools import product
from typing import (
Any,
Expand Down Expand Up @@ -99,6 +100,21 @@ class Index(FrozenSet[DimIndex]):
FilePatternIndex = Index


class AutoName(Enum):
# Recommended by official Python docs for auto naming:
# https://docs.python.org/3/library/enum.html#using-automatic-values
def _generate_next_value_(name, start, count, last_values):
return name


class FileType(AutoName):
unknown = auto()
netcdf3 = auto()
netcdf4 = auto()
grib = auto()
opendap = auto()


class FilePattern:
"""Represents an n-dimensional matrix of individual files to be combined
through a combination of merge and concat operations. Each operation generates
Expand All @@ -114,8 +130,9 @@ class FilePattern:
May include ``block_size``, ``username``, ``password``, etc.
:param query_string_secrets: If provided, these key/value pairs are appended to
the query string of each ``file_pattern`` url at runtime.
:param is_opendap: If True, assume all input fnames represent opendap endpoints.
Cannot be used with caching.
:param file_type: The file format of the source files for this pattern. Must be one of
the options defined by ``pangeo_forge_recipes.patterns.FileType``.
Note: ``FileType.opendap`` cannot be used with caching.
"""

def __init__(
Expand All @@ -124,17 +141,34 @@ def __init__(
*combine_dims: CombineDim,
fsspec_open_kwargs: Optional[Dict[str, Any]] = None,
query_string_secrets: Optional[Dict[str, str]] = None,
is_opendap: bool = False,
file_type: str = "netcdf4",
is_opendap: Optional[bool] = None,
):
self.format_function = format_function
self.combine_dims = combine_dims
self.fsspec_open_kwargs = fsspec_open_kwargs if fsspec_open_kwargs else {}
self.query_string_secrets = query_string_secrets if query_string_secrets else {}
self.file_type = FileType(file_type)

self.is_opendap = is_opendap
if self.fsspec_open_kwargs and self.is_opendap:
if self.is_opendap:
_deprecation_message = (
"`FilePattern(..., is_opendap=True)` will be deprecated in v0.9.0. "
"Please use `FilePattern(..., file_type='opendap')` instead."
)
warnings.warn(_deprecation_message, DeprecationWarning)
_maybe_default = "default" if self.file_type.value == "netcdf4" else ""
_overide_warning = (
f"`is_opendap` passed as `True`, overriding {_maybe_default} "
f"`file_type.value == '{self.file_type.value}' with `'opendap'`."
)
warnings.warn(_overide_warning)
self.file_type = FileType("opendap")

if self.fsspec_open_kwargs and self.file_type == FileType.opendap:
raise ValueError(
"OPeNDAP inputs are not opened with `fsspec`. "
"`is_opendap` must be `False` when passing `fsspec_open_kwargs`."
"When passing `fsspec_open_kwargs`, `file_type` cannot be `opendap`."
)

def __repr__(self):
Expand Down
99 changes: 78 additions & 21 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from ..chunk_grid import ChunkGrid
from ..executors.base import Pipeline, Stage
from ..patterns import CombineOp, DimIndex, FilePattern, Index
from ..patterns import CombineOp, DimIndex, FilePattern, FileType, Index
from ..reference import create_hdf5_reference, unstrip_protocol
from ..storage import FSSpecTarget, MetadataTarget, file_opener
from ..utils import calc_subsets, fix_scalar_attr_encoding, lock_for_conflicts
Expand All @@ -36,6 +36,10 @@
if os.getenv("PANGEO_FORGE_MAX_MEMORY")
else 500_000_000
)
OPENER_MAP = {
FileType.netcdf3: dict(engine="scipy"),
FileType.netcdf4: dict(engine="h5netcdf"),
}

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,7 +146,7 @@ def chunk_position(chunk_key: ChunkKey) -> int:

def cache_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> None:
if config.cache_inputs:
if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
raise ValueError("Can't cache opendap inputs")
if config.storage_config.cache is None:
raise ValueError("input_cache is not set.")
Expand All @@ -167,7 +171,7 @@ def cache_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> None:
logger.info(f"Metadata already ached for input '{input_key!s}'")

if config.open_input_with_kerchunk:
if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
raise ValueError("Can't make references for opendap inputs")
if config.storage_config.metadata is None:
raise ValueError("Can't make references; no metadata cache assigned")
Expand Down Expand Up @@ -248,7 +252,7 @@ def open_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> xr.Dataset:
fname = config.file_pattern[input_key]
logger.info(f"Opening input with Xarray {input_key!s}: '{fname}'")

if config.file_pattern.is_opendap:
if config.file_pattern.file_type == FileType.opendap:
if config.cache_inputs:
raise ValueError("Can't cache opendap inputs")
if config.copy_input_to_local_file:
Expand Down Expand Up @@ -286,33 +290,86 @@ def open_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> xr.Dataset:
else:

cache = config.storage_config.cache if config.cache_inputs else None
bypass_open = True if config.file_pattern.file_type == FileType.opendap else False

with file_opener(
fname,
cache=cache,
copy_to_local=config.copy_input_to_local_file,
bypass_open=config.file_pattern.is_opendap,
bypass_open=bypass_open,
secrets=config.file_pattern.query_string_secrets,
**config.file_pattern.fsspec_open_kwargs,
) as f:
with dask.config.set(scheduler="single-threaded"): # make sure we don't use a scheduler
kw = config.xarray_open_kwargs.copy()
if "engine" not in kw:
kw["engine"] = "h5netcdf"
logger.debug(f"about to enter xr.open_dataset context on {f}")
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)

if config.delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}

if config.process_input is not None:
ds = config.process_input(ds, str(fname))
file_type = config.file_pattern.file_type
if file_type in OPENER_MAP:
if "engine" in kw:
engine_message_base = (
"pangeo-forge-recipes will automatically set the xarray backend for "
f"files of type '{file_type.value}' to '{OPENER_MAP[file_type]}', "
)
warn_matching_msg = engine_message_base + (
"which is the same value you have passed via `xarray_open_kwargs`. "
f"If this input file is actually of type '{file_type.value}', you can "
f"remove `{{'engine': '{kw['engine']}'}}` from `xarray_open_kwargs`. "
)
error_mismatched_msg = engine_message_base + (
f"which is different from the value you have passed via "
"`xarray_open_kwargs`. If this input file is actually of type "
f"'{file_type.value}', please remove `{{'engine': '{kw['engine']}'}}` "
"from `xarray_open_kwargs`. "
)
engine_message_tail = (
f"If this input file is not of type '{file_type.value}', please update"
" this recipe by passing a different value to `FilePattern.file_type`."
)
warn_matching_msg += engine_message_tail
error_mismatched_msg += engine_message_tail

logger.debug(f"{ds}")
yield ds
if kw["engine"] == OPENER_MAP[file_type]["engine"]:
warnings.warn(warn_matching_msg)
elif kw["engine"] != OPENER_MAP[file_type]["engine"]:
raise ValueError(error_mismatched_msg)
else:
kw.update(OPENER_MAP[file_type])
logger.debug(f"about to enter xr.open_dataset context on {f}")
try:
with xr.open_dataset(f, **kw) as ds:
logger.debug("successfully opened dataset")
ds = fix_scalar_attr_encoding(ds)

if config.delete_input_encoding:
for var in ds.variables:
ds[var].encoding = {}

if config.process_input is not None:
ds = config.process_input(ds, str(fname))

logger.debug(f"{ds}")
yield ds
except OSError as e:
if "unable to open file (file signature not found)" in str(e).lower():
oserror_message_base = (
f"Unable to open file {f.path} " # type: ignore
f"with `{{engine: {kw.get('engine')}}}`, "
)
if "engine" in config.xarray_open_kwargs:
oserror_message = oserror_message_base + (
"which was set explicitly via `xarray_open_kwargs`. Please remove "
f"`{{engine: {kw.get('engine')}}}` from `xarray_open_kwargs`."
)
elif file_type == FileType.netcdf4:
oserror_message = oserror_message_base + (
"which was set automatically based on the fact that "
"`FilePattern.file_type` is using the default value of 'netcdf4'. "
"It seems likely that this input file is in NetCDF3 format. If "
"that is the case, please re-instantiate your `FilePattern` with "
'`FilePattern(..., file_type="netcdf3")`.'
)
raise OSError(oserror_message) from e
else:
raise e


def subset_dataset(ds: xr.Dataset, subset_spec: DimIndex) -> xr.Dataset:
Expand Down Expand Up @@ -743,7 +800,7 @@ def __post_init__(self):
)
self.nitems_per_input = self.file_pattern.nitems_per_input[self.concat_dim]

if self.file_pattern.is_opendap:
if self.file_pattern.file_type == FileType.opendap:
if self.cache_inputs:
raise ValueError("Can't cache opendap inputs.")
else:
Expand Down
2 changes: 1 addition & 1 deletion tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def calc_sequence_length_fixture(netcdf_local_file_pattern, tmp_metadata_target,
*rest,
fsspec_open_kwargs=netcdf_local_file_pattern.fsspec_open_kwargs,
query_string_secrets=netcdf_local_file_pattern.query_string_secrets,
is_opendap=netcdf_local_file_pattern.is_opendap,
file_type=netcdf_local_file_pattern.file_type,
)

n_inputs = file_pattern.dims[concat_dim.name]
Expand Down
37 changes: 25 additions & 12 deletions tests/test_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
CombineOp,
ConcatDim,
FilePattern,
FileType,
MergeDim,
pattern_from_file_sequence,
prune_pattern,
Expand Down Expand Up @@ -40,7 +41,7 @@ def concat_merge_pattern():
return make_concat_merge_pattern()


@pytest.fixture(params=[dict(fsspec_open_kwargs={"block_size": "foo"}), dict(is_opendap=True)])
@pytest.fixture(params=[dict(fsspec_open_kwargs={"block_size": "foo"}), dict(file_type="opendap")])
def concat_merge_pattern_with_kwargs(request):
return make_concat_merge_pattern(**request.param)

Expand Down Expand Up @@ -89,12 +90,12 @@ def test_file_pattern_concat_merge(runtime_secrets, pickle, concat_merge_pattern

if runtime_secrets:
if "fsspec_open_kwargs" in runtime_secrets.keys():
if not fp.is_opendap:
if not fp.file_type == FileType.opendap:
fp.fsspec_open_kwargs.update(runtime_secrets["fsspec_open_kwargs"])
else:
pytest.skip(
"`fsspec_open_kwargs` should never be used in combination with `is_opendap`. "
"This is checked in `FilePattern.__init__` but not when updating attributes. "
"`fsspec_open_kwargs` should never be used in combination with `opendap`."
" This is checked in `FilePattern.__init__` but not when updating attributes. "
"Proposed changes to secret handling will obviate the need for runtime updates"
" to attributes in favor of encryption. So for now, we'll just skip this."
)
Expand Down Expand Up @@ -128,20 +129,19 @@ def test_file_pattern_concat_merge(runtime_secrets, pickle, concat_merge_pattern
assert fp[key] == expected_fname

if "fsspec_open_kwargs" in kwargs.keys():
assert fp.is_opendap is False
assert fp.file_type != FileType.opendap
if "fsspec_open_kwargs" in runtime_secrets.keys():
kwargs["fsspec_open_kwargs"].update(runtime_secrets["fsspec_open_kwargs"])
assert fp.fsspec_open_kwargs == kwargs["fsspec_open_kwargs"]
if "query_string_secrets" in runtime_secrets.keys():
assert fp.query_string_secrets == runtime_secrets["query_string_secrets"]
if "is_opendap" in kwargs.keys():
assert fp.is_opendap == kwargs["is_opendap"]
assert fp.is_opendap is True
if kwargs.get("file_type", None) == "opendap":
assert fp.file_type == FileType.opendap
assert fp.fsspec_open_kwargs == {}


def test_incompatible_kwargs():
kwargs = dict(fsspec_open_kwargs={"block_size": "foo"}, is_opendap=True)
kwargs = dict(fsspec_open_kwargs={"block_size": "foo"}, file_type="opendap")
with pytest.raises(ValueError):
make_concat_merge_pattern(**kwargs)
return
Expand All @@ -154,12 +154,12 @@ def test_prune(nkeep, concat_merge_pattern_with_kwargs, runtime_secrets):

if runtime_secrets:
if "fsspec_open_kwargs" in runtime_secrets.keys():
if not fp.is_opendap:
if fp.file_type != FileType.opendap:
fp.fsspec_open_kwargs.update(runtime_secrets["fsspec_open_kwargs"])
else:
pytest.skip(
"`fsspec_open_kwargs` should never be used in combination with `is_opendap`. "
"This is checked in `FilePattern.__init__` but not when updating attributes. "
"`fsspec_open_kwargs` should never be used in combination with `opendap`."
" This is checked in `FilePattern.__init__` but not when updating attributes. "
"Proposed changes to secret handling will obviate the need for runtime updates"
" to attributes in favor of encryption. So for now, we'll just skip this."
)
Expand All @@ -180,3 +180,16 @@ def get_kwargs(file_pattern):
return kwargs

assert get_kwargs(fp) == get_kwargs(fp_pruned)


@pytest.mark.parametrize("file_type_value", [ft.value for ft in list(FileType)] + ["unsupported"])
def test_setting_file_types(file_type_value):

file_type_kwargs = {"file_type": file_type_value}

if not file_type_value == "unsupported":
fp = make_concat_merge_pattern(**file_type_kwargs)[0]
assert fp.file_type == FileType(file_type_value)
else:
with pytest.raises(ValueError, match=fr"'{file_type_value}' is not a valid FileType"):
fp = make_concat_merge_pattern(**file_type_kwargs)[0]

0 comments on commit 3379754

Please sign in to comment.