Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[localprocessing] processing.py updates #411

Merged
merged 10 commits into from
Apr 17, 2023
33 changes: 25 additions & 8 deletions openeo/local/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,31 @@
from pathlib import Path

from openeo_pg_parser_networkx import ProcessRegistry
from openeo_processes_dask.core import process as openeo_process
from openeo_processes_dask.process_implementations.core import process
from openeo_pg_parser_networkx.process_registry import Process

standard_processes = [
PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[process])

# Import these pre-defined processes from openeo_processes_dask and register them into registry
processes_from_module = [
func
for _, func in inspect.getmembers(
importlib.import_module("openeo_processes_dask.process_implementations"),
inspect.isfunction,
)
]

PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[openeo_process])
specs_module = importlib.import_module("openeo_processes_dask.specs")
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
specs = {
func.__name__: getattr(specs_module, func.__name__)
for func in processes_from_module
}

for func in standard_processes:
PROCESS_REGISTRY[func.__name__] = func
for func in processes_from_module:
PROCESS_REGISTRY[func.__name__] = Process(
spec=specs[func.__name__], implementation=func
)
clausmichele marked this conversation as resolved.
Show resolved Hide resolved

#We need to define a custom `load_collection` process, used to load local netCDFs
_log = logging.getLogger(__name__)
def load_local_collection(*args, **kwargs):
pretty_args = {k: type(v) for k, v in kwargs.items()}
Expand All @@ -34,7 +43,15 @@ def load_local_collection(*args, **kwargs):
elif '.nc' in collection.suffixes:
data = xr.open_dataset(kwargs['id'],chunks={},decode_coords='all').to_array(dim='bands') # Add decode_coords='all' if the crs as a band gives some issues
elif '.tiff' in collection.suffixes or '.tif' in collection.suffixes:
data = rioxarray.open_rasterio(kwargs['id']).rename({'band':'bands'})
data = rioxarray.open_rasterio(kwargs['id'],chunks={},band_as_variable=True)
for d in data.data_vars:
data_attrs_lowercase = [x.lower() for x in data[d].attrs]
data_attrs_original = [x for x in data[d].attrs]
data_attrs = dict(zip(data_attrs_lowercase,data_attrs_original))
if 'description' in data_attrs_lowercase:
data = data.rename({d:data[d].attrs[data_attrs['description']]})
clausmichele marked this conversation as resolved.
Show resolved Hide resolved
data = data.to_array(dim='bands')
return data

PROCESS_REGISTRY["load_collection"] = load_local_collection
from openeo_processes_dask.specs import load_collection as load_collection_spec
PROCESS_REGISTRY["load_collection"] = Process(spec=load_collection_spec, implementation=load_local_collection)
clausmichele marked this conversation as resolved.
Show resolved Hide resolved