breaking: accept empty result_path_prefix + do not need to use filename as keys #50

Merged · 9 commits · Oct 28, 2024
28 changes: 22 additions & 6 deletions mdps_ds_lib/lib/cumulus_stac/granules_catalog.py
@@ -9,6 +9,22 @@


class GranulesCatalog:
@staticmethod
def get_unity_formatted_collection_id(current_collection_id: str, project_venue_set: tuple):
if current_collection_id == '' or current_collection_id is None:
raise ValueError(f'NULL or EMPTY collection_id: {current_collection_id}')
collection_identifier_parts = current_collection_id.split(':')
if len(collection_identifier_parts) >= 6:
LOGGER.debug(f'current_collection_id is assumed to be in UNITY format: {current_collection_id}')
return current_collection_id

LOGGER.info(f'current_collection_id is not UNITY formatted ID: {current_collection_id}')
if project_venue_set[0] is None or project_venue_set[1] is None:
raise ValueError(f'missing project or venue in ENV which is needed due to current_collection_id not UNITY format: {project_venue_set}')
new_collection = f'URN:NASA:UNITY:{project_venue_set[0]}:{project_venue_set[1]}:{current_collection_id}'
LOGGER.info(f'UNITY formatted ID: {new_collection}')
return new_collection
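
As a quick illustration of the new static helper, a minimal usage sketch (the collection id, project, and venue values below are invented for the example, not taken from this repo):

# Already UNITY formatted (at least 6 colon-separated parts): returned unchanged.
GranulesCatalog.get_unity_formatted_collection_id(
    'URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1', ('my_project', 'dev'))
# -> 'URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1'

# Plain id: prefixed with URN:NASA:UNITY:<project>:<venue>.
GranulesCatalog.get_unity_formatted_collection_id('MY_COLLECTION___1', ('my_project', 'dev'))
# -> 'URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1'

# Plain id but project/venue missing from ENV: raises ValueError.
GranulesCatalog.get_unity_formatted_collection_id('MY_COLLECTION___1', (None, 'dev'))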

def update_catalog(self, catalog_file_path: str, file_paths: list, rel_name: str = 'item'):
if not FileUtils.file_exist(catalog_file_path):
raise ValueError(f'missing file: {catalog_file_path}')
@@ -46,23 +62,23 @@ def extract_assets_href(self, granules_stac: Item, dir_name: str = '') -> dict:
self_dir = os.path.dirname(granules_stac.self_href)
except:
self_dir = None
assets = defaultdict(list)
assets = defaultdict(dict)
for k, v in granules_stac.get_assets().items():
href = v.href
if v.roles is None or len(v.roles) < 1:
LOGGER.warning(f'asset does not have roles: {v}')
continue
k = v.roles[0]
role_key = v.roles[0]
if not FileUtils.is_relative_path(href):
assets[k].append(href)
assets[role_key][k] = href
continue
if dir_name is not None and len(dir_name) > 0:
assets[k].append(os.path.join(dir_name, href))
assets[role_key][k] = os.path.join(dir_name, href)
continue
if self_dir is not None and len(self_dir) > 0:
assets[k].append(os.path.join(self_dir, href))
assets[role_key][k] = os.path.join(self_dir, href)
continue
assets[k].append(href)
assets[role_key][k] = href
return assets
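
This is the core of the "do not need to use filename as keys" change: the returned mapping is now role -> {asset name -> href} rather than role -> [href, ...]. A small sketch of how a caller iterates the new shape (asset names and paths are made up for illustration):

# Example of the new return structure and how it is consumed downstream.
assets = {
    'data': {'granule_1.nc': '/tmp/granules/granule_1.nc'},
    'metadata': {'granule_1.nc.stac.json': '/tmp/granules/granule_1.nc.stac.json'},
}
for asset_type, asset_hrefs in assets.items():
    for asset_name, asset_href in asset_hrefs.items():
        print(asset_type, asset_name, asset_href)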

def update_assets_href(self, granules_stac: Item, new_assets: dict):
@@ -111,6 +111,8 @@ def upload(self, **kwargs) -> str:
:return:
"""
self._set_props_from_env()
if self._collection_id is None:
raise ValueError(f'missing COLLECTION ID in ENV')
output_dir = os.environ.get(self.OUTPUT_DIRECTORY)
if not FileUtils.dir_exist(output_dir):
raise ValueError(f'OUTPUT_DIRECTORY: {output_dir} does not exist')
12 changes: 10 additions & 2 deletions mdps_ds_lib/stage_in_out/upload_granules_abstract.py
@@ -23,14 +23,18 @@ class UploadGranulesAbstract(ABC):
RESULT_PATH_PREFIX = 'RESULT_PATH_PREFIX' # s3 prefix
DEFAULT_RESULT_PATH_PREFIX = 'stage_out' # default s3 prefix
OUTPUT_DIRECTORY = 'OUTPUT_DIRECTORY' # To store successful & failed features json
COLLECTION_ID_KEY = 'COLLECTION_ID' # Need this
COLLECTION_ID_KEY = 'COLLECTION_ID' # Need this only for arbitrary upload
PROJECT_KEY = 'PROJECT' # Need this only for process stageout
VENUE_KEY = 'VENUE' # Need this only for process stageout
STAGING_BUCKET_KEY = 'STAGING_BUCKET' # S3 Bucket
VERIFY_SSL_KEY = 'VERIFY_SSL'
DELETE_FILES_KEY = 'DELETE_FILES'

def __init__(self) -> None:
super().__init__()
self._collection_id = ''
self._project = ''
self._venue = ''
self._staging_bucket = ''
self._result_path_prefix = ''
self._parallel_count = int(os.environ.get(Constants.PARALLEL_COUNT, '-1'))
@@ -40,13 +44,17 @@ def __init__(self) -> None:
self._delete_files = False

def _set_props_from_env(self):
missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
missing_keys = [k for k in [self.STAGING_BUCKET_KEY] if k not in os.environ]
if len(missing_keys) > 0:
raise ValueError(f'missing environment keys: {missing_keys}')

self._collection_id = os.environ.get(self.COLLECTION_ID_KEY)
self._project = os.environ.get(self.PROJECT_KEY)
self._venue = os.environ.get(self.VENUE_KEY)
self._staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
self._result_path_prefix = os.environ.get(self.RESULT_PATH_PREFIX, self.DEFAULT_RESULT_PATH_PREFIX)
if self._result_path_prefix is None or self._result_path_prefix.strip() == '':
self._result_path_prefix = self.DEFAULT_RESULT_PATH_PREFIX
self._result_path_prefix = self._result_path_prefix[:-1] if self._result_path_prefix.endswith('/') else self._result_path_prefix
self._result_path_prefix = self._result_path_prefix[1:] if self._result_path_prefix.startswith('/') else self._result_path_prefix
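
A standalone rendition of the new RESULT_PATH_PREFIX handling, to show the intended behavior: an unset, empty, or whitespace-only value now falls back to the default, and leading/trailing slashes are trimmed. This helper is not part of the library; it just mirrors the lines above.

from typing import Optional

def normalize_result_path_prefix(raw: Optional[str], default: str = 'stage_out') -> str:
    # Empty or whitespace-only values fall back to the default prefix.
    prefix = default if raw is None or raw.strip() == '' else raw
    # Trim a trailing and a leading slash, matching the code above.
    prefix = prefix[:-1] if prefix.endswith('/') else prefix
    prefix = prefix[1:] if prefix.startswith('/') else prefix
    return prefix

assert normalize_result_path_prefix(None) == 'stage_out'
assert normalize_result_path_prefix('   ') == 'stage_out'
assert normalize_result_path_prefix('/custom/prefix/') == 'custom/prefix'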

36 changes: 20 additions & 16 deletions mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
@@ -5,7 +5,7 @@

from mdps_ds_lib.lib.utils.file_utils import FileUtils

from pystac import ItemCollection
from pystac import ItemCollection, Item

from mdps_ds_lib.lib.cumulus_stac.granules_catalog import GranulesCatalog
from mdps_ds_lib.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
@@ -22,9 +22,9 @@


class UploadItemExecutor(JobExecutorAbstract):
def __init__(self, result_list, error_list, collection_id, staging_bucket, retry_wait_time_sec, retry_times, delete_files: bool) -> None:
def __init__(self, result_list, error_list, project_venue_set, staging_bucket, retry_wait_time_sec, retry_times, delete_files: bool) -> None:
super().__init__()
self.__collection_id = collection_id
self.__project_venue_set = project_venue_set
self.__staging_bucket = staging_bucket
self.__delete_files = delete_files

@@ -48,10 +48,13 @@ def validate_job(self, job_obj):
# return

def execute_job(self, each_child, lock) -> bool:
current_granule_stac = self.__gc.get_granules_item(each_child)
current_granule_stac: Item = self.__gc.get_granules_item(each_child)
current_collection_id = current_granule_stac.collection_id.strip()
try:
current_collection_id = GranulesCatalog.get_unity_formatted_collection_id(current_collection_id, self.__project_venue_set)
LOGGER.debug(f'reformatted current_collection_id: {current_collection_id}')
current_granules_dir = os.path.dirname(each_child)
current_assets = self.__gc.extract_assets_href(current_granule_stac, current_granules_dir)
current_assets = self.__gc.extract_assets_href(current_granule_stac, current_granules_dir)  # returns defaultdict(dict): {role: {asset_name: href}}
if 'data' not in current_assets:  # this is still ok because extract_assets_href returns {'data': {asset_name: url, ...}, ...}
LOGGER.warning(f'skipping {each_child}. no data in {current_assets}')
current_granule_stac.properties['upload_error'] = f'missing "data" in assets'
@@ -63,21 +66,21 @@
updating_assets = {}
uploading_current_granule_stac = None
for asset_type, asset_hrefs in current_assets.items():
for each_asset_href in asset_hrefs:
LOGGER.audit(f'uploading {asset_type}: {each_asset_href}')
s3_url = self.__s3.upload(each_asset_href, self.__staging_bucket,
f'{self.__collection_id}/{self.__collection_id}:{current_granule_id}',
for asset_name, asset_href in asset_hrefs.items():
LOGGER.audit(f'uploading type={asset_type}, name={asset_name}, href={asset_href}')
s3_url = self.__s3.upload(asset_href, self.__staging_bucket,
f'{current_collection_id}/{current_collection_id}:{current_granule_id}',
self.__delete_files)
if each_asset_href == each_child:
if asset_href == each_child:
uploading_current_granule_stac = s3_url
updating_assets[os.path.basename(s3_url)] = s3_url
updating_assets[asset_name] = s3_url
self.__gc.update_assets_href(current_granule_stac, updating_assets)
current_granule_stac.id = current_granule_id
current_granule_stac.collection_id = self.__collection_id
current_granule_stac.collection_id = current_collection_id
if uploading_current_granule_stac is not None: # upload metadata file again
self.__s3.set_s3_url(uploading_current_granule_stac)
self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
current_granule_stac.id = f'{self.__collection_id}:{current_granule_id}'
current_granule_stac.id = f'{current_collection_id}:{current_granule_id}'
self.__result_list.put(current_granule_stac.to_dict(False, False))
except Exception as e:
current_granule_stac.properties['upload_error'] = str(e)
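
For clarity, the S3 key prefix and the resulting item id are now derived from each granule's (possibly reformatted) collection id rather than the single COLLECTION_ID environment value; an illustration with invented ids:

current_collection_id = 'URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1'
current_granule_id = 'granule_001'
s3_key_prefix = f'{current_collection_id}/{current_collection_id}:{current_granule_id}'
# -> 'URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1/URN:NASA:UNITY:my_project:dev:MY_COLLECTION___1:granule_001'
# The successful STAC item id likewise becomes f'{current_collection_id}:{current_granule_id}'.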
@@ -110,10 +113,11 @@ def upload(self, **kwargs) -> str:
for each_child in child_links:
job_manager_props.memory_job_dict[each_child] = each_child

project_venue_set = (self._project, self._venue)
# https://www.infoworld.com/article/3542595/6-python-libraries-for-parallel-processing.html
multithread_processor_props = MultiThreadProcessorProps(self._parallel_count)
multithread_processor_props.job_manager = JobManagerMemory(job_manager_props)
multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, self._collection_id, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, project_venue_set, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
multithread_processor = MultiThreadProcessor(multithread_processor_props)
multithread_processor.start()

@@ -130,12 +134,12 @@ def upload(self, **kwargs) -> str:
failed_item_collections = ItemCollection(items=errors)
successful_features_file = os.path.join(output_dir, 'successful_features.json')



failed_features_file = os.path.join(output_dir, 'failed_features.json')
LOGGER.debug(f'writing results: {successful_features_file} && {failed_features_file}')
FileUtils.write_json(successful_features_file, successful_item_collections.to_dict(False))
FileUtils.write_json(failed_features_file, failed_item_collections.to_dict(False))
if len(failed_item_collections.items) > 0:
LOGGER.fatal(f'One or more Failures: {failed_item_collections.to_dict(False)}')
s3_url = self.__s3.upload(successful_features_file, self._staging_bucket,
self._result_path_prefix,
s3_name=f'successful_features_{TimeUtils.get_current_time()}.json',