Skip to content

Commit

Permalink
chore: describe with return types and new options
Browse files Browse the repository at this point in the history
fix: no typings
  • Loading branch information
renaudjester committed Oct 28, 2024
1 parent 5b4f545 commit 8202069
Show file tree
Hide file tree
Showing 34 changed files with 2,874 additions and 3,193 deletions.
11 changes: 11 additions & 0 deletions copernicusmarine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
logging.Formatter.converter = time.gmtime

from copernicusmarine.catalogue_parser.models import (
CopernicusMarineCatalogue,
CopernicusMarineCoordinate,
CopernicusMarineDataset,
CopernicusMarinePart,
CopernicusMarineProduct,
CopernicusMarineService,
CopernicusMarineServiceFormat,
CopernicusMarineServiceNames,
CopernicusMarineVariable,
CopernicusMarineVersion,
CoperniusMarineServiceShortNames,
DatasetNotFound,
DatasetVersionNotFound,
DatasetVersionPartNotFound,
Expand Down
194 changes: 117 additions & 77 deletions copernicusmarine/catalogue_parser/catalogue_parser.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import logging
from dataclasses import dataclass
from enum import Enum
from itertools import groupby
from typing import Any, Optional
from typing import Any, Optional, Union

import pystac
from pydantic import BaseModel
from tqdm import tqdm

from copernicusmarine.catalogue_parser.models import (
CopernicusMarineCatalogue,
CopernicusMarineDataset,
CopernicusMarineProduct,
CopernicusMarineProductDataset,
DatasetIsNotPartOfTheProduct,
DatasetNotFound,
get_version_and_part_from_full_dataset_id,
)
Expand All @@ -19,7 +20,6 @@
)
from copernicusmarine.core_functions.utils import (
construct_query_params_for_marine_data_store_monitoring,
map_reject_none,
run_concurrently,
)

