Bridge support #5216

Closed
wants to merge 7 commits
9 changes: 7 additions & 2 deletions lib/iris/__init__.py
@@ -89,12 +89,12 @@ def callback(cube, field, filename):

"""

from collections.abc import Iterable
import contextlib
import glob
import importlib
import itertools
import os.path
import pathlib
import threading

import iris._constraints
@@ -256,7 +256,8 @@ def _generate_cubes(uris, callback, constraints):

def _generate_cubes(uris, callback, constraints):
"""Returns a generator of cubes given the URIs and a callback."""
if isinstance(uris, (str, pathlib.PurePath)):
if isinstance(uris, str) or not isinstance(uris, Iterable):
# Make a string, or other single item, into an iterable.
uris = [uris]

# Group collections of uris by their iris handler
@@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
urls = [":".join(x) for x in groups]
for cube in iris.io.load_http(urls, callback):
yield cube
elif scheme == "data":
data_objects = [x[1] for x in groups]
for cube in iris.io.load_data_objects(data_objects, callback):
yield cube
else:
raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)

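Below is a minimal usage sketch of what this new "data" branch enables, assuming the URI grouping classifies open datasets under a "data" scheme and that iris.io.load_data_objects is added elsewhere in this PR; the file name is hypothetical:

import iris
import netCDF4

# Open the dataset ourselves; iris reads from it but does not close it.
dataset = netCDF4.Dataset("air_temperature.nc", mode="r")
try:
    # A non-string, non-iterable source is wrapped into a one-item list and
    # routed through the "data" scheme to iris.io.load_data_objects().
    cubes = iris.load(dataset)
    print(cubes)
finally:
    dataset.close()  # ownership stays with the caller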
35 changes: 26 additions & 9 deletions lib/iris/fileformats/__init__.py
@@ -9,6 +9,7 @@
"""

from iris.io.format_picker import (
DataSourceObjectProtocol,
FileExtension,
FormatAgent,
FormatSpecification,
@@ -125,16 +126,32 @@ def _load_grib(*args, **kwargs):
)


_nc_dap = FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
)
)
FORMAT_AGENT.add_spec(_nc_dap)
del _nc_dap

# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF dataset",
DataSourceObjectProtocol(),
lambda object: all(
hasattr(object, x)
for x in ("variables", "dimensions", "groups", "ncattrs")
),
netcdf.load_cubes,  # the same loader call: it must distinguish paths from datasets.
priority=4,
constraint_aware_handler=True,
)
)


#
# UM Fieldsfiles.
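The new "NetCDF dataset" specification matches candidate objects by duck typing rather than by magic number. A rough stand-alone illustration of the same rule, with a hypothetical helper name and example file:

import netCDF4


def looks_like_netcdf_dataset(obj):
    # Mirrors the lambda above: anything exposing the core netCDF4.Dataset
    # API is accepted, so dataset mimics from other packages also qualify.
    return all(
        hasattr(obj, name)
        for name in ("variables", "dimensions", "groups", "ncattrs")
    )


dataset = netCDF4.Dataset("example.nc", mode="r")
assert looks_like_netcdf_dataset(dataset)
assert not looks_like_netcdf_dataset("example.nc")  # plain paths do not match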
24 changes: 16 additions & 8 deletions lib/iris/fileformats/cf.py
@@ -1043,17 +1043,25 @@ class CFReader:
# TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
CFGroup = CFGroup

def __init__(self, filename, warn=False, monotonic=False):
self._dataset = None
self._filename = os.path.expanduser(filename)
def __init__(self, file_source, warn=False, monotonic=False):
# Ensure safe operation for destructor, should init fail.
self._own_file = False
if isinstance(file_source, str):
# Create from a filepath: open it and own it (i.e. close it when we are destroyed).
self._filename = os.path.expanduser(file_source)
self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)
self._own_file = True
else:
# We have been passed an open dataset.
# We use it but don't own it (don't close it).
self._dataset = file_source
self._filename = self._dataset.filepath()

#: Collection of CF-netCDF variables associated with this netCDF file
self.cf_group = self.CFGroup()

self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)

# Issue load optimisation warning.
if warn and self._dataset.file_format in [
"NETCDF3_CLASSIC",
@@ -1311,7 +1319,7 @@ def _reset(self):

def _close(self):
# Explicitly close dataset to prevent file remaining open.
if self._dataset is not None:
if self._own_file and self._dataset is not None:
self._dataset.close()
self._dataset = None

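With this change CFReader accepts either kind of source and only closes datasets it opened itself. A small sketch of the intended contract, using a hypothetical file name:

import netCDF4
from iris.fileformats.cf import CFReader

# From a filepath: CFReader opens the dataset and closes it when it is done.
with CFReader("example.nc") as cf:
    print(list(cf.cf_group))

# From an already-open dataset: CFReader uses it but never closes it.
dataset = netCDF4.Dataset("example.nc", mode="r")
with CFReader(dataset) as cf:
    print(list(cf.cf_group))
dataset.close()  # still the caller's responsibility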
72 changes: 42 additions & 30 deletions lib/iris/fileformats/netcdf/loader.py
@@ -13,6 +13,7 @@
Also : `CF Conventions <https://cfconventions.org/>`_.

"""
from collections.abc import Iterable
import warnings

import numpy as np
@@ -174,25 +175,35 @@ def _get_actual_dtype(cf_var):


def _get_cf_var_data(cf_var, filename):
# Get lazy chunked data out of a cf variable.
dtype = _get_actual_dtype(cf_var)

# Create cube with deferred data, but no metadata
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
_thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
chunks = None
return as_lazy_data(proxy, chunks=chunks)
if hasattr(cf_var, "_in_memory_data"):
# The variable is not an actual netCDF4 file variable, but an emulating
# object with an attached data array (either numpy or dask), which can be
# returned immediately as-is. This is used as a hook to translate data to/from
# netcdf data container objects in other packages, such as xarray.
# See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
result = cf_var._in_memory_data
else:
# Get lazy chunked data out of a cf variable.
dtype = _get_actual_dtype(cf_var)

# Create cube with deferred data, but no metadata
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
_thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
chunks = None
result = as_lazy_data(proxy, chunks=chunks)

return result


class _OrderedAddableList(list):
@@ -456,14 +467,15 @@ def inner(cf_datavar):
return result


def load_cubes(filenames, callback=None, constraints=None):
def load_cubes(file_sources, callback=None, constraints=None):
"""
Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
Loads cubes from a list of NetCDF file sources: filenames, OPeNDAP URLs, or open datasets.

Args:

* filenames (string/list):
One or more NetCDF filenames/OPeNDAP URLs to load from.
* file_sources (string/list):
One or more NetCDF filenames/OPeNDAP URLs to load from,
or open netCDF dataset objects.

Kwargs:

@@ -491,18 +503,18 @@ def load_cubes(filenames, callback=None, constraints=None):
# Create an actions engine.
engine = _actions_engine()

if isinstance(filenames, str):
filenames = [filenames]
if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
file_sources = [file_sources]

for filename in filenames:
# Ingest the netCDF file.
for file_source in file_sources:
# Ingest the file. At present may be a filepath or an open netCDF4.Dataset.
meshes = {}
if PARSE_UGRID_ON_LOAD:
cf_reader_class = CFUGridReader
else:
cf_reader_class = iris.fileformats.cf.CFReader

with cf_reader_class(filename) as cf:
with cf_reader_class(file_source) as cf:
if PARSE_UGRID_ON_LOAD:
meshes = _meshes_from_cf(cf)

@@ -536,7 +548,7 @@ def load_cubes(filenames, callback=None, constraints=None):
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)

cube = _load_cube(engine, cf, cf_var, filename)
cube = _load_cube(engine, cf, cf_var, cf.filename)

# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
@@ -550,7 +562,7 @@ def load_cubes(filenames, callback=None, constraints=None):
warnings.warn("{}".format(e))

# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, filename)
cube = run_callback(callback, cube, cf_var, file_source)

# Callback mechanism may return None, which must not be yielded
if cube is None:
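The _in_memory_data hook in _get_cf_var_data is the bridge point for packages that already hold the variable's data in memory. A minimal illustration of the contract, using a hypothetical stand-in for a CF variable rather than the real xarray adapter:

import numpy as np

from iris.fileformats.netcdf.loader import _get_cf_var_data


class InMemoryVariable:
    # Hypothetical mimic of a CF variable whose data already lives in memory.
    def __init__(self, data):
        self._in_memory_data = data  # the attribute the loader checks for
        self.cf_name = "stand_in_var"
        self.shape = data.shape


cf_var = InMemoryVariable(np.arange(12.0).reshape(3, 4))
# With _in_memory_data present the array is returned untouched: no
# NetCDFDataProxy is built and no lazy wrapping is applied.
result = _get_cf_var_data(cf_var, filename=None)
assert result is cf_var._in_memory_data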