Skip to content

Commit

Permalink
Support netCDF load+save on dataset-like objects as well as filepaths.
Browse files Browse the repository at this point in the history
  • Loading branch information
pp-mo committed May 10, 2023
1 parent b035094 commit b7ac505
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 59 deletions.
9 changes: 7 additions & 2 deletions lib/iris/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ def callback(cube, field, filename):
"""

from collections.abc import Iterable
import contextlib
import glob
import importlib
import itertools
import os.path
import pathlib
import threading

import iris._constraints
Expand Down Expand Up @@ -256,7 +256,8 @@ def context(self, **kwargs):

def _generate_cubes(uris, callback, constraints):
"""Returns a generator of cubes given the URIs and a callback."""
if isinstance(uris, (str, pathlib.PurePath)):
if isinstance(uris, str) or not isinstance(uris, Iterable):
# Make a string, or other single item, into an iterable.
uris = [uris]

# Group collections of uris by their iris handler
Expand All @@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
urls = [":".join(x) for x in groups]
for cube in iris.io.load_http(urls, callback):
yield cube
elif scheme == "data":
data_objects = [x[1] for x in groups]
for cube in iris.io.load_data_objects(data_objects, callback):
yield cube
else:
raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)

Expand Down
35 changes: 26 additions & 9 deletions lib/iris/fileformats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

from iris.io.format_picker import (
DataSourceObjectProtocol,
FileExtension,
FormatAgent,
FormatSpecification,
Expand Down Expand Up @@ -125,16 +126,32 @@ def _load_grib(*args, **kwargs):
)


_nc_dap = FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
)
)
FORMAT_AGENT.add_spec(_nc_dap)
del _nc_dap

# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF dataset",
DataSourceObjectProtocol(),
lambda object: all(
hasattr(object, x)
for x in ("variables", "dimensions", "groups", "ncattrs")
),
netcdf.load_cubes, # using the same call : it must distinguish.
priority=4,
constraint_aware_handler=True,
)
)


#
# UM Fieldsfiles.
Expand Down
24 changes: 16 additions & 8 deletions lib/iris/fileformats/cf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,17 +1043,25 @@ class CFReader:
# TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
CFGroup = CFGroup

def __init__(self, filename, warn=False, monotonic=False):
self._dataset = None
self._filename = os.path.expanduser(filename)
def __init__(self, file_source, warn=False, monotonic=False):
# Ensure safe operation for destructor, should init fail.
self._own_file = False
if isinstance(file_source, str):
# Create from filepath : open it + own it (=close when we die).
self._filename = os.path.expanduser(file_source)
self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)
self._own_file = True
else:
# We have been passed an open dataset.
# We use it but don't own it (don't close it).
self._dataset = file_source
self._filename = self._dataset.filepath()

#: Collection of CF-netCDF variables associated with this netCDF file
self.cf_group = self.CFGroup()

self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)

# Issue load optimisation warning.
if warn and self._dataset.file_format in [
"NETCDF3_CLASSIC",
Expand Down Expand Up @@ -1311,7 +1319,7 @@ def _reset(self):

def _close(self):
# Explicitly close dataset to prevent file remaining open.
if self._dataset is not None:
if self._own_file and self._dataset is not None:
self._dataset.close()
self._dataset = None

Expand Down
24 changes: 13 additions & 11 deletions lib/iris/fileformats/netcdf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Also : `CF Conventions <https://cfconventions.org/>`_.
"""
from collections.abc import Iterable
import warnings

import numpy as np
Expand Down Expand Up @@ -483,14 +484,15 @@ def inner(cf_datavar):
return result


