From 376074749b72842cfe32289f686808f9f5dccbe3 Mon Sep 17 00:00:00 2001 From: clausmichele Date: Mon, 3 Apr 2023 17:34:41 +0200 Subject: [PATCH 1/7] fix for newer packages --- openeo/local/processing.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/openeo/local/processing.py b/openeo/local/processing.py index 59c8838fd..c7d7314c6 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -6,9 +6,13 @@ from pathlib import Path from openeo_pg_parser_networkx import ProcessRegistry -from openeo_processes_dask.core import process as openeo_process +from openeo_processes_dask.process_implementations.core import process +from openeo_pg_parser_networkx.process_registry import Process -standard_processes = [ +PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[process]) + +# Import these pre-defined processes from openeo_processes_dask and register them into registry +processes_from_module = [ func for _, func in inspect.getmembers( importlib.import_module("openeo_processes_dask.process_implementations"), @@ -16,12 +20,17 @@ ) ] -PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[openeo_process]) +specs_module = importlib.import_module("openeo_processes_dask.specs") +specs = { + func.__name__: getattr(specs_module, func.__name__) + for func in processes_from_module +} -for func in standard_processes: - PROCESS_REGISTRY[func.__name__] = func +for func in processes_from_module: + PROCESS_REGISTRY[func.__name__] = Process( + spec=specs[func.__name__], implementation=func + ) -#We need to define a custom `load_collection` process, used to load local netCDFs _log = logging.getLogger(__name__) def load_local_collection(*args, **kwargs): pretty_args = {k: type(v) for k, v in kwargs.items()} @@ -37,4 +46,5 @@ def load_local_collection(*args, **kwargs): data = rioxarray.open_rasterio(kwargs['id']).rename({'band':'bands'}) return data -PROCESS_REGISTRY["load_collection"] = load_local_collection \ No newline at end of file +from openeo_processes_dask.specs import load_collection as load_collection_spec +PROCESS_REGISTRY["load_collection"] = Process(spec=load_collection_spec, implementation=load_local_collection) \ No newline at end of file From cfdcf412cee032ddc4229725c6a7ac56b4fe5bdb Mon Sep 17 00:00:00 2001 From: clausmichele Date: Mon, 3 Apr 2023 17:49:35 +0200 Subject: [PATCH 2/7] improved band names handling --- openeo/local/processing.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/openeo/local/processing.py b/openeo/local/processing.py index c7d7314c6..b9cc6f839 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -43,7 +43,14 @@ def load_local_collection(*args, **kwargs): elif '.nc' in collection.suffixes: data = xr.open_dataset(kwargs['id'],chunks={},decode_coords='all').to_array(dim='bands') # Add decode_coords='all' if the crs as a band gives some issues elif '.tiff' in collection.suffixes or '.tif' in collection.suffixes: - data = rioxarray.open_rasterio(kwargs['id']).rename({'band':'bands'}) + data = rioxarray.open_rasterio(kwargs['id'],chunks={},band_as_variable=True) + for d in data.data_vars: + data_attrs_lowercase = [x.lower() for x in data[d].attrs] + data_attrs_original = [x for x in data[d].attrs] + data_attrs = dict(zip(data_attrs_lowercase,data_attrs_original)) + if 'description' in data_attrs_lowercase: + data = data.rename({d:data[d].attrs[data_attrs['description']]}) + data = data.to_array(dim='bands') return data from openeo_processes_dask.specs import load_collection as load_collection_spec From 6ff39bb1aefed10f5bed8e448d0e9f2134afc020 Mon Sep 17 00:00:00 2001 From: clausmichele Date: Thu, 6 Apr 2023 16:38:11 +0200 Subject: [PATCH 3/7] Dependencies bump --- requirements.txt | 3 +++ setup.py | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..2d05fd1cf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +# Tested versions of specific packages used in openeo[localprocessing] +openeo-pg-parser-networkx==2023.3.1 +openeo-processes-dask==2023.3.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 3fac7f8c3..90adc89ee 100644 --- a/setup.py +++ b/setup.py @@ -39,8 +39,8 @@ localprocessing_require = [ "rioxarray>=0.13.0", "pyproj", - "openeo_pg_parser_networkx>=2023.1.2", - "openeo_processes_dask>=2023.3.0", + "openeo_pg_parser_networkx>=2023.3.1", + "openeo_processes_dask[implementations]>=2023.3.2", ] From 51009b179ca172bca07e2d7cfb45dae16b51994c Mon Sep 17 00:00:00 2001 From: clausmichele Date: Thu, 6 Apr 2023 17:10:23 +0200 Subject: [PATCH 4/7] Code improvement --- openeo/local/processing.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/openeo/local/processing.py b/openeo/local/processing.py index b9cc6f839..5cbfd0e72 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -45,11 +45,9 @@ def load_local_collection(*args, **kwargs): elif '.tiff' in collection.suffixes or '.tif' in collection.suffixes: data = rioxarray.open_rasterio(kwargs['id'],chunks={},band_as_variable=True) for d in data.data_vars: - data_attrs_lowercase = [x.lower() for x in data[d].attrs] - data_attrs_original = [x for x in data[d].attrs] - data_attrs = dict(zip(data_attrs_lowercase,data_attrs_original)) - if 'description' in data_attrs_lowercase: - data = data.rename({d:data[d].attrs[data_attrs['description']]}) + descriptions = [v for k, v in data[d].attrs.items() if k.lower() == "description"] + if descriptions: + data = data.rename({d: descriptions[0]}) data = data.to_array(dim='bands') return data From 47e0afa71f98d08d79f34547449c4c15b81fdbf8 Mon Sep 17 00:00:00 2001 From: clausmichele Date: Thu, 6 Apr 2023 17:15:04 +0200 Subject: [PATCH 5/7] Renamed requirements file and improved --- requirements-localprocessing.txt | 5 +++++ requirements.txt | 3 --- 2 files changed, 5 insertions(+), 3 deletions(-) create mode 100644 requirements-localprocessing.txt delete mode 100644 requirements.txt diff --git a/requirements-localprocessing.txt b/requirements-localprocessing.txt new file mode 100644 index 000000000..02f1948cf --- /dev/null +++ b/requirements-localprocessing.txt @@ -0,0 +1,5 @@ +# Those requirements are for the experimental and optional client side processing feature +# that can be installed via pip install openeo[localprocessing]. +# Please be aware that this functionality is not fully stable and may change quickly. +openeo-pg-parser-networkx==2023.3.1 +openeo-processes-dask==2023.3.2 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 2d05fd1cf..000000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -# Tested versions of specific packages used in openeo[localprocessing] -openeo-pg-parser-networkx==2023.3.1 -openeo-processes-dask==2023.3.2 \ No newline at end of file From b35f7589ad1d701352f16edc9b96c2d5ef3d49a4 Mon Sep 17 00:00:00 2001 From: clausmichele Date: Tue, 11 Apr 2023 10:36:07 +0200 Subject: [PATCH 6/7] wrap process_registry init in function --- openeo/local/processing.py | 47 +++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/openeo/local/processing.py b/openeo/local/processing.py index 5cbfd0e72..e346007de 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -7,29 +7,34 @@ from openeo_pg_parser_networkx import ProcessRegistry from openeo_processes_dask.process_implementations.core import process +import openeo_processes_dask.specs +import openeo_processes_dask.process_implementations from openeo_pg_parser_networkx.process_registry import Process -PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[process]) - -# Import these pre-defined processes from openeo_processes_dask and register them into registry -processes_from_module = [ - func - for _, func in inspect.getmembers( - importlib.import_module("openeo_processes_dask.process_implementations"), - inspect.isfunction, - ) -] - -specs_module = importlib.import_module("openeo_processes_dask.specs") -specs = { - func.__name__: getattr(specs_module, func.__name__) - for func in processes_from_module -} - -for func in processes_from_module: - PROCESS_REGISTRY[func.__name__] = Process( - spec=specs[func.__name__], implementation=func - ) +def init_process_registry(): + process_registry = ProcessRegistry(wrap_funcs=[process]) + + # Import these pre-defined processes from openeo_processes_dask and register them into registry + processes_from_module = [ + func + for _, func in inspect.getmembers( + openeo_processes_dask.process_implementations, + inspect.isfunction, + ) + ] + + specs = { + func.__name__: getattr(openeo_processes_dask.specs, func.__name__) + for func in processes_from_module + } + + for func in processes_from_module: + process_registry[func.__name__] = Process( + spec=specs[func.__name__], implementation=func + ) + return process_registry + +PROCESS_REGISTRY = init_process_registry() _log = logging.getLogger(__name__) def load_local_collection(*args, **kwargs): From 89d91fec0ce8763d2c41bcf6380bd32f4922eaac Mon Sep 17 00:00:00 2001 From: clausmichele Date: Tue, 11 Apr 2023 14:14:02 +0200 Subject: [PATCH 7/7] Improved code from suggestion --- openeo/local/processing.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/openeo/local/processing.py b/openeo/local/processing.py index e346007de..0442a9b50 100644 --- a/openeo/local/processing.py +++ b/openeo/local/processing.py @@ -56,5 +56,7 @@ def load_local_collection(*args, **kwargs): data = data.to_array(dim='bands') return data -from openeo_processes_dask.specs import load_collection as load_collection_spec -PROCESS_REGISTRY["load_collection"] = Process(spec=load_collection_spec, implementation=load_local_collection) \ No newline at end of file +PROCESS_REGISTRY["load_collection"] = Process( + spec=openeo_processes_dask.specs.load_collection, + implementation=load_local_collection, +) \ No newline at end of file