Merge a0cc79a into ad58a7d
wphyojpl authored Dec 3, 2024
2 parents ad58a7d + a0cc79a commit 9735c2f
Showing 12 changed files with 468 additions and 52 deletions.
46 changes: 46 additions & 0 deletions cumulus_lambda_functions/cumulus_wrapper/query_collections.py
@@ -12,6 +12,7 @@

class CollectionsQuery(CumulusBase):
__collections_key = 'collections'
__providers_key = 'providers'
__rules_key = 'rules'
__stats_key = 'stats'
__collection_id_key = 'collectionId'
@@ -337,3 +338,48 @@ def query(self):
LOGGER.exception('error during cumulus query')
return {'server_error': str(e)}
return {'results': [CollectionTransformer().to_stac(k) for k in query_result]}

def create_provider(self, provider_name: str, s3_bucket: str, private_api_prefix: str):
# INSERT INTO providers (name, protocol, host) VALUES ('unity', 's3', 'https://dev.mdps.mcp.nasa.gov');
# TODO : this fails
payload = {
'httpMethod': 'POST',
'resource': '/{proxy+}',
'path': f'/{self.__providers_key}',
'headers': {
'Content-Type': 'application/json',
},
'body': json.dumps({
"id": provider_name,
"host": s3_bucket,
"protocol": "s3",
# "port": 443,
"globalConnectionLimit": 1000,
# "maxDownloadTime": 300,
# "username": "na",
# "password": "na",
# "privateKey": "na",
# "cmKeyId": "na",
# "allowedRedirects": "na",
})
}
LOGGER.debug(f'payload: {payload}')
try:
query_result = self._invoke_api(payload, private_api_prefix)
"""
{'statusCode': 500, 'body': '', 'headers': {}}
"""
if query_result['statusCode'] >= 500:
LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
return {'server_error': query_result}
if query_result['statusCode'] >= 400:
LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
return {'client_error': query_result}
query_result = json.loads(query_result['body'])
LOGGER.debug(f'json query_result: {query_result}')
if 'message' not in query_result:
return {'server_error': f'invalid response: {query_result}'}
except Exception as e:
LOGGER.exception('error while invoking')
return {'server_error': f'error while invoking:{str(e)}'}
return {'status': query_result['message']}
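
For reference, the proxy-integration event that create_provider builds mimics what API Gateway would hand the private Cumulus API Lambda. With hypothetical arguments (provider example-provider, bucket example-staging-bucket), the payload resolves to:

{
    "httpMethod": "POST",
    "resource": "/{proxy+}",
    "path": "/providers",
    "headers": {"Content-Type": "application/json"},
    "body": "{\"id\": \"example-provider\", \"host\": \"example-staging-bucket\", \"protocol\": \"s3\", \"globalConnectionLimit\": 1000}"
}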
@@ -1,5 +1,6 @@
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections
@@ -10,7 +11,6 @@

from mdps_ds_lib.lib.cumulus_stac.unity_collection_stac import UnityCollectionStac
from cumulus_lambda_functions.uds_api.dapa.collections_dapa_creation import CollectionDapaCreation
from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer
from pystac import ItemCollection, Item
from mdps_ds_lib.lib.utils.file_utils import FileUtils
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
@@ -118,41 +118,58 @@ def extract_collection_id(self):
if len(self.successful_features.items) < 1:
LOGGER.error(f'not required to process. No Granules: {self.successful_features.to_dict(False)}')
return
self.collection_id = self.successful_features.items[0].collection_id
self.collection_id = list(set([k.collection_id for k in self.successful_features.items]))
return

def has_collection(self):
uds_collection_result = self.__uds_collection.get_collection(self.collection_id)
def has_collection(self, collection_id_custom=None):
collection_id_custom = collection_id_custom if collection_id_custom is not None else self.collection_id
uds_collection_result = self.__uds_collection.get_collection(collection_id_custom)
return len(uds_collection_result) > 0

def create_collection(self):
def create_one_collection(self, collection_id):
try:
if collection_id is None:
raise RuntimeError(f'NULL collection_id')
if self.has_collection(collection_id):
LOGGER.debug(f'{collection_id} already exists. continuing..')
return {'status': 'success'}
# ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py
dapa_collection = UnityCollectionStac() \
.with_id(collection_id) \
.with_graule_id_regex("^test_file.*$") \
.with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \
.with_title(f'Collection: {collection_id}') \
.with_process('stac') \
.with_provider(self.__default_provider) \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \
.add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata',
'item')

stac_collection = dapa_collection.start()
creation_result = CollectionDapaCreation(stac_collection).create()
if creation_result['statusCode'] >= 400:
raise RuntimeError(
f'failed to create collection: {collection_id}. details: {creation_result["body"]}')
time.sleep(3) # cool off period before checking DB
if not self.has_collection(collection_id):
LOGGER.error(f'missing collection. (failed to create): {collection_id}')
raise ValueError(f'missing collection. (failed to create): {collection_id}')
except Exception as e:
return {'status': 'error', 'details': str(e)}
return {'status': 'success'}

def create_collection_async(self):
if self.collection_id is None:
raise RuntimeError(f'NULL collection_id')
if self.has_collection():
LOGGER.debug(f'{self.collection_id} already exists. continuing..')
return
# ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py
dapa_collection = UnityCollectionStac() \
.with_id(self.collection_id) \
.with_graule_id_regex("^test_file.*$") \
.with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \
.with_title(f'Collection: {self.collection_id}') \
.with_process('stac') \
.with_provider(self.__default_provider) \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \
.add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \
.add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \
.add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', 'item')

stac_collection = dapa_collection.start()
creation_result = CollectionDapaCreation(stac_collection).create()
if creation_result['statusCode'] >= 400:
raise RuntimeError(f'failed to create collection: {self.collection_id}. details: {creation_result["body"]}')
time.sleep(3) # cool off period before checking DB
if not self.has_collection():
LOGGER.error(f'missing collection. (failed to create): {self.collection_id}')
raise ValueError(f'missing collection. (failed to create): {self.collection_id}')
with ThreadPoolExecutor() as executor:
futures = [executor.submit(self.create_one_collection, collection_id) for collection_id in self.collection_id]
results = [future.result() for future in as_completed(futures)]
errors = [k['details'] for k in results if k['status'] == 'error']
if len(errors) > 0:
raise ValueError(f'error while creating collections: {errors}')
return

def send_cnm_msg(self):
@@ -188,6 +205,6 @@ def start(self):
self.load_successful_features_s3(s3_url)
self.validate_granules()
self.extract_collection_id()
self.create_collection()
self.create_collection_async()
self.send_cnm_msg()
return
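
create_collection_async fans the per-collection work out to a thread pool: one create_one_collection task per extracted collection id, each returning a status dict instead of raising, followed by a single aggregation pass that raises if any task reported an error. A minimal standalone sketch of the same pattern, with a hypothetical stand-in worker in place of the class method:

from concurrent.futures import ThreadPoolExecutor, as_completed

def create_one(collection_id):
    # stand-in worker: reports failure via its return value rather than raising
    try:
        if collection_id is None:
            raise RuntimeError('NULL collection_id')
        # ... create the collection here ...
    except Exception as e:
        return {'status': 'error', 'details': str(e)}
    return {'status': 'success'}

collection_ids = ['EXAMPLE_A___001', 'EXAMPLE_B___001']  # hypothetical ids
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(create_one, c) for c in collection_ids]
    results = [f.result() for f in as_completed(futures)]
errors = [r['details'] for r in results if r['status'] == 'error']
if errors:
    raise ValueError(f'error while creating collections: {errors}')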
8 changes: 7 additions & 1 deletion cumulus_lambda_functions/lib/uds_db/granules_db_index.py
@@ -218,7 +218,13 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str

def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
original_size = search_dsl['size']
if 'sort' not in search_dsl:
search_result = self.__es.query(search_dsl,
querying_index=read_alias_name) if 'sort' in search_dsl else self.__es.query(
search_dsl, querying_index=read_alias_name)
LOGGER.debug(f'search_finished: {len(search_result["hits"]["hits"])}')
return search_result
original_size = search_dsl['size'] if 'size' in search_dsl else 20
result = []
duplicates = set([])
while len(result) < original_size:
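
When a sort is supplied, dsl_search falls through to the accumulation loop (truncated above), collecting hits page by page until original_size unique documents are gathered, tracking already-seen ids in duplicates. A minimal sketch of that accumulate-and-dedup pattern, assuming a hypothetical fetch_page pager (the real paging details sit outside this hunk):

def collect_unique(fetch_page, original_size):
    # fetch_page() is a hypothetical pager returning the next batch of hits, empty when exhausted
    result, duplicates = [], set()
    while len(result) < original_size:
        hits = fetch_page()
        if not hits:
            break  # no more pages to read
        for hit in hits:
            if hit['_id'] in duplicates:
                continue
            duplicates.add(hit['_id'])
            result.append(hit)
    return result[:original_size]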
@@ -69,7 +69,7 @@
"required": [
"type"
],
"oneOf": [
"anyOf": [
{"required": ["bucket", "key"]},
{"required": ["url_path"]}
],
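
The move from oneOf to anyOf matters when an entry satisfies both subschemas: oneOf demands exactly one match, so a file object carrying bucket, key, and url_path together would fail validation, while anyOf accepts it. For example, a hypothetical file entry like this now validates:

{
    "type": "data",
    "bucket": "example-protected-bucket",
    "key": "test_file01.nc",
    "url_path": "s3://example-protected-bucket/test_file01.nc"
}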
@@ -212,7 +212,7 @@ def start(self):
"version": '1.6.0', # TODO
'product': {
'name': each_granule['id'],
'dataVersion': collection_id_version[1],
'dataVersion': collection_id_version[1] if len(collection_id_version) > 1 else '',
'files': [self.__generate_cumulus_asset(v) for k, v in each_granule['assets'].items()],
}
}
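
The guarded lookup covers collection ids that carry no version suffix. Assuming the usual Cumulus name___version id convention (an assumption; the split itself happens outside this hunk), the two cases look like:

# hypothetical ids illustrating the guarded version lookup
collection_id_version = 'EXAMPLE_COLLECTION___2'.split('___')  # ['EXAMPLE_COLLECTION', '2'] -> dataVersion '2'
collection_id_version = 'EXAMPLE_COLLECTION'.split('___')      # ['EXAMPLE_COLLECTION'] -> dataVersion ''
data_version = collection_id_version[1] if len(collection_id_version) > 1 else ''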
2 changes: 1 addition & 1 deletion requirements.txt
@@ -15,7 +15,7 @@ jsonschema==4.23.0
jsonschema-specifications==2023.12.1
lark==0.12.0
mangum==0.18.0
mdps-ds-lib==1.0.0
mdps-ds-lib==1.0.0.dev3
pydantic==2.9.2
pydantic_core==2.23.4
pygeofilter==0.2.4
@@ -180,3 +180,10 @@ def test_rules_04(self):
print(json.dumps(rules, indent=4))
# self.assertTrue(False, rules)
return

def test_create_provider(self):
lambda_prefix = 'uds-sbx-cumulus'
collection_query = CollectionsQuery('NA', 'NA')
result = collection_query.create_provider('william-test2', 'uds-sbx-staging', lambda_prefix)
print(result)
return