def load_cubes(filenames, callback=None, constraints=None):
def load_cubes(file_sources, callback=None, constraints=None):
"""
Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
Loads cubes from a list of NetCDF file_sources/OPeNDAP URLs.
Args:
* filenames (string/list):
One or more NetCDF filenames/OPeNDAP URLs to load from.
* file_sources (string/list):
One or more NetCDF file_sources/OPeNDAP URLs to load from.
OR open datasets.
Kwargs:
Expand Down Expand Up @@ -518,18 +520,18 @@ def load_cubes(filenames, callback=None, constraints=None):
# Create an actions engine.
engine = _actions_engine()

if isinstance(filenames, str):
filenames = [filenames]
if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
file_sources = [file_sources]

for filename in filenames:
# Ingest the netCDF file.
for file_source in file_sources:
# Ingest the file. At present may be a filepath or an open netCDF4.Dataset.
meshes = {}
if PARSE_UGRID_ON_LOAD:
cf_reader_class = CFUGridReader
else:
cf_reader_class = iris.fileformats.cf.CFReader

with cf_reader_class(filename) as cf:
with cf_reader_class(file_source) as cf:
if PARSE_UGRID_ON_LOAD:
meshes = _meshes_from_cf(cf)

Expand Down Expand Up @@ -563,7 +565,7 @@ def load_cubes(filenames, callback=None, constraints=None):
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)

cube = _load_cube(engine, cf, cf_var, filename)
cube = _load_cube(engine, cf, cf_var, cf.filename)

# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
Expand All @@ -577,7 +579,7 @@ def load_cubes(filenames, callback=None, constraints=None):
warnings.warn("{}".format(e))

# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, filename)
cube = run_callback(callback, cube, cf_var, file_source)

# Callback mechanism may return None, which must not be yielded
if cube is None:
Expand Down
53 changes: 33 additions & 20 deletions lib/iris/fileformats/netcdf/saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def __init__(self, filename, netcdf_format, compute=True):
----------
filename : string
Name of the netCDF file to save the cube.
OR a writeable object supporting the netCDF4.Dataset API.
netcdf_format : string
Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
Expand Down Expand Up @@ -427,7 +428,7 @@ def __init__(self, filename, netcdf_format, compute=True):
#: A dictionary, mapping formula terms to owner cf variable name
self._formula_terms_cache = {}
#: Target filepath
self.filepath = os.path.abspath(filename)
self.filepath = None
#: A list of delayed writes for lazy saving
self._delayed_writes = (
[]
Expand All @@ -438,32 +439,44 @@ def __init__(self, filename, netcdf_format, compute=True):
#: A per-file write lock to prevent dask attempting overlapping writes.
self.file_write_lock = _dask_locks.get_worker_lock(self.filepath)
#: NetCDF dataset
self._dataset = None
try:
self._dataset = _thread_safe_nc.DatasetWrapper(
self.filepath, mode="w", format=netcdf_format
)
except RuntimeError:
dir_name = os.path.dirname(self.filepath)
if not os.path.isdir(dir_name):
msg = "No such file or directory: {}".format(dir_name)
raise IOError(msg)
if not os.access(dir_name, os.R_OK | os.W_OK):
msg = "Permission denied: {}".format(self.filepath)
raise IOError(msg)
else:
raise
self._dataset = None # this line just for the API page

# Detect if we were passed a pre-opened dataset (or something like one)
self._to_open_dataset = hasattr(filename, "createVariable")
if self._to_open_dataset:
# Given a dataset : derive instance filepath from the dataset
self._dataset = filename
self.filepath = self._dataset.filepath()
else:
# Given a filepath string/path : create a dataset there
try:
self.filepath = os.path.abspath(filename)
self._dataset = _thread_safe_nc.DatasetWrapper(
self.filepath, mode="w", format=netcdf_format
)
except RuntimeError:
dir_name = os.path.dirname(self.filepath)
if not os.path.isdir(dir_name):
msg = "No such file or directory: {}".format(dir_name)
raise IOError(msg)
if not os.access(dir_name, os.R_OK | os.W_OK):
msg = "Permission denied: {}".format(self.filepath)
raise IOError(msg)
else:
raise

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
"""Flush any buffered data to the CF-netCDF file before closing."""

self._dataset.sync()
self._dataset.close()
if self.compute:
self.complete()
if not self._to_open_dataset:
# Only close if the Saver created it.
self._dataset.close()
# Complete after closing, if required
if self.compute:
self.complete()

def write(
self,
Expand Down
50 changes: 41 additions & 9 deletions lib/iris/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def decode_uri(uri, default="file"):
In addition to well-formed URIs, it also supports bare file paths as strings
or :class:`pathlib.PurePath`. Both Windows and UNIX style paths are
accepted.
It also supports 'bare objects', i.e. anything which is not a string.
These are identified with a scheme of 'data', and returned unchanged.
.. testsetup::
Expand All @@ -119,20 +121,31 @@ def decode_uri(uri, default="file"):
>>> print(decode_uri('dataZoo/...'))
('file', 'dataZoo/...')
>>> print(decode_uri({}))
('data', {})
"""
if isinstance(uri, pathlib.PurePath):
uri = str(uri)
# make sure scheme has at least 2 letters to avoid windows drives
# put - last in the brackets so it refers to the character, not a range
# reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
if match:
scheme = match.group(1)
part = match.group(2)

if isinstance(uri, str):
# make sure scheme has at least 2 letters to avoid windows drives
# put - last in the brackets so it refers to the character, not a range
# reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
if match:
scheme = match.group(1)
part = match.group(2)
else:
# Catch bare UNIX and Windows paths
scheme = default
part = uri
else:
# Catch bare UNIX and Windows paths
scheme = default
# We can pass things other than strings, like open files.
# These are simply identified as 'data objects'.
scheme = "data"
part = uri

return scheme, part


Expand Down Expand Up @@ -255,6 +268,25 @@ def load_http(urls, callback):
yield cube


def load_data_objects(urls, callback):
    """
    Return a generator of cubes, given a list of data-source objects and a
    callback function.

    The 'objects' take the place of 'uris' in the load calls.
    The appropriate types of the data-source objects are expected to be
    recognised by the handlers : This is done in the usual way by passing the
    context to the format picker to get a handler for each.

    .. note::

        Typically, this function should not be called directly; instead, the
        intended interface for loading is :func:`iris.load`.

    """
    # The treatment of data objects is currently *identical* to that of http
    # URLs, so simply delegate to that handler.
    for cube in load_http(urls, callback):
        yield cube


def _dot_save(cube, target):
# A simple wrapper for `iris.fileformats.dot.save` which allows the
# saver to be registered without triggering the import of
Expand Down
19 changes: 19 additions & 0 deletions lib/iris/io/format_picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,22 @@ def get_element(self, basename, file_handle):
from iris.io import decode_uri

return decode_uri(basename)[0]


class DataSourceObjectProtocol(FileElement):
    """
    A :class:`FileElement` that simply returns the URI entry itself.

    This enables an arbitrary non-string data object to be passed, subject to
    subsequent checks on the object itself (specified in the handler).

    """

    def __init__(self):
        # No file is opened for this element type, so no file handle is
        # needed from the format picker.
        super().__init__(requires_fh=False)

    def get_element(self, basename, file_handle):
        """
        Return the element used for format matching.

        In this context, there should *not* be a file opened by the handler.
        Just return 'basename', which in this case is not a name, or even a
        string, but a passed 'data object'.
        """
        return basename
Loading

0 comments on commit b7ac505

Please sign in to comment.