Expand Down Expand Up @@ -67,7 +67,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def get_dataset_metadata(
dataset_id: str, staging: bool
) -> Optional[CopernicusMarineProductDataset]:
) -> Optional[CopernicusMarineDataset]:
with CatalogParserConnection() as connection:
stac_url = (
MARINE_DATA_STORE_STAC_URL
Expand Down Expand Up @@ -145,7 +145,7 @@ def _parse_product_json_to_pystac_collection(

def _parse_and_sort_dataset_items(
dataset_items: list[pystac.Item],
) -> Optional[CopernicusMarineProductDataset]:
) -> Optional[CopernicusMarineDataset]:
"""
Return all dataset metadata parsed and sorted.
The first version and part are the default.
Expand All @@ -154,7 +154,7 @@ def _parse_and_sort_dataset_items(
dataset_id, _, _ = get_version_and_part_from_full_dataset_id(
dataset_item_example.id
)
dataset_part_version_merged = CopernicusMarineProductDataset(
dataset_part_version_merged = CopernicusMarineDataset(
dataset_id=dataset_id,
dataset_name=dataset_item_example.properties.get("title", dataset_id),
versions=[],
Expand Down Expand Up @@ -241,12 +241,15 @@ def fetch_dataset_items(
root_url: str,
connection: CatalogParserConnection,
collection: pystac.Collection,
force_dataset_id: Optional[str],
) -> list[pystac.Item]:
items = []
for link in collection.get_item_links():
if not link.owner:
logger.warning(f"Invalid Item, no owner for: {link.href}")
continue
if force_dataset_id and force_dataset_id not in link.href:
continue
url = root_url + "/" + link.owner.id + "/" + link.href
item_json = connection.get_json_file(url)
item = _parse_dataset_json_to_pystac_item(item_json)
Expand All @@ -259,11 +262,14 @@ def fetch_collection(
root_url: str,
connection: CatalogParserConnection,
url: str,
force_dataset_id: Optional[str],
) -> Optional[tuple[pystac.Collection, list[pystac.Item]]]:
json_collection = connection.get_json_file(url)
collection = _parse_product_json_to_pystac_collection(json_collection)
if collection:
items = fetch_dataset_items(root_url, connection, collection)
items = fetch_dataset_items(
root_url, connection, collection, force_dataset_id
)
return (collection, items)
return None

Expand All @@ -272,12 +278,18 @@ def fetch_product_items(
root_url: str,
connection: CatalogParserConnection,
child_links: list[pystac.Link],
force_product_id: Optional[str],
force_dataset_id: Optional[str],
max_concurrent_requests: int,
disable_progress_bar: bool,
) -> list[Optional[tuple[pystac.Collection, list[pystac.Item]]]]:
tasks = []
for link in child_links:
tasks.append((root_url, connection, link.absolute_href))
if force_product_id and force_product_id not in link.href:
continue
tasks.append(
(root_url, connection, link.absolute_href, force_dataset_id)
)
tdqm_bar_configuration = {
"desc": "Fetching products",
"disable": disable_progress_bar,
Expand All @@ -297,6 +309,8 @@ def fetch_product_items(

def fetch_all_products_items(
connection: CatalogParserConnection,
force_product_id: Optional[str],
force_dataset_id: Optional[str],
max_concurrent_requests: int,
staging: bool,
disable_progress_bar: bool,
Expand All @@ -319,25 +333,52 @@ def fetch_all_products_items(
root_url,
connection,
child_links,
force_product_id,
force_dataset_id,
max_concurrent_requests,
disable_progress_bar,
)
return childs


def parse_catalogue(
force_product_id: Optional[str],
force_dataset_id: Optional[str],
max_concurrent_requests: int,
disable_progress_bar: bool,
staging: bool = False,
) -> CopernicusMarineCatalogue:
logger.debug("Parsing catalogue...")
progress_bar = tqdm(
total=2, desc="Fetching catalog", disable=disable_progress_bar
total=2, desc="Fetching catalogue", disable=disable_progress_bar
)

with CatalogParserConnection() as connection:
if force_dataset_id:
root_url = (
MARINE_DATA_STORE_ROOT_METADATA_URL
if not staging
else MARINE_DATA_STORE_ROOT_METADATA_URL_STAGING
)
dataset_product_mapping_url = (
f"{root_url}/dataset_product_id_mapping.json"
)
product_id_from_mapping = connection.get_json_file(
dataset_product_mapping_url
).get(force_dataset_id)
if not product_id_from_mapping:
raise DatasetNotFound(force_dataset_id)
if (
force_product_id
and product_id_from_mapping != force_product_id
):
raise DatasetIsNotPartOfTheProduct(
force_dataset_id, force_product_id
)
force_product_id = product_id_from_mapping
marine_data_store_root_collections = fetch_all_products_items(
connection=connection,
force_product_id=force_product_id,
force_dataset_id=force_dataset_id,
max_concurrent_requests=max_concurrent_requests,
staging=staging,
disable_progress_bar=disable_progress_bar,
Expand Down Expand Up @@ -365,75 +406,74 @@ def parse_catalogue(
return full_catalog


@dataclass
class DistinctDatasetVersionPart:
dataset_id: str
dataset_version: str
dataset_part: str
layer_elements: list
raw_services: dict
stac_items_values: Optional[dict]


# ---------------------------------------
# --- Utils function on any catalogue ---
# --- Utils functions
# ---------------------------------------


def filter_catalogue_with_strings(
catalogue: CopernicusMarineCatalogue, tokens: list[str]
) -> dict[str, Any]:
return find_match_object(catalogue, tokens) or {}


def find_match_object(value: Any, tokens: list[str]) -> Any:
match: Any
if isinstance(value, str):
match = find_match_string(value, tokens)
elif isinstance(value, Enum):
match = find_match_enum(value, tokens)
elif isinstance(value, tuple):
match = find_match_tuple(value, tokens)
elif isinstance(value, list):
match = find_match_list(value, tokens)
elif hasattr(value, "__dict__"):
match = find_match_dict(value, tokens)
else:
match = None
return match


def find_match_string(string: str, tokens: list[str]) -> Optional[str]:
return string if any(token in string for token in tokens) else None


def find_match_enum(enum: Enum, tokens: list[str]) -> Any:
return find_match_object(enum.value, tokens)


def find_match_tuple(tuple: tuple, tokens: list[str]) -> Optional[list[Any]]:
return find_match_list(list(tuple), tokens)


def find_match_list(object_list: list[Any], tokens) -> Optional[list[Any]]:
def find_match(element: Any) -> Optional[Any]:
return find_match_object(element, tokens)

filtered_list: list[Any] = list(map_reject_none(find_match, object_list))
return filtered_list if filtered_list else None
def search_and_filter(
model: BaseModel, search_str: set[str]
) -> Union[BaseModel, None]:
filtered_fields = {}
search_str = {s.lower() for s in search_str}
for field, value in model:
if isinstance(value, BaseModel):
filtered_value = search_and_filter(value, search_str)
if filtered_value:
filtered_fields[field] = filtered_value

elif isinstance(value, list) or isinstance(value, tuple):
filtered_list = []
for item in value:
if isinstance(item, BaseModel):
filtered_item = search_and_filter(item, search_str)
if filtered_item:
filtered_list.append(filtered_item)
elif isinstance(item, str) and any(
s in item.lower() for s in search_str
):
filtered_list.append(item)

if filtered_list and isinstance(value, list):
filtered_fields[field] = filtered_list

if filtered_list and isinstance(value, tuple):
filtered_fields[field] = tuple(filtered_list)

elif isinstance(value, dict):
filtered_dict = {}
for key, val in value.items():
if isinstance(val, BaseModel):
filtered_val = search_and_filter(val, search_str)
if filtered_val:
filtered_dict[key] = filtered_val
elif isinstance(val, str) and any(
s in val.lower() for s in search_str
):
filtered_dict[key] = val

if filtered_dict:
filtered_fields[field] = filtered_dict

elif isinstance(value, Enum):
if any(s in value.name.lower() for s in search_str):
filtered_fields[field] = value

elif isinstance(value, str) and any(
s in value.lower() for s in search_str
):
filtered_fields[field] = value
if filtered_fields:
return model.model_copy(update=filtered_fields)
return None


def find_match_dict(
structure: dict[str, Any], tokens
) -> Optional[dict[str, Any]]:
filtered_dict = {
key: find_match_object(value, tokens)
for key, value in structure.__dict__.items()
if find_match_object(value, tokens)
}

found_match = any(filtered_dict.values())
if found_match:
new_dict = dict(structure.__dict__, **filtered_dict)
structure.__dict__ = new_dict
return structure if found_match else None
def filter_catalogue_with_strings(
catalogue: CopernicusMarineCatalogue, search_str: set[str]
) -> CopernicusMarineCatalogue:
filtered_models = []
for model in catalogue.products:
filtered_model = search_and_filter(model, search_str)
if filtered_model:
filtered_models.append(filtered_model)
return CopernicusMarineCatalogue(products=filtered_models)
Loading

0 comments on commit 8202069

Please sign in to comment.