Support netCDF load+save on dataset-like objects as well as filepaths. #5214

Merged: 10 commits, May 19, 2023
9 changes: 7 additions & 2 deletions lib/iris/__init__.py
@@ -89,12 +89,12 @@ def callback(cube, field, filename):

"""

+from collections.abc import Iterable
import contextlib
import glob
import importlib
import itertools
import os.path
-import pathlib
import threading

import iris._constraints
@@ -256,7 +256,8 @@ def context(self, **kwargs):

 def _generate_cubes(uris, callback, constraints):
     """Returns a generator of cubes given the URIs and a callback."""
-    if isinstance(uris, (str, pathlib.PurePath)):
+    if isinstance(uris, str) or not isinstance(uris, Iterable):
+        # Make a string, or other single item, into an iterable.
         uris = [uris]

# Group collections of uris by their iris handler
@@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
             urls = [":".join(x) for x in groups]
             for cube in iris.io.load_http(urls, callback):
                 yield cube
+        elif scheme == "data":
+            data_objects = [x[1] for x in groups]
+            for cube in iris.io.load_data_objects(data_objects, callback):
+                yield cube
         else:
             raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)
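With this change a bare string, or any single non-iterable object, is wrapped into a list, and sources tagged with the new 'data' scheme are routed to iris.io.load_data_objects. A minimal sketch of what this enables (illustrative only; the filename "air_temp.nc" is hypothetical, not from this PR):

import iris
import netCDF4

# An already-open dataset can now be passed where a filepath was required.
ds = netCDF4.Dataset("air_temp.nc", mode="r")  # hypothetical local file
cubes = iris.load(ds)  # a single non-iterable object is wrapped as [ds]
print(cubes)
ds.close()  # the caller still owns the dataset, so the caller closes it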

35 changes: 26 additions & 9 deletions lib/iris/fileformats/__init__.py
@@ -9,6 +9,7 @@
"""

from iris.io.format_picker import (
+    DataSourceObjectProtocol,
FileExtension,
FormatAgent,
FormatSpecification,
@@ -125,16 +126,32 @@ def _load_grib(*args, **kwargs):
)


-_nc_dap = FormatSpecification(
-    "NetCDF OPeNDAP",
-    UriProtocol(),
-    lambda protocol: protocol in ["http", "https"],
-    netcdf.load_cubes,
-    priority=6,
-    constraint_aware_handler=True,
-)
-FORMAT_AGENT.add_spec(_nc_dap)
-del _nc_dap
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF OPeNDAP",
+        UriProtocol(),
+        lambda protocol: protocol in ["http", "https"],
+        netcdf.load_cubes,
+        priority=6,
+        constraint_aware_handler=True,
+    )
+)
+
+# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF dataset",
+        DataSourceObjectProtocol(),
+        lambda object: all(
+            hasattr(object, x)
+            for x in ("variables", "dimensions", "groups", "ncattrs")
+        ),
+        netcdf.load_cubes,  # using the same call : it must distinguish.
+        priority=4,
+        constraint_aware_handler=True,
+    )
+)
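The new "NetCDF dataset" spec identifies a dataset-like source purely by duck typing, so any faithful mimic of the netCDF4.Dataset interface is accepted. A small illustration of that hasattr test (the DatasetMimic class is invented for this sketch, not part of the PR):

# Any object exposing these four netCDF4.Dataset-style attributes passes
# the format check, whether or not it is a real netCDF4.Dataset.
class DatasetMimic:
    variables = {}
    dimensions = {}
    groups = {}

    def ncattrs(self):
        return []

fields = ("variables", "dimensions", "groups", "ncattrs")
print(all(hasattr(DatasetMimic(), f) for f in fields))  # True
print(all(hasattr("some/file/path.nc", f) for f in fields))  # False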


#
# UM Fieldsfiles.
24 changes: 16 additions & 8 deletions lib/iris/fileformats/cf.py
@@ -1043,17 +1043,25 @@ class CFReader:
# TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
CFGroup = CFGroup

-    def __init__(self, filename, warn=False, monotonic=False):
-        self._dataset = None
-        self._filename = os.path.expanduser(filename)
+    def __init__(self, file_source, warn=False, monotonic=False):
+        # Ensure safe operation for destructor, should init fail.
+        self._own_file = False
+        if isinstance(file_source, str):
+            # Create from filepath : open it + own it (=close when we die).
+            self._filename = os.path.expanduser(file_source)
+            self._dataset = _thread_safe_nc.DatasetWrapper(
+                self._filename, mode="r"
+            )
+            self._own_file = True
+        else:
+            # We have been passed an open dataset.
+            # We use it but don't own it (don't close it).
+            self._dataset = file_source
+            self._filename = self._dataset.filepath()

         #: Collection of CF-netCDF variables associated with this netCDF file
         self.cf_group = self.CFGroup()

-        self._dataset = _thread_safe_nc.DatasetWrapper(
-            self._filename, mode="r"
-        )

# Issue load optimisation warning.
if warn and self._dataset.file_format in [
"NETCDF3_CLASSIC",
@@ -1311,7 +1319,7 @@ def _reset(self):

     def _close(self):
         # Explicitly close dataset to prevent file remaining open.
-        if self._dataset is not None:
+        if self._own_file and self._dataset is not None:
             self._dataset.close()
             self._dataset = None
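The _own_file flag gives CFReader a simple ownership rule: it closes only datasets that it opened itself. A sketch of the intended behaviour (the filename "example.nc" is hypothetical):

from iris.fileformats.cf import CFReader
import netCDF4

ds = netCDF4.Dataset("example.nc")  # opened, and therefore owned, by us
with CFReader(ds) as cf:  # reader sets _own_file = False
    pass
print(ds.isopen())  # True: the reader left our dataset open
ds.close()

with CFReader("example.nc") as cf:  # _own_file = True
    pass  # dataset opened and closed internally by the reader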

24 changes: 13 additions & 11 deletions lib/iris/fileformats/netcdf/loader.py
@@ -13,6 +13,7 @@
Also : `CF Conventions <https://cfconventions.org/>`_.

"""
+from collections.abc import Iterable
import warnings

import numpy as np
@@ -483,14 +484,15 @@ def inner(cf_datavar):
return result


-def load_cubes(filenames, callback=None, constraints=None):
+def load_cubes(file_sources, callback=None, constraints=None):
     """
-    Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
+    Loads cubes from a list of NetCDF file_sources/OPeNDAP URLs.
 
     Args:
 
-    * filenames (string/list):
-        One or more NetCDF filenames/OPeNDAP URLs to load from.
+    * file_sources (string/list):
+        One or more NetCDF file_sources/OPeNDAP URLs to load from.
+        OR open datasets.

Kwargs:

@@ -518,18 +520,18 @@ def load_cubes(filenames, callback=None, constraints=None):
# Create an actions engine.
engine = _actions_engine()

-    if isinstance(filenames, str):
-        filenames = [filenames]
+    if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
+        file_sources = [file_sources]

-    for filename in filenames:
-        # Ingest the netCDF file.
+    for file_source in file_sources:
+        # Ingest the file. At present may be a filepath or an open netCDF4.Dataset.
meshes = {}
if PARSE_UGRID_ON_LOAD:
cf_reader_class = CFUGridReader
else:
cf_reader_class = iris.fileformats.cf.CFReader

-        with cf_reader_class(filename) as cf:
+        with cf_reader_class(file_source) as cf:
if PARSE_UGRID_ON_LOAD:
meshes = _meshes_from_cf(cf)

Expand Down Expand Up @@ -563,7 +565,7 @@ def load_cubes(filenames, callback=None, constraints=None):
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)

cube = _load_cube(engine, cf, cf_var, filename)
cube = _load_cube(engine, cf, cf_var, cf.filename)

# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
Expand All @@ -577,7 +579,7 @@ def load_cubes(filenames, callback=None, constraints=None):
warnings.warn("{}".format(e))

# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, filename)
cube = run_callback(callback, cube, cf_var, file_source)

# Callback mechanism may return None, which must not be yielded
if cube is None:
Expand Down
67 changes: 42 additions & 25 deletions lib/iris/fileformats/netcdf/saver.py
@@ -374,6 +374,7 @@ def __init__(self, filename, netcdf_format, compute=True):
----------
filename : string
Name of the netCDF file to save the cube.
+        OR a writeable object supporting the netCDF4.Dataset API.

netcdf_format : string
Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
@@ -427,43 +428,59 @@ def __init__(self, filename, netcdf_format, compute=True):
#: A dictionary, mapping formula terms to owner cf variable name
self._formula_terms_cache = {}
         #: Target filepath
-        self.filepath = os.path.abspath(filename)
-        #: A list of delayed writes for lazy saving
-        self._delayed_writes = (
-            []
-        )  # a list of triples (source, target, fill-info)
+        self.filepath = (
+            None  # this line just for the API page -- value is set later
+        )
         #: Whether to complete delayed saves on exit (and raise associated warnings).
         self.compute = compute
         # N.B. the file-write-lock *type* actually depends on the dask scheduler type.
         #: A per-file write lock to prevent dask attempting overlapping writes.
-        self.file_write_lock = _dask_locks.get_worker_lock(self.filepath)
-        #: NetCDF dataset
-        self._dataset = None
-        try:
-            self._dataset = _thread_safe_nc.DatasetWrapper(
-                self.filepath, mode="w", format=netcdf_format
-            )
-        except RuntimeError:
-            dir_name = os.path.dirname(self.filepath)
-            if not os.path.isdir(dir_name):
-                msg = "No such file or directory: {}".format(dir_name)
-                raise IOError(msg)
-            if not os.access(dir_name, os.R_OK | os.W_OK):
-                msg = "Permission denied: {}".format(self.filepath)
-                raise IOError(msg)
-            else:
-                raise
+        self.file_write_lock = (
+            None  # this line just for the API page -- value is set later
+        )
+
+        # A list of delayed writes for lazy saving
+        # a list of triples (source, target, fill-info).
+        self._delayed_writes = []
+
+        # Detect if we were passed a pre-opened dataset (or something like one)
+        self._to_open_dataset = hasattr(filename, "createVariable")
+        if self._to_open_dataset:
+            # Given a dataset : derive instance filepath from the dataset
+            self._dataset = filename
+            self.filepath = self._dataset.filepath()
+        else:
+            # Given a filepath string/path : create a dataset from that
+            try:
+                self.filepath = os.path.abspath(filename)
+                self._dataset = _thread_safe_nc.DatasetWrapper(
+                    self.filepath, mode="w", format=netcdf_format
+                )
+            except RuntimeError:
+                dir_name = os.path.dirname(self.filepath)
+                if not os.path.isdir(dir_name):
+                    msg = "No such file or directory: {}".format(dir_name)
+                    raise IOError(msg)
+                if not os.access(dir_name, os.R_OK | os.W_OK):
+                    msg = "Permission denied: {}".format(self.filepath)
+                    raise IOError(msg)
+                else:
+                    raise
+
+        self.file_write_lock = _dask_locks.get_worker_lock(self.filepath)

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
"""Flush any buffered data to the CF-netCDF file before closing."""

         self._dataset.sync()
-        self._dataset.close()
-        if self.compute:
-            self.complete()
+        if not self._to_open_dataset:
+            # Only close if the Saver created it.
+            self._dataset.close()
+        # Complete after closing, if required
+        if self.compute:
+            self.complete()
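Mirroring the loader, the Saver now closes only datasets it opened itself; a caller-supplied dataset stays open on exit. A sketch of saving into a pre-opened dataset (illustrative; "output.nc" is hypothetical and cube is assumed to exist):

import netCDF4
from iris.fileformats.netcdf.saver import Saver

ds = netCDF4.Dataset("output.nc", mode="w", format="NETCDF4")
with Saver(ds, "NETCDF4") as saver:  # detected via hasattr(ds, "createVariable")
    saver.write(cube)  # cube assumed defined elsewhere
ds.close()  # our dataset, so closing is our job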

def write(
self,
50 changes: 41 additions & 9 deletions lib/iris/io/__init__.py
@@ -94,6 +94,8 @@ def decode_uri(uri, default="file"):
In addition to well-formed URIs, it also supports bare file paths as strings
or :class:`pathlib.PurePath`. Both Windows and UNIX style paths are
accepted.
+    It also supports 'bare objects', i.e. anything which is not a string.
+    These are identified with a scheme of 'data', and returned unchanged.

.. testsetup::

@@ -119,20 +121,31 @@ def decode_uri(uri, default="file"):
>>> print(decode_uri('dataZoo/...'))
('file', 'dataZoo/...')

+    >>> print(decode_uri({}))
+    ('data', {})

"""
if isinstance(uri, pathlib.PurePath):
uri = str(uri)
-    # make sure scheme has at least 2 letters to avoid windows drives
-    # put - last in the brackets so it refers to the character, not a range
-    # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
-    match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
-    if match:
-        scheme = match.group(1)
-        part = match.group(2)
-    else:
-        # Catch bare UNIX and Windows paths
-        scheme = default
-        part = uri
+
+    if isinstance(uri, str):
+        # make sure scheme has at least 2 letters to avoid windows drives
+        # put - last in the brackets so it refers to the character, not a range
+        # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
+        match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
+        if match:
+            scheme = match.group(1)
+            part = match.group(2)
+        else:
+            # Catch bare UNIX and Windows paths
+            scheme = default
+            part = uri
+    else:
+        # We can pass things other than strings, like open files.
+        # These are simply identified as 'data objects'.
+        scheme = "data"
+        part = uri

return scheme, part


@@ -255,6 +268,25 @@ def load_http(urls, callback):
yield cube


+def load_data_objects(urls, callback):
+    """
+    Takes a list of data-source objects and a callback function, and returns a
+    generator of Cubes.
+    The 'objects' take the place of 'uris' in the load calls.
+    The appropriate types of the data-source objects are expected to be
+    recognised by the handlers : This is done in the usual way by passing the
+    context to the format picker to get a handler for each.
+
+    .. note::
+
+        Typically, this function should not be called directly; instead, the
+        intended interface for loading is :func:`iris.load`.
+
+    """
+    # NOTE: this operation is currently *identical* to the http one.
+    yield from load_http(urls, callback)
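For orientation, a sketch of the dispatch that reaches this function (hypothetical object; mirrors the new doctest in decode_uri):

import iris.io

scheme, part = iris.io.decode_uri({"not": "a string"})
print(scheme)  # 'data' -> _generate_cubes routes this to load_data_objects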


def _dot_save(cube, target):
# A simple wrapper for `iris.fileformats.dot.save` which allows the
# saver to be registered without triggering the import of
19 changes: 19 additions & 0 deletions lib/iris/io/format_picker.py
@@ -331,3 +331,22 @@ def get_element(self, basename, file_handle):
from iris.io import decode_uri

return decode_uri(basename)[0]


+class DataSourceObjectProtocol(FileElement):
+    """
+    A :class:`FileElement` that simply returns the URI entry itself.
+
+    This enables an arbitrary non-string data object to be passed, subject to
+    subsequent checks on the object itself (specified in the handler).
+
+    """
+
+    def __init__(self):
+        super().__init__(requires_fh=False)
+
+    def get_element(self, basename, file_handle):
+        # In this context, there should *not* be a file opened by the handler.
+        # Just return 'basename', which in this case is not a name, or even a
+        # string, but a passed 'data object'.
+        return basename
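A tiny sketch of how this element behaves in the picker (illustrative only): the candidate object is passed through untouched, leaving the FormatSpecification's own handler check, such as the hasattr test in lib/iris/fileformats/__init__.py, to decide whether it matches.

from iris.io.format_picker import DataSourceObjectProtocol

element = DataSourceObjectProtocol()
candidate = object()  # any non-string "data object"
assert element.get_element(candidate, None) is candidate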