Skip to content

Commit

Permalink
test: to_xarray using fsspec
Browse files (browse the repository at this point in the history)
  • Loading branch information
sbrunato committed Dec 17, 2024
1 parent d42f6d5 commit 2db1426
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 70 deletions.
11 changes: 9 additions & 2 deletions eodag_cube/api/product/_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def _get_storage_options(
auth = self.downloader_auth.authenticate() if self.downloader_auth else None

# order if product is offline
if self.properties["storageStatus"] == OFFLINE_STATUS and hasattr(
if self.properties.get("storageStatus") == OFFLINE_STATUS and hasattr(
self.downloader, "order"
):
self.downloader.order(self, auth, wait=wait, timeout=timeout)
Expand Down Expand Up @@ -351,14 +351,20 @@ def to_xarray(
"""
if asset_key is None and len(self.assets) > 0:
# assets

# have roles been set in assets ?
roles_exist = any("roles" in a for a in self.assets)

xd = XarrayDict()
with concurrent.futures.ThreadPoolExecutor() as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futures = (
executor.submit(self.to_xarray, key, wait, timeout, **xarray_kwargs)
for key, asset in self.assets.items()
if roles
and asset.get("roles")
and any(r in asset["roles"] for r in roles)
or not roles
or not roles_exist
)
for future in concurrent.futures.as_completed(futures):
try:
Expand All @@ -381,6 +387,7 @@ def to_xarray(
except (
UnsupportedDatasetAddressScheme,
FileNotFoundError,
IsADirectoryError,
DatasetCreationError,
) as e:
logger.debug(f"Cannot open {self} {asset_key if asset_key else ''}: {e}")
Expand Down
27 changes: 13 additions & 14 deletions eodag_cube/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,23 @@ def fsspec_file_headers(file: OpenFile) -> Optional[dict[str, Any]]:
:param file: fsspec https OpenFile
:returns: file headers or ``None``
"""
headers = None
file_kwargs = getattr(file, "kwargs", {})
if "https" in file.fs.protocol:
try:
resp = requests.head(file.path, **file.kwargs)
resp = requests.head(file.path, **file_kwargs)
resp.raise_for_status()
except requests.RequestException:
pass
else:
headers = resp.headers
if not headers:
# if HEAD method is not available, try to get a minimal part of the file
try:
resp = requests.get(file.path, stream=True, **file.kwargs)
resp.raise_for_status()
except requests.RequestException:
pass
else:
headers = resp.headers
return resp.headers
# if HEAD method is not available, try to get a minimal part of the file
try:
resp = requests.get(file.path, **file_kwargs)
resp.raise_for_status()
except requests.RequestException:
pass
else:
return resp.headers
return None


Expand All @@ -84,8 +83,8 @@ def fsspec_file_extension(file: OpenFile) -> Optional[str]:
Optional[str],
parse_header(content_disposition).get_param("filename", None),
)
_, extension = os.path.splitext(filename) if filename else None, None
if extension:
_, extension = os.path.splitext(filename) if filename else (None, None)
if not extension:
mime_type = headers.get("content-type", "").split(";")[0]
if mime_type not in IGNORED_MIMETYPES:
extension = guess_extension(mime_type)
Expand Down
8 changes: 4 additions & 4 deletions eodag_cube/utils/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import fsspec
import rioxarray
import xarray as xr
from fsspec.implementations.local import LocalFileOpener

from eodag_cube.types import XarrayDict
from eodag_cube.utils import fsspec_file_extension
Expand Down Expand Up @@ -61,16 +60,17 @@ def try_open_dataset(file: OpenFile, **xarray_kwargs: dict[str, Any]) -> xr.Data
:param file: fsspec https OpenFile
:param xarray_kwargs: (optional) keyword arguments passed to xarray.open_dataset
:returns: opened xarray dataset
"""
LOCALFILE_ONLY_ENGINES = ["netcdf4", "cfgrib"]

if engine := xarray_kwargs.pop("engine", None):
all_engines = [
engine,
]
else:
all_engines = guess_engines(file) or list(xr.backends.list_engines().keys())

if isinstance(file, LocalFileOpener):
if "file" in file.fs.protocol:
engines = all_engines

# use path str as cfgrib does not support fsspec OpenFile as input
Expand All @@ -93,7 +93,7 @@ def try_open_dataset(file: OpenFile, **xarray_kwargs: dict[str, Any]) -> xr.Data
else:
# remove engines that do not support remote access
# https://tutorial.xarray.dev/intermediate/remote_data/remote-data.html#supported-format-read-from-buffers-remote-access
engines = [eng for eng in all_engines if eng not in ["netcdf4", "cfgrib"]]
engines = [eng for eng in all_engines if eng not in LOCALFILE_ONLY_ENGINES]

file_or_path = file

Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ line_length=88
known_first_party = eodag,tests
known_third_party = concurrent.futures
default_section = THIRDPARTY
ensure_newline_before_comments = True
skip =
.git,
__pycache__,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"faker",
"coverage",
"moto >= 5",
"responses < 0.24.0",
"twine",
"wheel",
]
Expand Down
18 changes: 17 additions & 1 deletion tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,27 @@
from eodag_cube.api.product.drivers.generic import GenericDriver
from eodag_cube.api.product.drivers.sentinel2_l1c import Sentinel2L1C
from eodag_cube.api.product.drivers.stac_assets import StacAssets
from eodag_cube.utils import fsspec_file_headers, fsspec_file_extension
from eodag_cube.utils.exceptions import DatasetCreationError
from eodag_cube.utils.xarray import (
guess_engines,
try_open_dataset,
build_local_xarray_dict,
)
from eodag.plugins.authentication.base import Authentication
from eodag.plugins.authentication.aws_auth import AwsAuth
from eodag.plugins.authentication.header import HTTPHeaderAuth
from eodag.plugins.authentication.qsauth import HttpQueryStringAuth
from eodag.plugins.download.base import Download
from eodag.plugins.download.aws import AwsDownload
from eodag.utils import DEFAULT_PROJ, path_to_uri
from eodag.utils import (
DEFAULT_PROJ,
path_to_uri,
USER_AGENT,
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
path_to_uri,
)
from eodag.utils.exceptions import (
AddressNotFound,
DownloadError,
Expand Down
Loading

0 comments on commit 2db1426

Please sign in to comment.