diff --git a/openeo/local/processing.py b/openeo/local/processing.py index 59c8838fd..0442a9b50 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -6,22 +6,36 @@ from pathlib import Path from openeo_pg_parser_networkx import ProcessRegistry -from openeo_processes_dask.core import process as openeo_process +from openeo_processes_dask.process_implementations.core import process +import openeo_processes_dask.specs +import openeo_processes_dask.process_implementations +from openeo_pg_parser_networkx.process_registry import Process -standard_processes = [ - func - for _, func in inspect.getmembers( - importlib.import_module("openeo_processes_dask.process_implementations"), - inspect.isfunction, - ) -] +def init_process_registry(): + process_registry = ProcessRegistry(wrap_funcs=[process]) -PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[openeo_process]) + # Import these pre-defined processes from openeo_processes_dask and register them into registry + processes_from_module = [ + func + for _, func in inspect.getmembers( + openeo_processes_dask.process_implementations, + inspect.isfunction, + ) + ] -for func in standard_processes: - PROCESS_REGISTRY[func.__name__] = func + specs = { + func.__name__: getattr(openeo_processes_dask.specs, func.__name__) + for func in processes_from_module + } + + for func in processes_from_module: + process_registry[func.__name__] = Process( + spec=specs[func.__name__], implementation=func + ) + return process_registry + +PROCESS_REGISTRY = init_process_registry() -#We need to define a custom `load_collection` process, used to load local netCDFs _log = logging.getLogger(__name__) def load_local_collection(*args, **kwargs): pretty_args = {k: type(v) for k, v in kwargs.items()} @@ -34,7 +48,15 @@ def load_local_collection(*args, **kwargs): elif '.nc' in collection.suffixes: data = xr.open_dataset(kwargs['id'],chunks={},decode_coords='all').to_array(dim='bands') # Add decode_coords='all' if the crs as a band gives some issues elif '.tiff' in collection.suffixes or '.tif' in collection.suffixes: - data = rioxarray.open_rasterio(kwargs['id']).rename({'band':'bands'}) + data = rioxarray.open_rasterio(kwargs['id'],chunks={},band_as_variable=True) + for d in data.data_vars: + descriptions = [v for k, v in data[d].attrs.items() if k.lower() == "description"] + if descriptions: + data = data.rename({d: descriptions[0]}) + data = data.to_array(dim='bands') return data -PROCESS_REGISTRY["load_collection"] = load_local_collection \ No newline at end of file +PROCESS_REGISTRY["load_collection"] = Process( + spec=openeo_processes_dask.specs.load_collection, + implementation=load_local_collection, +) \ No newline at end of file diff --git a/requirements-localprocessing.txt b/requirements-localprocessing.txt new file mode 100644 index 000000000..02f1948cf --- /dev/null +++ b/requirements-localprocessing.txt @@ -0,0 +1,5 @@ +# Those requirements are for the experimental and optional client side processing feature +# that can be installed via pip install openeo[localprocessing]. +# Please be aware that this functionality is not fully stable and may change quickly. +openeo-pg-parser-networkx==2023.3.1 +openeo-processes-dask==2023.3.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 3fac7f8c3..90adc89ee 100644 --- a/setup.py +++ b/setup.py @@ -39,8 +39,8 @@ localprocessing_require = [ "rioxarray>=0.13.0", "pyproj", - "openeo_pg_parser_networkx>=2023.1.2", - "openeo_processes_dask>=2023.3.0", + "openeo_pg_parser_networkx>=2023.3.1", + "openeo_processes_dask[implementations]>=2023.3.2", ]