diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 319040b..449d487 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -12,6 +12,7 @@ class CollectionsQuery(CumulusBase): __collections_key = 'collections' + __providers_key = 'providers' __rules_key = 'rules' __stats_key = 'stats' __collection_id_key = 'collectionId' @@ -337,3 +338,48 @@ def query(self): LOGGER.exception('error during cumulus query') return {'server_error': str(e)} return {'results': [CollectionTransformer().to_stac(k) for k in query_result]} + + def create_provider(self, provider_name: str, s3_bucket: str, private_api_prefix: str): + # INSERT INTO providers (name, protocol, host) VALUES ('unity', 's3', 'https://dev.mdps.mcp.nasa.gov'); + # TODO : this fails + payload = { + 'httpMethod': 'POST', + 'resource': '/{proxy+}', + 'path': f'/{self.__providers_key}', + 'headers': { + 'Content-Type': 'application/json', + }, + 'body': json.dumps({ + "id": provider_name, + "host": s3_bucket, + "protocol": "s3", + # "port": 443, + "globalConnectionLimit": 1000, + # "maxDownloadTime": 300, + # "username": "na", + # "password": "na", + # "privateKey": "na", + # "cmKeyId": "na", + # "allowedRedirects": "na", + }) + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'message' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'status': query_result['message']} diff --git a/cumulus_lambda_functions/granules_cnm_ingester/granules_cnm_ingester_logic.py b/cumulus_lambda_functions/granules_cnm_ingester/granules_cnm_ingester_logic.py index 054f757..e46e06f 100644 --- a/cumulus_lambda_functions/granules_cnm_ingester/granules_cnm_ingester_logic.py +++ b/cumulus_lambda_functions/granules_cnm_ingester/granules_cnm_ingester_logic.py @@ -1,5 +1,6 @@ import os import time +from concurrent.futures import ThreadPoolExecutor, as_completed from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections @@ -10,7 +11,6 @@ from mdps_ds_lib.lib.cumulus_stac.unity_collection_stac import UnityCollectionStac from cumulus_lambda_functions.uds_api.dapa.collections_dapa_creation import CollectionDapaCreation -from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer from pystac import ItemCollection, Item from mdps_ds_lib.lib.utils.file_utils import FileUtils from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator @@ -118,41 +118,58 @@ def extract_collection_id(self): if len(self.successful_features.items) < 1: LOGGER.error(f'not required to process. No Granules: {self.successful_features.to_dict(False)}') return - self.collection_id = self.successful_features.items[0].collection_id + self.collection_id = list(set([k.collection_id for k in self.successful_features.items])) return - def has_collection(self): - uds_collection_result = self.__uds_collection.get_collection(self.collection_id) + def has_collection(self, collection_id_custom=None): + collection_id_custom = collection_id_custom if collection_id_custom is not None else self.collection_id + uds_collection_result = self.__uds_collection.get_collection(collection_id_custom) return len(uds_collection_result) > 0 - def create_collection(self): + def create_one_collection(self, collection_id): + try: + if collection_id is None: + raise RuntimeError(f'NULL collection_id') + if self.has_collection(collection_id): + LOGGER.debug(f'{collection_id} already exists. continuing..') + return {'status': 'success'} + # ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py + dapa_collection = UnityCollectionStac() \ + .with_id(collection_id) \ + .with_graule_id_regex("^test_file.*$") \ + .with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \ + .with_title(f'Collection: {collection_id}') \ + .with_process('stac') \ + .with_provider(self.__default_provider) \ + .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \ + .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \ + .add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \ + .add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \ + .add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', + 'item') + + stac_collection = dapa_collection.start() + creation_result = CollectionDapaCreation(stac_collection).create() + if creation_result['statusCode'] >= 400: + raise RuntimeError( + f'failed to create collection: {collection_id}. details: {creation_result["body"]}') + time.sleep(3) # cool off period before checking DB + if not self.has_collection(collection_id): + LOGGER.error(f'missing collection. (failed to create): {collection_id}') + raise ValueError(f'missing collection. (failed to create): {collection_id}') + except Exception as e: + return {'status': 'error', 'details': str(e)} + return {'status': 'success'} + + def create_collection_async(self): if self.collection_id is None: raise RuntimeError(f'NULL collection_id') - if self.has_collection(): - LOGGER.debug(f'{self.collection_id} already exists. continuing..') - return - # ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py - dapa_collection = UnityCollectionStac() \ - .with_id(self.collection_id) \ - .with_graule_id_regex("^test_file.*$") \ - .with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \ - .with_title(f'Collection: {self.collection_id}') \ - .with_process('stac') \ - .with_provider(self.__default_provider) \ - .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \ - .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \ - .add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \ - .add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \ - .add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', 'item') - - stac_collection = dapa_collection.start() - creation_result = CollectionDapaCreation(stac_collection).create() - if creation_result['statusCode'] >= 400: - raise RuntimeError(f'failed to create collection: {self.collection_id}. details: {creation_result["body"]}') - time.sleep(3) # cool off period before checking DB - if not self.has_collection(): - LOGGER.error(f'missing collection. (failed to create): {self.collection_id}') - raise ValueError(f'missing collection. (failed to create): {self.collection_id}') + with ThreadPoolExecutor() as executor: + futures = [executor.submit(self.create_one_collection, collection_id) for collection_id in self.collection_id] + results = [future.result() for future in as_completed(futures)] + errors = [k['details'] for k in results if k['status'] == 'error'] + if len(errors) > 0: + raise ValueError(f'error while creating collections: {errors}') return def send_cnm_msg(self): @@ -188,6 +205,6 @@ def start(self, event): self.load_successful_features_s3(s3_url) self.validate_granules() self.extract_collection_id() - self.create_collection() + self.create_collection_async() self.send_cnm_msg() return diff --git a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py index dfcce86..f993f16 100644 --- a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py +++ b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py @@ -218,7 +218,13 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict): read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip() - original_size = search_dsl['size'] + if 'sort' not in search_dsl: + search_result = self.__es.query(search_dsl, + querying_index=read_alias_name) if 'sort' in search_dsl else self.__es.query( + search_dsl, querying_index=read_alias_name) + LOGGER.debug(f'search_finished: {len(search_result["hits"]["hits"])}') + return search_result + original_size = search_dsl['size'] if 'size' in search_dsl else 20 result = [] duplicates = set([]) while len(result) < original_size: diff --git a/cumulus_lambda_functions/metadata_stac_generate_cmr/generate_cmr.py b/cumulus_lambda_functions/metadata_stac_generate_cmr/generate_cmr.py index 55a2f54..4c912e4 100644 --- a/cumulus_lambda_functions/metadata_stac_generate_cmr/generate_cmr.py +++ b/cumulus_lambda_functions/metadata_stac_generate_cmr/generate_cmr.py @@ -69,7 +69,7 @@ "required": [ "type" ], - "oneOf": [ + "anyOf": [ {"required": ["bucket", "key"]}, {"required": ["url_path"]} ], diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py index 7dbcf53..9638188 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_cnm.py @@ -212,7 +212,7 @@ def start(self): "version": '1.6.0', # TODO 'product': { 'name': each_granule['id'], - 'dataVersion': collection_id_version[1], + 'dataVersion': collection_id_version[1] if len(collection_id_version) > 1 else '', 'files': [self.__generate_cumulus_asset(v) for k, v in each_granule['assets'].items()], } } diff --git a/requirements.txt b/requirements.txt index cb02b89..181146b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,7 @@ jsonschema==4.23.0 jsonschema-specifications==2023.12.1 lark==0.12.0 mangum==0.18.0 -mdps-ds-lib==1.0.0 +mdps-ds-lib==1.0.0.dev3 pydantic==2.9.2 pydantic_core==2.23.4 pygeofilter==0.2.4 diff --git a/tests/cumulus_lambda_functions/cumulus_wrapper/test_query_collection.py b/tests/cumulus_lambda_functions/cumulus_wrapper/test_query_collection.py index 4389f7f..4ed51e0 100644 --- a/tests/cumulus_lambda_functions/cumulus_wrapper/test_query_collection.py +++ b/tests/cumulus_lambda_functions/cumulus_wrapper/test_query_collection.py @@ -180,3 +180,10 @@ def test_rules_04(self): print(json.dumps(rules, indent=4)) # self.assertTrue(False, rules) return + + def test_create_provider(self): + lambda_prefix = 'uds-sbx-cumulus' + collection_query = CollectionsQuery('NA', 'NA') + result = collection_query.create_provider('william-test2', 'uds-sbx-staging', lambda_prefix) + print(result) + return diff --git a/tests/integration_tests/test_stage_out_ingestion.py b/tests/integration_tests/test_stage_out_ingestion.py new file mode 100644 index 0000000..ca4ba83 --- /dev/null +++ b/tests/integration_tests/test_stage_out_ingestion.py @@ -0,0 +1,339 @@ +import base64 +import json +import os +import random +import tempfile +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from sys import argv +from time import sleep +from unittest import TestCase + +import requests +from dotenv import load_dotenv +from mdps_ds_lib.lib.aws.aws_s3 import AwsS3 +from mdps_ds_lib.lib.cognito_login.cognito_login import CognitoLogin +from mdps_ds_lib.lib.utils.file_utils import FileUtils +from mdps_ds_lib.lib.utils.time_utils import TimeUtils +from mdps_ds_lib.stage_in_out.upoad_granules_factory import UploadGranulesFactory +from pystac import Catalog, Asset, Link, ItemCollection, Item + + +class TestStageOutIngestion(TestCase): + def setUp(self) -> None: + super().setUp() + load_dotenv() + self._url_prefix = f'{os.environ.get("UNITY_URL")}/{os.environ.get("UNITY_STAGE", "sbx-uds-dapa")}' + self.cognito_login = CognitoLogin() \ + .with_client_id(os.environ.get('CLIENT_ID', '')) \ + .with_cognito_url(os.environ.get('COGNITO_URL', '')) \ + .with_verify_ssl(False) \ + .start(base64.standard_b64decode(os.environ.get('USERNAME')).decode(), + base64.standard_b64decode(os.environ.get('PASSWORD')).decode()) + self.bearer_token = self.cognito_login.token + self.stage = os.environ.get("UNITY_URL").split('/')[-1] + self.uds_url = f'{os.environ.get("UNITY_URL")}/{os.environ.get("UNITY_STAGE", "sbx-uds-dapa")}/' + self.custom_metadata_body = { + 'tag': {'type': 'keyword'}, + 'c_data1': {'type': 'long'}, + 'c_data2': {'type': 'boolean'}, + 'c_data3': {'type': 'keyword'}, + } + + self.tenant = 'UDS_LOCAL_TEST_3' # 'uds_local_test' # 'uds_sandbox' + self.tenant_venue = 'DEV' # 'DEV1' # 'dev' + self.collection_name = 'AAA' # 'uds_collection' # 'sbx_collection' + self.collection_version = '24.03.20.14.40'.replace('.', '') # '2402011200' + return + + def test_01_setup_permissions(self): + collection_url = f'{self._url_prefix}/admin/auth' + admin_add_body = { + "actions": ["READ", "CREATE"], + "resources": [f"URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:.*"], + "tenant": self.tenant, + "venue": self.tenant_venue, + "group_name": "Unity_Viewer" + } + s = requests.session() + s.trust_env = False + response = s.put(url=collection_url, headers={ + 'Authorization': f'Bearer {self.cognito_login.token}', + 'Content-Type': 'application/json', + }, verify=False, data=json.dumps(admin_add_body)) + self.assertEqual(response.status_code, 200, f'wrong status code: {response.text}') + response_json = response.content.decode() + print(response_json) + return + + def test_02_01_setup_custom_metadata_index(self): + post_url = f'{self._url_prefix}/admin/custom_metadata/{self.tenant}?venue={self.tenant_venue}' # MCP Dev + print(post_url) + headers = { + 'Authorization': f'Bearer {self.cognito_login.token}', + 'Content-Type': 'application/json', + } + query_result = requests.put(url=post_url, + headers=headers, + json=self.custom_metadata_body, + ) + self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}') + response_json = query_result.content.decode() + print(response_json) + return + + def test_02_02_get_custom_metadata_fields(self): + temp_collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:William' + post_url = f'{self._url_prefix}/collections/{temp_collection_id}/variables' # MCP Dev + print(post_url) + headers = { + 'Authorization': f'Bearer {self.cognito_login.token}', + } + query_result = requests.get(url=post_url, + headers=headers) + print(query_result.text) + self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}') + self.assertEqual(json.loads(query_result.text), self.custom_metadata_body, f'wrong body') + return + + def test_03_upload_complete_catalog_role_as_key(self): + os.environ['VERIFY_SSL'] = 'FALSE' + os.environ['RESULT_PATH_PREFIX'] = '' + os.environ['PROJECT'] = self.tenant + os.environ['VENUE'] = self.tenant_venue + os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging' + + os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY' + # os.environ['GRANULES_UPLOAD_TYPE'] = 'UPLOAD_S3_BY_STAC_CATALOG' + # defaulted to this value + + if len(argv) > 1: + argv.pop(-1) + argv.append('UPLOAD') + + starting_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M') + with tempfile.TemporaryDirectory() as tmp_dir_name: + os.environ['OUTPUT_FILE'] = os.path.join(tmp_dir_name, 'some_output', 'output.json') + os.environ['UPLOAD_DIR'] = '' # not needed + os.environ['OUTPUT_DIRECTORY'] = os.path.join(tmp_dir_name, 'output_dir') + FileUtils.mk_dir_p(os.environ.get('OUTPUT_DIRECTORY')) + os.environ['CATALOG_FILE'] = os.path.join(tmp_dir_name, 'catalog.json') + total_files = 10 + # os.environ['PARALLEL_COUNT'] = str(total_files) + granules_dir = os.path.join(tmp_dir_name, 'some_granules') + FileUtils.mk_dir_p(granules_dir) + catalog = Catalog( + id='NA', + description='NA') + catalog.set_self_href(os.environ['CATALOG_FILE']) + + for i in range(1, total_files+1): + filename = f'test_file{i:02d}' + with open(os.path.join(granules_dir, f'{filename}.nc'), 'w') as ff: + ff.write('sample_file') + with open(os.path.join(granules_dir, f'{filename}.nc.cas'), 'w') as ff: + ff.write(''' + + + AggregateDir + snppatmsl1a + + + AutomaticQualityFlag + Passed + + + BuildId + v01.43.00 + + + CollectionLabel + L1AMw_nominal2 + + + DataGroup + sndr + + + EndDateTime + 2016-01-14T10:06:00.000Z + + + EndTAI93 + 726919569.000 + + + FileFormat + nc4 + + + FileLocation + /pge/out + + + Filename + SNDR.SNPP.ATMS.L1A.nominal2.02.nc + + + GranuleNumber + 101 + + + JobId + f163835c-9945-472f-bee2-2bc12673569f + + + ModelId + urn:npp:SnppAtmsL1a + + + NominalDate + 2016-01-14 + + + ProductName + SNDR.SNPP.ATMS.20160114T1000.m06.g101.L1A.L1AMw_nominal2.v03_15_00.D.201214135000.nc + + + ProductType + SNDR_SNPP_ATMS_L1A + + + ProductionDateTime + 2020-12-14T13:50:00.000Z + + + ProductionLocation + Sounder SIPS: JPL/Caltech (Dev) + + + ProductionLocationCode + D + + + RequestId + 1215 + + + StartDateTime + 2016-01-14T10:00:00.000Z + + + StartTAI93 + 726919209.000 + + + TaskId + 8c3ae101-8f7c-46c8-b5c6-63e7b6d3c8cd + + ''') + stac_item = Item(id=filename, + geometry={ + "type": "Point", + "coordinates": [0.0, 0.0] + }, + bbox=[0.0, 0.1, 0.1, 0.0], + datetime=TimeUtils().parse_from_unix(0, True).get_datetime_obj(), + properties={ + "start_datetime": "2016-01-31T18:00:00.009057Z", + "end_datetime": "2016-01-31T19:59:59.991043Z", + "created": "2016-02-01T02:45:59.639000Z", + "updated": "2022-03-23T15:48:21.578000Z", + "datetime": "2022-03-23T15:48:19.079000Z" + }, + href=os.path.join('some_granules', f'{filename}.nc.stac.json'), + collection=f'{self.collection_name}-{i:02d}', + assets={ + f'data': Asset(os.path.join('.', f'{filename}.nc'), title='test_file01.nc', roles=['data']), + f'metadata1': Asset(os.path.join('.', f'{filename}.nc.cas'), title='test_file01.nc.cas', roles=['metadata']), + f'metadata2': Asset(os.path.join('.', f'{filename}.nc.stac.json'), title='test_file01.nc.stac.json', roles=['metadata']), + }) + with open(os.path.join(granules_dir, f'{filename}.nc.stac.json'), 'w') as ff: + ff.write(json.dumps(stac_item.to_dict(False, False))) + catalog.add_link(Link('item', os.path.join('some_granules', f'{filename}.nc.stac.json'), 'application/json')) + print(json.dumps(catalog.to_dict(False, False))) + with open(os.environ['CATALOG_FILE'], 'w') as ff: + ff.write(json.dumps(catalog.to_dict(False, False))) + + upload_result = UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG)).upload() + upload_result = json.loads(upload_result) + print(upload_result) + """ + {'type': 'Catalog', 'id': 'NA', 'stac_version': '1.0.0', 'description': 'NA', 'links': [{'rel': 'root', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/catalog.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/successful_features.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/failed_features.json', 'type': 'application/json'}]} + """ + self.assertTrue('type' in upload_result, 'missing type') + self.assertEqual(upload_result['type'], 'Catalog', 'missing type') + upload_result = Catalog.from_dict(upload_result) + child_links = [k.href for k in upload_result.get_links(rel='item')] + self.assertEqual(len(child_links), 2, f'wrong length: {child_links}') + self.assertTrue(FileUtils.file_exist(child_links[0]), f'missing file: {child_links[0]}') + successful_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[0])) + successful_feature_collection = list(successful_feature_collection.items) + self.assertEqual(len(successful_feature_collection), total_files, f'wrong length: {successful_feature_collection}') + + self.assertTrue(FileUtils.file_exist(child_links[1]), f'missing file: {child_links[1]}') + failed_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[1])) + failed_feature_collection = list(failed_feature_collection.items) + self.assertEqual(len(failed_feature_collection), 0, f'wrong length: {failed_feature_collection}') + + upload_result = successful_feature_collection[0].to_dict(False, False) + print(f'example feature: {upload_result}') + self.assertTrue('assets' in upload_result, 'missing assets') + result_key = [k for k in upload_result['assets'].keys()][0] + print(f'result_key: {result_key}') + self.assertEqual(result_key, 'data', f'worng asset key: {result_key}') + self.assertTrue(f'metadata1' in upload_result['assets'], f'missing assets#metadata asset: metadata1') + self.assertTrue('href' in upload_result['assets'][f'metadata1'], 'missing assets#metadata__cas#href') + self.assertTrue(upload_result['assets'][f'metadata1']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:{self.collection_name}')) + self.assertTrue(f'data' in upload_result['assets'], f'missing assets#data: data') + self.assertTrue('href' in upload_result['assets'][f'data'], 'missing assets#data#href') + self.assertTrue(upload_result['assets'][f'data']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:{self.collection_name}')) + """ + Example output: + { + 'type': 'FeatureCollection', + 'features': [{ + 'type': 'Feature', + 'stac_version': '1.0.0', + 'id': 'NEW_COLLECTION_EXAMPLE_L1B___9:test_file01', + 'properties': {'start_datetime': '2016-01-31T18:00:00.009057Z', + 'end_datetime': '2016-01-31T19:59:59.991043Z', 'created': '2016-02-01T02:45:59.639000Z', + 'updated': '2022-03-23T15:48:21.578000Z', 'datetime': '1970-01-01T00:00:00Z'}, + 'geometry': {'type': 'Point', 'coordinates': [0.0, 0.0]}, 'links': [], + 'assets': {'data': { + 'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc', + 'title': 'main data'}, 'metadata__cas': { + 'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.cas', + 'title': 'metadata cas'}, 'metadata__stac': { + 'href': 's3://uds-test-cumulus-staging/NEW_COLLECTION_EXAMPLE_L1B___9/NEW_COLLECTION_EXAMPLE_L1B___9:test_file01/test_file01.nc.stac.json', + 'title': 'metadata stac'}}, + 'bbox': [0.0, 0.0, 0.0, 0.0], + 'stac_extensions': [], + 'collection': 'NEW_COLLECTION_EXAMPLE_L1B___9'}]} + """ + s3 = AwsS3() + s3_keys = [k for k in s3.get_child_s3_files(os.environ['STAGING_BUCKET'], + f"stage_out/successful_features_{starting_time}", + )] + s3_keys = sorted(s3_keys) + print(f's3_keys: {s3_keys}') + self.assertTrue(len(s3_keys) > 0, f'empty files in S3') + local_file = s3.set_s3_url(f's3://{os.environ["STAGING_BUCKET"]}/{s3_keys[-1][0]}').download(tmp_dir_name) + successful_feature_collection = ItemCollection.from_dict(FileUtils.read_json(local_file)) + successful_feature_collection = list(successful_feature_collection.items) + self.assertEqual(len(successful_feature_collection), total_files, f'wrong length: {successful_feature_collection}') + return + + def test_single_granule_get(self): + post_url = f'{self.uds_url}collections/URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}-01___001/items/URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}-01___001:test_file01' + headers = { + 'Authorization': f'Bearer {self.bearer_token}', + } + print(post_url) + query_result = requests.get(url=post_url, + headers=headers, + ) + response_json = json.loads(query_result.text) + print(json.dumps(response_json, indent=4)) + self.assertEqual(query_result.status_code, 200, f'wrong status code. {query_result.text}') + return diff --git a/tests/integration_tests/test_uds_api.py b/tests/integration_tests/test_uds_api.py index 49bb96e..0580e5a 100644 --- a/tests/integration_tests/test_uds_api.py +++ b/tests/integration_tests/test_uds_api.py @@ -38,21 +38,19 @@ def setUp(self) -> None: self.tenant = 'UDS_LOCAL_TEST' # 'uds_local_test' # 'uds_sandbox' self.tenant_venue = 'DEV' # 'DEV1' # 'dev' - self.collection_name = 'SNDR-SNPP_ATMS@L1B$OUTPUT' # 'uds_collection' # 'sbx_collection' + self.collection_name = 'TEST2' # 'uds_collection' # 'sbx_collection' self.collection_version = '24.03.20.14.40'.replace('.', '') # '2402011200' return def test_add_admin_01(self): collection_url = f'{self.uds_url}admin/auth' - tenant, tenant_venue = 'uds_local_test', 'DEV1' - tenant, tenant_venue = 'MAIN_PROJECT', 'DEV' admin_add_body = { "actions": ["READ", "CREATE"], - "resources": [f"URN:NASA:UNITY:{tenant}:{tenant_venue}:.*"], - "tenant": tenant, + "resources": [f"URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:.*"], + "tenant": self.tenant, # "venue": f"DEV1-{int(datetime.utcnow().timestamp())}", - "venue": tenant_venue, + "venue": self.tenant_venue, "group_name": "Unity_Viewer" } s = requests.session() @@ -68,7 +66,8 @@ def test_add_admin_01(self): return def test_01_setup_custom_metadata_index(self): - post_url = f'{self.uds_url}admin/custom_metadata/MAIN_PROJECT?venue=DEV' # MCP Dev + post_url = f'{self.uds_url}admin/custom_metadata/{self.tenant}?venue={self.tenant_venue}' # MCP Dev + print(f'post_url: {post_url}') headers = { 'Authorization': f'Bearer {self.bearer_token}', 'Content-Type': 'application/json', @@ -89,7 +88,7 @@ def test_03_create_collection(self): 'Content-Type': 'application/json', } print(post_url) - temp_collection_id = 'URN:NASA:UNITY:MAIN_PROJECT:DEV:NEW_COLLECTION_EXAMPLE_L1B___9' + temp_collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}' dapa_collection = UnityCollectionStac() \ .with_id(temp_collection_id) \ .with_graule_id_regex("^test_file.*$") \ diff --git a/tf-module/unity-cumulus/granules_cnm_ingester.tf b/tf-module/unity-cumulus/granules_cnm_ingester.tf index dd27388..f240261 100644 --- a/tf-module/unity-cumulus/granules_cnm_ingester.tf +++ b/tf-module/unity-cumulus/granules_cnm_ingester.tf @@ -81,9 +81,9 @@ resource "aws_lambda_function" "granules_cnm_response_writer" { tags = var.tags } -data "aws_sns_topic" "granules_cnm_response_topic" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic.html - name = var.granules_cnm_response_topic -} +#data "aws_sns_topic" "granules_cnm_response_topic" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic.html +# name = var.granules_cnm_response_topic +#} module "granules_cnm_response_writer" { source = "./sqs--sns-lambda-connector" @@ -93,5 +93,6 @@ module "granules_cnm_response_writer" { lambda_processing_role_arn = var.lambda_processing_role_arn name = "granules_cnm_response_writer" prefix = var.prefix - sns_arn = data.aws_sns_topic.granules_cnm_response_topic.arn +# sns_arn = data.aws_sns_topic.granules_cnm_response_topic.arn + sns_arn = var.granules_cnm_response_topic } diff --git a/tf-module/unity-cumulus/sns_policy.json b/tf-module/unity-cumulus/sns_policy.json index 5ccb308..4595583 100644 --- a/tf-module/unity-cumulus/sns_policy.json +++ b/tf-module/unity-cumulus/sns_policy.json @@ -20,7 +20,7 @@ "Resource": "arn:aws:sns:${region}:${accountId}:${snsName}", "Condition": { "ArnLike": { - "aws:SourceArn": "arn:aws:s3:*:*:${s3Glob}" + "aws:SourceArn": ["arn:aws:s3:*:*:${s3Glob}"] } } } diff --git a/tf-module/unity-cumulus/sqs-sns.tf b/tf-module/unity-cumulus/sqs-sns.tf index bdef524..8f3e801 100644 --- a/tf-module/unity-cumulus/sqs-sns.tf +++ b/tf-module/unity-cumulus/sqs-sns.tf @@ -1,6 +1,6 @@ -data "aws_sns_topic" "report_granules_topic" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic.html - name = var.report_granules_topic -} +#data "aws_sns_topic" "report_granules_topic" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic.html +# name = var.report_granules_topic +#} module "granules_to_es" { source = "./sqs--sns-lambda-connector" @@ -10,5 +10,6 @@ module "granules_to_es" { lambda_processing_role_arn = var.lambda_processing_role_arn name = "granules_to_es" prefix = var.prefix - sns_arn = data.aws_sns_topic.report_granules_topic.arn +# sns_arn = data.aws_sns_topic.report_granules_topic.arn + sns_arn = var.report_granules_topic }