From dc1f17de85135fe7024a4b9b04a7d630d3077245 Mon Sep 17 00:00:00 2001 From: Will Date: Wed, 30 Jun 2021 09:49:50 -0500 Subject: [PATCH] fix(cleanup): pep8 standards, better /_status --- src/mds/agg_mds/datastore/__init__.py | 7 ++ .../agg_mds/datastore/elasticsearch_dao.py | 73 ++++++++++++------- src/mds/agg_mds/query.py | 5 ++ src/mds/main.py | 9 ++- tests/test_agg_mds_elasticsearch_dao.py | 4 +- 5 files changed, 70 insertions(+), 28 deletions(-) diff --git a/src/mds/agg_mds/datastore/__init__.py b/src/mds/agg_mds/datastore/__init__.py index 493f663f..a049d1e2 100644 --- a/src/mds/agg_mds/datastore/__init__.py +++ b/src/mds/agg_mds/datastore/__init__.py @@ -22,6 +22,9 @@ async def close(): async def get_status(): + """ + Returns "OK" or raises an error indicating the status of the datastore: + """ return await client.get_status() @@ -55,3 +58,7 @@ async def get_all_metadata(*args): async def get_aggregations(*args): return await client.get_aggregations(*args) + + +async def search(*args): + return await client.search(*args) diff --git a/src/mds/agg_mds/datastore/elasticsearch_dao.py b/src/mds/agg_mds/datastore/elasticsearch_dao.py index 0c5edcf1..6e40b227 100644 --- a/src/mds/agg_mds/datastore/elasticsearch_dao.py +++ b/src/mds/agg_mds/datastore/elasticsearch_dao.py @@ -5,15 +5,19 @@ from mds import logger -agg_mds_index = "commons-index" -agg_mds_type = "commons" +# TODO WFH Why do we have both __manifest and _file_manifest? +FIELDS_TO_NORMALIZE = ["__manifest", "_file_manifest", "advSearchFilters"] -agg_mds_info_index = "commons-info-index" -agg_mds_info_type = "commons-info" +AGG_MDS_INDEX = "commons-index" +AGG_MDS_TYPE = "commons" -mapping = { +AGG_MDS_INFO_INDEX = "commons-info-index" +AGG_MDS_INFO_TYPE = "commons-info" + + +MAPPING = { "mappings": { "commons": { "properties": { @@ -43,12 +47,12 @@ async def init(hostname: str = "0.0.0.0", port: int = 9200): async def drop_all(): res = elastic_search_client.indices.delete(index="_all", ignore=[400, 404]) logger.debug(f"deleted all indexes: {res}") - res = elastic_search_client.indices.create(index=agg_mds_index, body=mapping) - logger.debug(f"created index {agg_mds_index}: {res}") + res = elastic_search_client.indices.create(index=AGG_MDS_INDEX, body=MAPPING) + logger.debug(f"created index {AGG_MDS_INDEX}: {res}") res = elastic_search_client.indices.create( - index=agg_mds_info_index, + index=AGG_MDS_INFO_INDEX, ) - logger.debug(f"created index {agg_mds_info_index}: {res}") + logger.debug(f"created index {AGG_MDS_INFO_INDEX}: {res}") def normalize_string_or_object(doc, key): @@ -65,8 +69,8 @@ async def update_metadata( info: Dict[str, str], ): elastic_search_client.index( - index=agg_mds_info_index, - doc_type=agg_mds_info_type, + index=AGG_MDS_INFO_INDEX, + doc_type=AGG_MDS_INFO_TYPE, id=name, body=info, ) @@ -76,17 +80,18 @@ async def update_metadata( # Flatten out this structure doc = doc[key]["gen3_discovery"] - normalize_string_or_object(doc, "__manifest") - # TODO WFH Why do we have this redundant field? Which commons has this? - normalize_string_or_object(doc, "_file_manifest") - normalize_string_or_object(doc, "advSearchFilters") + for field in FIELDS_TO_NORMALIZE: + normalize_string_or_object(doc, field) + elastic_search_client.index( - index=agg_mds_index, doc_type=agg_mds_type, id=key, body=doc + index=AGG_MDS_INDEX, doc_type=AGG_MDS_TYPE, id=key, body=doc ) async def get_status(): - return elastic_search_client.cluster.health() + if not elastic_search_client.ping(): + raise ValueError("Connection failed") + return "OK" async def close(): @@ -96,7 +101,7 @@ async def close(): async def get_commons(): try: res = elastic_search_client.search( - index=agg_mds_index, + index=AGG_MDS_INDEX, body={ "size": 0, "aggs": {"commons_names": {"terms": {"field": "commons_name.keyword"}}}, @@ -115,7 +120,7 @@ async def get_commons(): async def get_all_metadata(limit, offset): try: res = elastic_search_client.search( - index=agg_mds_index, + index=AGG_MDS_INDEX, body={"size": limit, "from": offset, "query": {"match_all": {}}}, ) byCommons = {} @@ -141,7 +146,7 @@ async def get_all_metadata(limit, offset): async def get_all_named_commons_metadata(name): try: return elastic_search_client.search( - index=agg_mds_index, + index=AGG_MDS_INDEX, body={"query": {"match": {"commons_name.keyword": name}}}, ) except Exception as error: @@ -152,7 +157,7 @@ async def get_all_named_commons_metadata(name): async def metadata_tags(name): try: return elastic_search_client.search( - index=agg_mds_index, + index=AGG_MDS_INDEX, body={ "size": 0, "aggs": { @@ -182,7 +187,7 @@ async def metadata_tags(name): async def get_commons_attribute(name, what): try: data = elastic_search_client.search( - index=agg_mds_info_index, + index=AGG_MDS_INFO_INDEX, body={ "query": { "terms": { @@ -200,7 +205,7 @@ async def get_commons_attribute(name, what): async def get_aggregations(name): try: res = elastic_search_client.search( - index=agg_mds_index, + index=AGG_MDS_INDEX, body={ "size": 0, "query": { @@ -224,11 +229,29 @@ async def get_aggregations(name): async def get_by_guid(guid): try: data = elastic_search_client.get( - index=agg_mds_index, - doc_type=agg_mds_type, + index=AGG_MDS_INDEX, + doc_type=AGG_MDS_TYPE, id=guid, ) return data["_source"] except Exception as error: logger.error(error) return None + + +async def search(term): + try: + res = elastic_search_client.search( + index=AGG_MDS_INDEX, + body={ + "query": { + "query_string": { + "query": term, + } + } + }, + ) + return [x["_source"] for x in res["hits"]["hits"]] + except Exception as error: + logger.error(error) + return [] diff --git a/src/mds/agg_mds/query.py b/src/mds/agg_mds/query.py index f3478094..d3254c17 100644 --- a/src/mds/agg_mds/query.py +++ b/src/mds/agg_mds/query.py @@ -114,6 +114,11 @@ async def metadata_name_guid(guid: str): ) +@mod.get("/aggregate/search/{term}") +async def metadata_search(term): + return await datastore.search(term) + + def init_app(app): if config.USE_AGG_MDS: app.include_router(mod, tags=["Query"]) diff --git a/src/mds/main.py b/src/mds/main.py index 5389e60b..0a8f8847 100644 --- a/src/mds/main.py +++ b/src/mds/main.py @@ -111,11 +111,18 @@ def get_version(): @router.get("/_status") async def get_status(): """ - Returns the status of all the cached commons. There are two fields per common: + Returns the status of the MDS: * error: if there was no error this will be "none" * last_update: timestamp of the last data pull from the commons * count: number of entries :return: """ now = await db.scalar("SELECT now()") + + try: + await aggregate_datastore.get_status() + except Exception as error: + logger.error("error with aggregate datastore connection: %s", error) + return dict(error="aggregate datastore offline") + return dict(status="OK", timestamp=now) diff --git a/tests/test_agg_mds_elasticsearch_dao.py b/tests/test_agg_mds_elasticsearch_dao.py index 96bdc357..a1156d1e 100644 --- a/tests/test_agg_mds_elasticsearch_dao.py +++ b/tests/test_agg_mds_elasticsearch_dao.py @@ -4,7 +4,7 @@ import pytest import mds from mds.agg_mds.datastore import elasticsearch_dao -from mds.agg_mds.datastore.elasticsearch_dao import mapping +from mds.agg_mds.datastore.elasticsearch_dao import MAPPING import nest_asyncio @@ -27,7 +27,7 @@ async def test_drop_all(): mock_indices.delete.assert_called_with(index="_all", ignore=[400, 404]) mock_indices.create.assert_has_calls( [ - call(body=mapping, index="commons-index"), + call(body=MAPPING, index="commons-index"), call(index="commons-info-index"), ], any_order=True,