diff --git a/mdps_ds_lib/lib/cumulus_stac/granules_catalog.py b/mdps_ds_lib/lib/cumulus_stac/granules_catalog.py
index ae37281..7b55d24 100644
--- a/mdps_ds_lib/lib/cumulus_stac/granules_catalog.py
+++ b/mdps_ds_lib/lib/cumulus_stac/granules_catalog.py
@@ -9,6 +9,22 @@ class GranulesCatalog:
 
+    @staticmethod
+    def get_unity_formatted_collection_id(current_collection_id: str, project_venue_set: tuple):
+        if current_collection_id is None or current_collection_id == '':
+            raise ValueError(f'NULL or EMPTY collection_id: {current_collection_id}')
+        collection_identifier_parts = current_collection_id.split(':')
+        if len(collection_identifier_parts) >= 6:
+            LOGGER.debug(f'current_collection_id is assumed to be in UNITY format: {current_collection_id}')
+            return current_collection_id
+
+        LOGGER.info(f'current_collection_id is not a UNITY formatted ID: {current_collection_id}')
+        if project_venue_set[0] is None or project_venue_set[1] is None:
+            raise ValueError(f'missing project or venue in ENV which is needed because current_collection_id is not in UNITY format: {project_venue_set}')
+        new_collection = f'URN:NASA:UNITY:{project_venue_set[0]}:{project_venue_set[1]}:{current_collection_id}'
+        LOGGER.info(f'UNITY formatted ID: {new_collection}')
+        return new_collection
+
     def update_catalog(self, catalog_file_path: str, file_paths: list, rel_name: str = 'item'):
         if not FileUtils.file_exist(catalog_file_path):
             raise ValueError(f'missing file: {catalog_file_path}')
@@ -46,23 +62,23 @@ def extract_assets_href(self, granules_stac: Item, dir_name: str = '') -> dict:
             self_dir = os.path.dirname(granules_stac.self_href)
         except:
             self_dir = None
-        assets = defaultdict(list)
+        assets = defaultdict(dict)
         for k, v in granules_stac.get_assets().items():
             href = v.href
             if v.roles is None or len(v.roles) < 1:
                 LOGGER.warning(f'asset do not have roles: {v}')
                 continue
-            k = v.roles[0]
+            role_key = v.roles[0]
             if not FileUtils.is_relative_path(href):
-                assets[k].append(href)
+                assets[role_key][k] = href
                 continue
             if dir_name is not None and len(dir_name) > 0:
-                assets[k].append(os.path.join(dir_name, href))
+                assets[role_key][k] = os.path.join(dir_name, href)
                 continue
             if self_dir is not None and len(self_dir) > 0:
-                assets[k].append(os.path.join(self_dir, href))
+                assets[role_key][k] = os.path.join(self_dir, href)
                 continue
-            assets[k].append(href)
+            assets[role_key][k] = href
         return assets
 
     def update_assets_href(self, granules_stac: Item, new_assets: dict):
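A quick sketch of how the new helper behaves; the calls mirror the unit tests at the bottom of this diff, and the values are illustrative:

```python
from mdps_ds_lib.lib.cumulus_stac.granules_catalog import GranulesCatalog

# fewer than 6 colon-separated parts: prefixed with URN:NASA:UNITY:<project>:<venue>
GranulesCatalog.get_unity_formatted_collection_id('NA', ('LOCAL', 'DEV'))
# -> 'URN:NASA:UNITY:LOCAL:DEV:NA'

# 6+ parts: assumed to already be UNITY formatted, returned unchanged
GranulesCatalog.get_unity_formatted_collection_id('URN:NASA:UNITY:LOCAL:DEV:NA', (None, None))
# -> 'URN:NASA:UNITY:LOCAL:DEV:NA'

# reformatting needed but project or venue missing: ValueError
try:
    GranulesCatalog.get_unity_formatted_collection_id('NA', ('LOCAL', None))
except ValueError as e:
    print(e)  # missing project or venue in ENV ...
```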
diff --git a/mdps_ds_lib/stage_in_out/upload_arbitrary_files_as_granules.py b/mdps_ds_lib/stage_in_out/upload_arbitrary_files_as_granules.py
index a545ce4..9b159be 100644
--- a/mdps_ds_lib/stage_in_out/upload_arbitrary_files_as_granules.py
+++ b/mdps_ds_lib/stage_in_out/upload_arbitrary_files_as_granules.py
@@ -111,6 +111,8 @@ def upload(self, **kwargs) -> str:
         :return:
         """
         self._set_props_from_env()
+        if self._collection_id is None:
+            raise ValueError('missing COLLECTION ID in ENV')
         output_dir = os.environ.get(self.OUTPUT_DIRECTORY)
         if not FileUtils.dir_exist(output_dir):
             raise ValueError(f'OUTPUT_DIRECTORY: {output_dir} does not exist')
diff --git a/mdps_ds_lib/stage_in_out/upload_granules_abstract.py b/mdps_ds_lib/stage_in_out/upload_granules_abstract.py
index 5d803d0..3980405 100644
--- a/mdps_ds_lib/stage_in_out/upload_granules_abstract.py
+++ b/mdps_ds_lib/stage_in_out/upload_granules_abstract.py
@@ -23,7 +23,9 @@ class UploadGranulesAbstract(ABC):
     RESULT_PATH_PREFIX = 'RESULT_PATH_PREFIX'  # s3 prefix
     DEFAULT_RESULT_PATH_PREFIX = 'stage_out'  # default s3 prefix
     OUTPUT_DIRECTORY = 'OUTPUT_DIRECTORY'  # To store successful & failed features json
-    COLLECTION_ID_KEY = 'COLLECTION_ID'  # Need this
+    COLLECTION_ID_KEY = 'COLLECTION_ID'  # needed only for arbitrary upload
+    PROJECT_KEY = 'PROJECT'  # needed only for process stage-out
+    VENUE_KEY = 'VENUE'  # needed only for process stage-out
     STAGING_BUCKET_KEY = 'STAGING_BUCKET'  # S3 Bucket
     VERIFY_SSL_KEY = 'VERIFY_SSL'
     DELETE_FILES_KEY = 'DELETE_FILES'
@@ -31,6 +33,8 @@ class UploadGranulesAbstract(ABC):
     def __init__(self) -> None:
         super().__init__()
         self._collection_id = ''
+        self._project = ''
+        self._venue = ''
         self._staging_bucket = ''
         self._result_path_prefix = ''
         self._parallel_count = int(os.environ.get(Constants.PARALLEL_COUNT, '-1'))
@@ -40,13 +44,17 @@ def __init__(self) -> None:
         self._delete_files = False
 
     def _set_props_from_env(self):
-        missing_keys = [k for k in [self.COLLECTION_ID_KEY, self.STAGING_BUCKET_KEY] if k not in os.environ]
+        missing_keys = [k for k in [self.STAGING_BUCKET_KEY] if k not in os.environ]
         if len(missing_keys) > 0:
             raise ValueError(f'missing environment keys: {missing_keys}')
 
         self._collection_id = os.environ.get(self.COLLECTION_ID_KEY)
+        self._project = os.environ.get(self.PROJECT_KEY)
+        self._venue = os.environ.get(self.VENUE_KEY)
         self._staging_bucket = os.environ.get(self.STAGING_BUCKET_KEY)
 
         self._result_path_prefix = os.environ.get(self.RESULT_PATH_PREFIX, self.DEFAULT_RESULT_PATH_PREFIX)
+        if self._result_path_prefix is None or self._result_path_prefix.strip() == '':
+            self._result_path_prefix = self.DEFAULT_RESULT_PATH_PREFIX
         self._result_path_prefix = self._result_path_prefix[:-1] if self._result_path_prefix.endswith('/') else self._result_path_prefix
         self._result_path_prefix = self._result_path_prefix[1:] if self._result_path_prefix.startswith('/') else self._result_path_prefix
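A minimal standalone sketch of the RESULT_PATH_PREFIX normalization added above. The blank-value guard mainly covers an env var that is set but empty, since `os.environ.get` already supplies the default when the key is absent; `normalize_result_path_prefix` is a hypothetical helper name used only for illustration:

```python
from typing import Optional

def normalize_result_path_prefix(raw: Optional[str], default: str = 'stage_out') -> str:
    # blank (or missing) values fall back to the default prefix
    prefix = default if raw is None or raw.strip() == '' else raw
    # trim one trailing and one leading slash so S3 keys join cleanly
    prefix = prefix[:-1] if prefix.endswith('/') else prefix
    prefix = prefix[1:] if prefix.startswith('/') else prefix
    return prefix

assert normalize_result_path_prefix('') == 'stage_out'  # RESULT_PATH_PREFIX set but empty
assert normalize_result_path_prefix('/integration_test/stage_out/') == 'integration_test/stage_out'
```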
diff --git a/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py b/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
index 2f7d8e7..8f9bf20 100644
--- a/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
+++ b/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
@@ -5,7 +5,7 @@
 
 from mdps_ds_lib.lib.utils.file_utils import FileUtils
 
-from pystac import ItemCollection
+from pystac import ItemCollection, Item
 
 from mdps_ds_lib.lib.cumulus_stac.granules_catalog import GranulesCatalog
 from mdps_ds_lib.lib.processing_jobs.job_executor_abstract import JobExecutorAbstract
@@ -22,9 +22,9 @@ class UploadItemExecutor(JobExecutorAbstract):
-    def __init__(self, result_list, error_list, collection_id, staging_bucket, retry_wait_time_sec, retry_times, delete_files: bool) -> None:
+    def __init__(self, result_list, error_list, project_venue_set, staging_bucket, retry_wait_time_sec, retry_times, delete_files: bool) -> None:
         super().__init__()
-        self.__collection_id = collection_id
+        self.__project_venue_set = project_venue_set
         self.__staging_bucket = staging_bucket
         self.__delete_files = delete_files
 
@@ -48,10 +48,13 @@ def validate_job(self, job_obj):
     #     return
 
     def execute_job(self, each_child, lock) -> bool:
-        current_granule_stac = self.__gc.get_granules_item(each_child)
+        current_granule_stac: Item = self.__gc.get_granules_item(each_child)
+        current_collection_id = current_granule_stac.collection_id.strip()
         try:
+            current_collection_id = GranulesCatalog.get_unity_formatted_collection_id(current_collection_id, self.__project_venue_set)
+            LOGGER.debug(f'reformatted current_collection_id: {current_collection_id}')
            current_granules_dir = os.path.dirname(each_child)
-            current_assets = self.__gc.extract_assets_href(current_granule_stac, current_granules_dir)
+            current_assets = self.__gc.extract_assets_href(current_granule_stac, current_granules_dir)  # returns defaultdict(dict): {role: {asset_name: href}}
             if 'data' not in current_assets:  # this is still ok .coz extract_assets_href is {'data': [url1, url2], ...}
                 LOGGER.warning(f'skipping {each_child}. no data in {current_assets}')
                 current_granule_stac.properties['upload_error'] = f'missing "data" in assets'
@@ -63,21 +66,21 @@ def execute_job(self, each_child, lock) -> bool:
             updating_assets = {}
             uploading_current_granule_stac = None
             for asset_type, asset_hrefs in current_assets.items():
-                for each_asset_href in asset_hrefs:
-                    LOGGER.audit(f'uploading {asset_type}: {each_asset_href}')
-                    s3_url = self.__s3.upload(each_asset_href, self.__staging_bucket,
-                                              f'{self.__collection_id}/{self.__collection_id}:{current_granule_id}',
+                for asset_name, asset_href in asset_hrefs.items():
+                    LOGGER.audit(f'uploading type={asset_type}, name={asset_name}, href={asset_href}')
+                    s3_url = self.__s3.upload(asset_href, self.__staging_bucket,
+                                              f'{current_collection_id}/{current_collection_id}:{current_granule_id}',
                                               self.__delete_files)
-                    if each_asset_href == each_child:
+                    if asset_href == each_child:
                         uploading_current_granule_stac = s3_url
-                    updating_assets[os.path.basename(s3_url)] = s3_url
+                    updating_assets[asset_name] = s3_url
             self.__gc.update_assets_href(current_granule_stac, updating_assets)
             current_granule_stac.id = current_granule_id
-            current_granule_stac.collection_id = self.__collection_id
+            current_granule_stac.collection_id = current_collection_id
             if uploading_current_granule_stac is not None:  # upload metadata file again
                 self.__s3.set_s3_url(uploading_current_granule_stac)
                 self.__s3.upload_bytes(json.dumps(current_granule_stac.to_dict(False, False)).encode())
-            current_granule_stac.id = f'{self.__collection_id}:{current_granule_id}'
+            current_granule_stac.id = f'{current_collection_id}:{current_granule_id}'
             self.__result_list.put(current_granule_stac.to_dict(False, False))
         except Exception as e:
             current_granule_stac.properties['upload_error'] = str(e)
@@ -110,10 +113,11 @@ def upload(self, **kwargs) -> str:
         for each_child in child_links:
             job_manager_props.memory_job_dict[each_child] = each_child
 
+        project_venue_set = (self._project, self._venue)
         # https://www.infoworld.com/article/3542595/6-python-libraries-for-parallel-processing.html
        multithread_processor_props = MultiThreadProcessorProps(self._parallel_count)
         multithread_processor_props.job_manager = JobManagerMemory(job_manager_props)
-        multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, self._collection_id, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
+        multithread_processor_props.job_executor = UploadItemExecutor(local_items, error_list, project_venue_set, self._staging_bucket, self._retry_wait_time_sec, self._retry_times, self._delete_files)
         multithread_processor = MultiThreadProcessor(multithread_processor_props)
         multithread_processor.start()
@@ -130,12 +134,12 @@ def upload(self, **kwargs) -> str:
         failed_item_collections = ItemCollection(items=errors)
         successful_features_file = os.path.join(output_dir, 'successful_features.json')
-
-
         failed_features_file = os.path.join(output_dir, 'failed_features.json')
         LOGGER.debug(f'writing results: {successful_features_file} && {failed_features_file}')
         FileUtils.write_json(successful_features_file, successful_item_collections.to_dict(False))
         FileUtils.write_json(failed_features_file, failed_item_collections.to_dict(False))
+        if len(failed_item_collections.items) > 0:
+            LOGGER.fatal(f'One or more Failures: {failed_item_collections.to_dict(False)}')
         s3_url = self.__s3.upload(successful_features_file, self._staging_bucket,
                                   self._result_path_prefix,
                                   s3_name=f'successful_features_{TimeUtils.get_current_time()}.json',
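With this change the staging key is derived from the (possibly reformatted) collection id rather than a COLLECTION_ID env var, and `updating_assets` is keyed by the original STAC asset name instead of the uploaded file's basename. A sketch of the resulting S3 layout, with hypothetical bucket and granule values:

```python
staging_bucket = 'uds-sbx-cumulus-staging'     # hypothetical bucket
collection_id = 'URN:NASA:UNITY:LOCAL:DEV:NA'  # output of get_unity_formatted_collection_id
granule_id = 'test_file01'                     # hypothetical granule id

# every asset of a granule lands under <collection_id>/<collection_id>:<granule_id>/
prefix = f'{collection_id}/{collection_id}:{granule_id}'
print(f's3://{staging_bucket}/{prefix}/test_file01.nc')
# s3://uds-sbx-cumulus-staging/URN:NASA:UNITY:LOCAL:DEV:NA/URN:NASA:UNITY:LOCAL:DEV:NA:test_file01/test_file01.nc
```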
diff --git a/tests/integration_tests/test_docker_stage_out.py b/tests/integration_tests/test_docker_stage_out.py
index a1b92b5..d23e1f2 100644
--- a/tests/integration_tests/test_docker_stage_out.py
+++ b/tests/integration_tests/test_docker_stage_out.py
@@ -1,4 +1,5 @@
 import logging
+
 logging.basicConfig(level=20, format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
 
 from datetime import datetime
@@ -30,7 +31,7 @@ def setUp(self) -> None:
         self.tenant = 'UDS_MY_LOCAL_ARCHIVE_TEST'  # 'uds_local_test' # 'uds_sandbox'
         self.tenant_venue = 'DEV'  # 'DEV1' # 'dev'
         self.collection_name = 'UDS_UNIT_COLLECTION'  # 'uds_collection' # 'sbx_collection'
-        self.collection_version = '24.08.29.09.00'.replace('.', '')  # '2402011200'
+        self.collection_version = '24.10.21.12.00'.replace('.', '')  # '2402011200'
 
     def not_in_used_test_03_upload(self):
         os.environ[Constants.USERNAME] = '/unity/uds/user/wphyo/username'
@@ -335,7 +336,8 @@ def not_in_used_test_03_upload_catalog(self):
     def test_03_upload_complete_catalog(self):
         os.environ['VERIFY_SSL'] = 'FALSE'
         os.environ['RESULT_PATH_PREFIX'] = 'integration_test/stage_out'
-        os.environ['COLLECTION_ID'] = 'NEW_COLLECTION_EXAMPLE_L1B___9'
+        os.environ['PROJECT'] = 'LOCAL'
+        os.environ['VENUE'] = 'UNIT_TEST'
         os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
 
         os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY'
@@ -477,7 +479,7 @@ def test_03_upload_complete_catalog(self):
                                  "datetime": "2022-03-23T15:48:19.079000Z"
                              },
                              href=os.path.join('some_granules', f'{filename}.nc.stac.json'),
-                             collection='NA',
+                             collection=f'NA_{i:02d}',
                              assets={
                                  f'{filename}.nc': Asset(os.path.join('.', f'{filename}.nc'), title='test_file01.nc', roles=['data']),
                                  f'{filename}.nc.cas': Asset(os.path.join('.', f'{filename}.nc.cas'), title='test_file01.nc.cas', roles=['metadata']),
@@ -519,10 +521,239 @@ def test_03_upload_complete_catalog(self):
             result_key_prefix = result_key.split('.')[0]
             self.assertTrue(f'{result_key_prefix}.nc.cas' in upload_result['assets'], f'missing assets#metadata asset: {result_key_prefix}.nc.cas')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc.cas'], 'missing assets#metadata__cas#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/{os.environ["COLLECTION_ID"]}/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA_0'))
             self.assertTrue(f'{result_key_prefix}.nc' in upload_result['assets'], f'missing assets#data: {result_key_prefix}.nc')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc'], 'missing assets#data#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/{os.environ["COLLECTION_ID"]}/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA_0'))
+        """
+        Example output:
+        {
+            'type': 'FeatureCollection',
+            'features': [{
+                'type': 'Feature',
+                'stac_version': '1.0.0',
+                'id': 'NEW_COLLECTION_EXAMPLE_L1B___9:test_file01',
+                'properties': {'start_datetime': '2016-01-31T18:00:00.009057Z',
+                               'end_datetime': '2016-01-31T19:59:59.991043Z', 'created': '2016-02-01T02:45:59.639000Z',
+                               'updated': '2022-03-23T15:48:21.578000Z', 'datetime': '1970-01-01T00:00:00Z'},
+                'geometry': {'type': 'Point', 'coordinates': [0.0, 0.0]}, 'links': [],
+                'assets': {'data': {
+                    'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc',
+                    'title': 'main data'}, 'metadata__cas': {
+                    'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.cas',
+                    'title': 'metadata cas'}, 'metadata__stac': {
+                    'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.stac.json',
+                    'title': 'metadata stac'}},
+                'bbox': [0.0, 0.0, 0.0, 0.0],
+                'stac_extensions': [],
+                'collection': 'NEW_COLLECTION_EXAMPLE_L1B___9'}]}
+        """
+        s3 = AwsS3()
+        print(f'starting_time: {starting_time}')
+        s3_keys = [k for k in s3.get_child_s3_files(os.environ['STAGING_BUCKET'],
+                                                    f"{os.environ['RESULT_PATH_PREFIX']}/successful_features_{starting_time}",
+                                                    )]
+        s3_keys = sorted(s3_keys)
+        print(f's3_keys: {s3_keys}')
+        self.assertTrue(len(s3_keys) > 0, 'empty files in S3')
+        local_file = s3.set_s3_url(f's3://{os.environ["STAGING_BUCKET"]}/{s3_keys[-1][0]}').download(tmp_dir_name)
+        successful_feature_collection = ItemCollection.from_dict(FileUtils.read_json(local_file))
+        successful_feature_collection = list(successful_feature_collection.items)
+        self.assertEqual(len(successful_feature_collection), total_files, f'wrong length: {successful_feature_collection}')
+        return
+
+    def test_03_upload_complete_catalog_role_as_key(self):
+        os.environ['VERIFY_SSL'] = 'FALSE'
+        os.environ['RESULT_PATH_PREFIX'] = 'integration_test/stage_out'
+        os.environ['PROJECT'] = 'LOCAL'
+        os.environ['VENUE'] = 'UNIT_TEST'
+        os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
+
+        os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY'
+        # os.environ['GRANULES_UPLOAD_TYPE'] = 'UPLOAD_S3_BY_STAC_CATALOG'
+        # defaulted to this value
+
+        if len(argv) > 1:
+            argv.pop(-1)
+        argv.append('UPLOAD')
+
+        starting_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
+        with tempfile.TemporaryDirectory() as tmp_dir_name:
+            os.environ['OUTPUT_FILE'] = os.path.join(tmp_dir_name, 'some_output', 'output.json')
+            os.environ['UPLOAD_DIR'] = ''  # not needed
+            os.environ['OUTPUT_DIRECTORY'] = os.path.join(tmp_dir_name, 'output_dir')
+            FileUtils.mk_dir_p(os.environ.get('OUTPUT_DIRECTORY'))
+            os.environ['CATALOG_FILE'] = os.path.join(tmp_dir_name, 'catalog.json')
+            total_files = 10
+            # os.environ['PARALLEL_COUNT'] = str(total_files)
+            granules_dir = os.path.join(tmp_dir_name, 'some_granules')
+            FileUtils.mk_dir_p(granules_dir)
+            catalog = Catalog(
+                id='NA',
+                description='NA')
+            catalog.set_self_href(os.environ['CATALOG_FILE'])
+
+            for i in range(1, total_files+1):
+                filename = f'test_file{i:02d}'
+                with open(os.path.join(granules_dir, f'{filename}.nc'), 'w') as ff:
+                    ff.write('sample_file')
+                with open(os.path.join(granules_dir, f'{filename}.nc.cas'), 'w') as ff:
+                    ff.write('''<?xml version="1.0" encoding="UTF-8" ?>
+<cas:metadata xmlns:cas="http://oodt.jpl.nasa.gov/1.0/cas">
+    <keyval type="scalar">
+        <key>AggregateDir</key>
+        <val>snppatmsl1a</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>AutomaticQualityFlag</key>
+        <val>Passed</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>BuildId</key>
+        <val>v01.43.00</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>CollectionLabel</key>
+        <val>L1AMw_nominal2</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>DataGroup</key>
+        <val>sndr</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>EndDateTime</key>
+        <val>2016-01-14T10:06:00.000Z</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>EndTAI93</key>
+        <val>726919569.000</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>FileFormat</key>
+        <val>nc4</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>FileLocation</key>
+        <val>/pge/out</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>Filename</key>
+        <val>SNDR.SNPP.ATMS.L1A.nominal2.02.nc</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>GranuleNumber</key>
+        <val>101</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>JobId</key>
+        <val>f163835c-9945-472f-bee2-2bc12673569f</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ModelId</key>
+        <val>urn:npp:SnppAtmsL1a</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>NominalDate</key>
+        <val>2016-01-14</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ProductName</key>
+        <val>SNDR.SNPP.ATMS.20160114T1000.m06.g101.L1A.L1AMw_nominal2.v03_15_00.D.201214135000.nc</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ProductType</key>
+        <val>SNDR_SNPP_ATMS_L1A</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ProductionDateTime</key>
+        <val>2020-12-14T13:50:00.000Z</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ProductionLocation</key>
+        <val>Sounder SIPS: JPL/Caltech (Dev)</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>ProductionLocationCode</key>
+        <val>D</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>RequestId</key>
+        <val>1215</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>StartDateTime</key>
+        <val>2016-01-14T10:00:00.000Z</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>StartTAI93</key>
+        <val>726919209.000</val>
+    </keyval>
+    <keyval type="scalar">
+        <key>TaskId</key>
+        <val>8c3ae101-8f7c-46c8-b5c6-63e7b6d3c8cd</val>
+    </keyval>
+</cas:metadata>
+''')
+                stac_item = Item(id=filename,
+                                 geometry={
+                                     "type": "Point",
+                                     "coordinates": [0.0, 0.0]
+                                 },
+                                 bbox=[0.0, 0.0, 0.0, 0.0],
+                                 datetime=TimeUtils().parse_from_unix(0, True).get_datetime_obj(),
+                                 properties={
+                                     "start_datetime": "2016-01-31T18:00:00.009057Z",
+                                     "end_datetime": "2016-01-31T19:59:59.991043Z",
+                                     "created": "2016-02-01T02:45:59.639000Z",
+                                     "updated": "2022-03-23T15:48:21.578000Z",
+                                     "datetime": "2022-03-23T15:48:19.079000Z"
+                                 },
+                                 href=os.path.join('some_granules', f'{filename}.nc.stac.json'),
+                                 collection='NA',
+                                 assets={
+                                     'data': Asset(os.path.join('.', f'{filename}.nc'), title='test_file01.nc', roles=['data']),
+                                     'metadata1': Asset(os.path.join('.', f'{filename}.nc.cas'), title='test_file01.nc.cas', roles=['metadata']),
+                                     'metadata2': Asset(os.path.join('.', f'{filename}.nc.stac.json'), title='test_file01.nc.stac.json', roles=['metadata']),
+                                 })
+                with open(os.path.join(granules_dir, f'{filename}.nc.stac.json'), 'w') as ff:
+                    ff.write(json.dumps(stac_item.to_dict(False, False)))
+                catalog.add_link(Link('item', os.path.join('some_granules', f'{filename}.nc.stac.json'), 'application/json'))
+            print(json.dumps(catalog.to_dict(False, False)))
+            with open(os.environ['CATALOG_FILE'], 'w') as ff:
+                ff.write(json.dumps(catalog.to_dict(False, False)))
+
+            upload_result = UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG)).upload()
+            upload_result = json.loads(upload_result)
+            print(upload_result)
+            """
+            {'type': 'Catalog', 'id': 'NA', 'stac_version': '1.0.0', 'description': 'NA', 'links': [{'rel': 'root', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/catalog.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/successful_features.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/failed_features.json', 'type': 'application/json'}]}
+            """
+            self.assertTrue('type' in upload_result, 'missing type')
+            self.assertEqual(upload_result['type'], 'Catalog', 'missing type')
+            upload_result = Catalog.from_dict(upload_result)
+            child_links = [k.href for k in upload_result.get_links(rel='item')]
+            self.assertEqual(len(child_links), 2, f'wrong length: {child_links}')
+            self.assertTrue(FileUtils.file_exist(child_links[0]), f'missing file: {child_links[0]}')
+            successful_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[0]))
+            successful_feature_collection = list(successful_feature_collection.items)
+            self.assertEqual(len(successful_feature_collection), total_files, f'wrong length: {successful_feature_collection}')
+
+            self.assertTrue(FileUtils.file_exist(child_links[1]), f'missing file: {child_links[1]}')
+            failed_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[1]))
+            failed_feature_collection = list(failed_feature_collection.items)
+            self.assertEqual(len(failed_feature_collection), 0, f'wrong length: {failed_feature_collection}')
+
+            upload_result = successful_feature_collection[0].to_dict(False, False)
+            print(f'example feature: {upload_result}')
+            self.assertTrue('assets' in upload_result, 'missing assets')
+            result_key = [k for k in upload_result['assets'].keys()][0]
+            print(f'result_key: {result_key}')
+            self.assertEqual(result_key, 'data', f'wrong asset key: {result_key}')
+            self.assertTrue('metadata1' in upload_result['assets'], 'missing assets#metadata asset: metadata1')
+            self.assertTrue('href' in upload_result['assets']['metadata1'], 'missing assets#metadata__cas#href')
+            self.assertTrue(upload_result['assets']['metadata1']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
+            self.assertTrue('data' in upload_result['assets'], 'missing assets#data: data')
+            self.assertTrue('href' in upload_result['assets']['data'], 'missing assets#data#href')
+            self.assertTrue(upload_result['assets']['data']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
         """
         Example output:
         {
@@ -561,7 +792,8 @@ def test_03_upload_complete_catalog(self):
 
     def test_03_02_upload_complete_catalog(self):
         os.environ['VERIFY_SSL'] = 'FALSE'
-        os.environ['COLLECTION_ID'] = 'NEW_COLLECTION_EXAMPLE_L1B___9'
+        os.environ['PROJECT'] = 'LOCAL'
+        os.environ['VENUE'] = 'UNIT_TEST'
         os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
 
         os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY'
@@ -745,10 +977,10 @@ def test_03_02_upload_complete_catalog(self):
             result_key_prefix = result_key.split('.')[0]
             self.assertTrue(f'{result_key_prefix}.nc.cas' in upload_result['assets'], f'missing assets#metadata asset: {result_key_prefix}.nc.cas')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc.cas'], 'missing assets#metadata__cas#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/{os.environ["COLLECTION_ID"]}/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
             self.assertTrue(f'{result_key_prefix}.nc' in upload_result['assets'], f'missing assets#data: {result_key_prefix}.nc')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc'], 'missing assets#data#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/{os.environ["COLLECTION_ID"]}/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
             """
             Example output:
             {
@@ -791,7 +1023,7 @@ def test_03_03_upload_auxiliary_files(self):
         os.environ['COLLECTION_ID'] = temp_collection_id
         os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
         os.environ['VERIFY_SSL'] = 'FALSE'
-        os.environ['RESULT_PATH_PREFIX'] = 'stage_out'
+        os.environ['RESULT_PATH_PREFIX'] = ''
         os.environ['PARALLEL_COUNT'] = '1'
 
         if len(argv) > 1:
@@ -882,7 +1114,7 @@ def test_03_03_upload_auxiliary_files(self):
         """
         s3 = AwsS3()
         s3_keys = [k for k in s3.get_child_s3_files(os.environ['STAGING_BUCKET'],
-                                                    f"{os.environ['RESULT_PATH_PREFIX']}/successful_features_{starting_time}",
+                                                    f"stage_out/successful_features_{starting_time}",
                                                     )]
         s3_keys = sorted(s3_keys)
         print(f's3_keys: {s3_keys}')
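The integration tests above exercise the role-keyed shape now returned by `extract_assets_href`; a small before/after illustration with hypothetical hrefs (the inner keys are the STAC asset names, e.g. 'data1' or 'metadata1' in the role_as_key test):

```python
# before this change: role -> list of hrefs; original asset names were lost
old_shape = {'data': ['s3://bucket/granule/file01.nc'],
             'metadata': ['s3://bucket/granule/file01.nc.cas']}

# after this change: role -> {asset_name: href}; names survive the upload
new_shape = {'data': {'file01.nc': 's3://bucket/granule/file01.nc'},
             'metadata': {'file01.nc.cas': 's3://bucket/granule/file01.nc.cas'}}
```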
diff --git a/tests/mdps_ds_lib/lib/cumulus_stac/test_granules_catalog.py b/tests/mdps_ds_lib/lib/cumulus_stac/test_granules_catalog.py
index 0e57f60..ef4c3c4 100644
--- a/tests/mdps_ds_lib/lib/cumulus_stac/test_granules_catalog.py
+++ b/tests/mdps_ds_lib/lib/cumulus_stac/test_granules_catalog.py
@@ -208,9 +208,10 @@ def test_extract_assets_href(self):
         pystac_catalog = gc.get_granules_item(granules_catalog_path)
         self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12')
         assets = gc.extract_assets_href(pystac_catalog)
-        expected_assets = {'data': ['s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc'],
-                           'metadata__data': ['s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'],
-                           'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']}
+        expected_assets = {'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc'},
+                           'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'},
+                           'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}
+                           }
         self.assertEqual(assets, expected_assets, 'wrong assets')
         return
 
@@ -276,9 +277,9 @@ def test_extract_assets_relative_href_01(self):
         self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12')
         assets = gc.extract_assets_href(pystac_catalog)
         expected_assets = {
-            'data': ['./SNDR.SNPP.ATMS.L1A.nominal2.12.nc'],
-            'metadata__data': ['SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'],
-            'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']}
+            'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': './SNDR.SNPP.ATMS.L1A.nominal2.12.nc'},
+            'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': 'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'},
+            'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}}
         self.assertEqual(assets, expected_assets, 'wrong assets')
         return
 
@@ -350,9 +351,10 @@ def test_extract_assets_relative_href_02(self):
         self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12')
         assets = gc.extract_assets_href(pystac_catalog, '/some/temp/directory/../hehe')
         expected_assets = {
-            'data': ['/some/temp/directory/../hehe/./SNDR.SNPP.ATMS.L1A.nominal2.12.nc', 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.1.nc'],
-            'metadata__data': ['/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'],
-            'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']}
+            'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': '/some/temp/directory/../hehe/./SNDR.SNPP.ATMS.L1A.nominal2.12.nc',
+                     'SNDR.SNPP.ATMS.L1A.nominal2.12.1.nc': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.1.nc'},
+            'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': '/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'},
+            'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}}
         self.assertEqual(assets, expected_assets, 'wrong assets')
         return
 
@@ -428,9 +430,10 @@ def test_extract_assets_relative_href_03(self):
         self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12')
         assets = gc.extract_assets_href(pystac_catalog)
         expected_assets = {
-            'data': ['/some/temp/directory/../hehe/./SNDR.SNPP.ATMS.L1A.nominal2.12.nc'],
-            'metadata__data': ['/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas', '/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.2.cas'],
-            'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']}
+            'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': '/some/temp/directory/../hehe/./SNDR.SNPP.ATMS.L1A.nominal2.12.nc'},
+            'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': '/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas',
+                               'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.2.cas': '/some/temp/directory/../hehe/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.2.cas'},
+            'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}}
         self.assertEqual(assets, expected_assets, 'wrong assets')
         return
 
@@ -496,9 +499,9 @@ def test_update_assets_href(self):
         self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12')
         assets = gc.extract_assets_href(pystac_catalog)
         expected_assets = {
-            'data': ['s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc'],
-            'metadata__data': ['s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'],
-            'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']
+            'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc'},
+            'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'},
+            'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}
         }
         self.assertEqual(assets, expected_assets, 'wrong assets')
         updating_assets = {
@@ -509,9 +512,96 @@ def test_update_assets_href(self):
         }
 
         updating_assets_result = {
-            'data': ['file:///absolute/file/some/file/data'],
-            'metadata__data': ['s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'],
-            'metadata__cmr': ['s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml']
+            'data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc': 'file:///absolute/file/some/file/data'},
+            'metadata__data': {'SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas'},
+            'metadata__cmr': {'SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml'}
+        }
+        gc.update_assets_href(pystac_catalog, updating_assets)
+        updated_assets = gc.extract_assets_href(pystac_catalog)
+        self.assertEqual(updated_assets, updating_assets_result, 'wrong updated assets')
+
+        return
+
+    def test_update_assets_href_02(self):
+        sample_granules = {
+            "type": "Feature",
+            "stac_version": "1.0.0",
"SNDR.SNPP.ATMS.L1A.nominal2.12", + "properties": { + "start_datetime": "2016-01-14T11:00:00Z", + "end_datetime": "2016-01-14T11:06:00Z", + "created": "2020-12-14T13:50:00Z", + "updated": "2022-08-15T06:26:25.344000Z", + "datetime": "2022-08-15T06:26:17.938000Z" + }, + "geometry": { + "type": "Point", + "coordinates": [ + 0.0, + 0.0 + ] + }, + "links": [ + { + "rel": "collection", + "href": "." + } + ], + "assets": { + "data1": { + "href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc", + "title": "SNDR.SNPP.ATMS.L1A.nominal2.12.nc", + "description": "SNDR.SNPP.ATMS.L1A.nominal2.12.nc", + "roles": ["data"], + }, + "metadata1": { + "href": "s3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas", + "title": "SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas", + "description": "SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas", + "roles": ["metadata"], + }, + "metadata2": { + "href": "s3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml", + "title": "SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml", + "description": "SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml", + "roles": ["metadata"], + } + }, + "bbox": [ + 0.0, + 0.0, + 0.0, + 0.0 + ], + "stac_extensions": [], + "collection": "SNDR_SNPP_ATMS_L1A___1" + } + with tempfile.TemporaryDirectory() as tmp_dir_name: + granules_catalog_path = os.path.join(tmp_dir_name, 'sample_granules.json') + FileUtils.write_json(granules_catalog_path, sample_granules) + gc = GranulesCatalog() + pystac_catalog = gc.get_granules_item(granules_catalog_path) + self.assertEqual(pystac_catalog.id, 'SNDR.SNPP.ATMS.L1A.nominal2.12') + assets = gc.extract_assets_href(pystac_catalog) + expected_assets = { + 'data': {'data1': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc'}, + 'metadata': {'metadata1': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas', + 'metadata2': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml' + }, + } + self.assertEqual(assets, expected_assets, 'wrong assets') + updating_assets = { + 'data1': 'file:///absolute/file/some/file/data', + 'metadata1': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas', + 'other.name': '/absolute/file/some/file/metadata__extra', + 'metadata2': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml' + } + + updating_assets_result = { + 'data': {'data1': 'file:///absolute/file/some/file/data'}, + 'metadata': {'metadata1': 's3://uds-test-cumulus-protected/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.nc.cas', + 'metadata2': 's3://uds-test-cumulus-private/SNDR_SNPP_ATMS_L1A___1/SNDR.SNPP.ATMS.L1A.nominal2.12.cmr.xml' + }, } gc.update_assets_href(pystac_catalog, updating_assets) updated_assets = gc.extract_assets_href(pystac_catalog) @@ -822,3 +912,27 @@ def test_manual_validdate_stac(self): validation_result = stac_item.validate() return + def test_get_unity_formatted_collection_id(self): + with self.assertRaises(ValueError) as context: + GranulesCatalog.get_unity_formatted_collection_id(None, (None, None)) + self.assertTrue(str(context.exception).startswith('NULL or EMPTY collection_id')) + with self.assertRaises(ValueError) as context: + GranulesCatalog.get_unity_formatted_collection_id('', (None, None)) + self.assertTrue(str(context.exception).startswith('NULL or EMPTY collection_id')) + with self.assertRaises(ValueError) 
+        with self.assertRaises(ValueError) as context:
+            GranulesCatalog.get_unity_formatted_collection_id('NA', (None, None))
+        self.assertTrue(str(context.exception).startswith('missing project or venue'))
+        with self.assertRaises(ValueError) as context:
+            GranulesCatalog.get_unity_formatted_collection_id('NA', (None, 'DEV'))
+        self.assertTrue(str(context.exception).startswith('missing project or venue'))
+        with self.assertRaises(ValueError) as context:
+            GranulesCatalog.get_unity_formatted_collection_id('NA', ('LOCAL', None))
+        self.assertTrue(str(context.exception).startswith('missing project or venue'))
+
+        result = GranulesCatalog.get_unity_formatted_collection_id('NA', ('LOCAL', 'DEV'))
+        self.assertEqual(result, 'URN:NASA:UNITY:LOCAL:DEV:NA', 'wrong collection id output')
+
+        result = GranulesCatalog.get_unity_formatted_collection_id('URN:JPL:IDS:LOCAL1:DEV2:A:B:C:D:E:F:G', ('LOCAL', 'DEV'))
+        self.assertEqual(result, 'URN:JPL:IDS:LOCAL1:DEV2:A:B:C:D:E:F:G', 'wrong collection id output')
+
+        return
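As the last unit test documents, the UNITY-format check is purely a colon count, so any id with six or more parts passes through unchanged, not just URN:NASA:UNITY ones. A one-line sketch of the rule, with `is_unity_formatted` as a hypothetical helper name:

```python
def is_unity_formatted(collection_id: str) -> bool:
    # URN:NASA:UNITY:<project>:<venue>:<collection> has at least 6 colon-separated parts
    return len(collection_id.split(':')) >= 6

assert not is_unity_formatted('NA')
assert is_unity_formatted('URN:NASA:UNITY:LOCAL:DEV:NA')
assert is_unity_formatted('URN:JPL:IDS:LOCAL1:DEV2:A:B:C:D:E:F:G')  # non-NASA URNs pass too
```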