Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[localprocessing] processing.py updates #411

Merged
merged 10 commits into from
Apr 17, 2023
50 changes: 36 additions & 14 deletions openeo/local/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,36 @@
from pathlib import Path

from openeo_pg_parser_networkx import ProcessRegistry
from openeo_processes_dask.core import process as openeo_process
from openeo_processes_dask.process_implementations.core import process
import openeo_processes_dask.specs
import openeo_processes_dask.process_implementations
from openeo_pg_parser_networkx.process_registry import Process

standard_processes = [
func
for _, func in inspect.getmembers(
importlib.import_module("openeo_processes_dask.process_implementations"),
inspect.isfunction,
)
]
def init_process_registry():
process_registry = ProcessRegistry(wrap_funcs=[process])

PROCESS_REGISTRY = ProcessRegistry(wrap_funcs=[openeo_process])
# Import these pre-defined processes from openeo_processes_dask and register them into registry
processes_from_module = [
func
for _, func in inspect.getmembers(
openeo_processes_dask.process_implementations,
inspect.isfunction,
)
]

for func in standard_processes:
PROCESS_REGISTRY[func.__name__] = func
specs = {
func.__name__: getattr(openeo_processes_dask.specs, func.__name__)
for func in processes_from_module
}

for func in processes_from_module:
process_registry[func.__name__] = Process(
spec=specs[func.__name__], implementation=func
)
return process_registry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm missing something, I think this can be done more compactly with:

for name, func in inspect.getmembers(
    openeo_processes_dask.process_implementations,
    inspect.isfunction,
):
    process_registry[name] = Process(
        spec=getattr(openeo_processes_dask.specs, name),
        implementation=func
    )

Also, wouldn't it be better if openeo_processes_dask provided helpers to list implementations/specs, instead of having to use ugly inspect.getmembers and getattr constructs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LukeWeidenwalker sorry to tag you again, but I wasn't involved in the development of openeo-processes-dask, so I can't tell the reason.

Copy link

@LukeWeidenwalker LukeWeidenwalker Apr 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah apologies, I missed this tag - yes, agree with Stefaan, this is something openeo-processes-dask could do better - have created an issue to do this at some point!
Open-EO/openeo-processes-dask#92


PROCESS_REGISTRY = init_process_registry()

# We need to define a custom `load_collection` process, used to load local netCDF and GeoTIFF files
_log = logging.getLogger(__name__)
def load_local_collection(*args, **kwargs):
pretty_args = {k: type(v) for k, v in kwargs.items()}
Expand All @@ -34,7 +48,15 @@ def load_local_collection(*args, **kwargs):
elif '.nc' in collection.suffixes:
data = xr.open_dataset(kwargs['id'],chunks={},decode_coords='all').to_array(dim='bands') # Add decode_coords='all' if the crs as a band gives some issues
elif '.tiff' in collection.suffixes or '.tif' in collection.suffixes:
data = rioxarray.open_rasterio(kwargs['id']).rename({'band':'bands'})
data = rioxarray.open_rasterio(kwargs['id'],chunks={},band_as_variable=True)
for d in data.data_vars:
descriptions = [v for k, v in data[d].attrs.items() if k.lower() == "description"]
if descriptions:
data = data.rename({d: descriptions[0]})
data = data.to_array(dim='bands')
return data

PROCESS_REGISTRY["load_collection"] = load_local_collection
PROCESS_REGISTRY["load_collection"] = Process(
spec=openeo_processes_dask.specs.load_collection,
implementation=load_local_collection,
)
5 changes: 5 additions & 0 deletions requirements-localprocessing.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Those requirements are for the experimental and optional client side processing feature
# that can be installed via pip install openeo[localprocessing].
# Please be aware that this functionality is not fully stable and may change quickly.
openeo-pg-parser-networkx==2023.3.1
openeo-processes-dask==2023.3.2
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
localprocessing_require = [
"rioxarray>=0.13.0",
"pyproj",
"openeo_pg_parser_networkx>=2023.1.2",
"openeo_processes_dask>=2023.3.0",
"openeo_pg_parser_networkx>=2023.3.1",
"openeo_processes_dask[implementations]>=2023.3.2",
]


Expand Down