Skip to content

Commit

Permalink
fix(cleanup): pep8 standards, better /_status
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Jun 30, 2021
1 parent a67df2a commit dc1f17d
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
7 changes: 7 additions & 0 deletions src/mds/agg_mds/datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ async def close():


async def get_status():
"""
Returns "OK" or raises an error indicating the status of the datastore:
"""
return await client.get_status()


Expand Down Expand Up @@ -55,3 +58,7 @@ async def get_all_metadata(*args):

async def get_aggregations(*args):
return await client.get_aggregations(*args)


async def search(*args):
return await client.search(*args)
73 changes: 48 additions & 25 deletions src/mds/agg_mds/datastore/elasticsearch_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
from mds import logger


agg_mds_index = "commons-index"
agg_mds_type = "commons"
# TODO WFH Why do we have both __manifest and _file_manifest?
FIELDS_TO_NORMALIZE = ["__manifest", "_file_manifest", "advSearchFilters"]


agg_mds_info_index = "commons-info-index"
agg_mds_info_type = "commons-info"
AGG_MDS_INDEX = "commons-index"
AGG_MDS_TYPE = "commons"


mapping = {
AGG_MDS_INFO_INDEX = "commons-info-index"
AGG_MDS_INFO_TYPE = "commons-info"


MAPPING = {
"mappings": {
"commons": {
"properties": {
Expand Down Expand Up @@ -43,12 +47,12 @@ async def init(hostname: str = "0.0.0.0", port: int = 9200):
async def drop_all():
res = elastic_search_client.indices.delete(index="_all", ignore=[400, 404])
logger.debug(f"deleted all indexes: {res}")
res = elastic_search_client.indices.create(index=agg_mds_index, body=mapping)
logger.debug(f"created index {agg_mds_index}: {res}")
res = elastic_search_client.indices.create(index=AGG_MDS_INDEX, body=MAPPING)
logger.debug(f"created index {AGG_MDS_INDEX}: {res}")
res = elastic_search_client.indices.create(
index=agg_mds_info_index,
index=AGG_MDS_INFO_INDEX,
)
logger.debug(f"created index {agg_mds_info_index}: {res}")
logger.debug(f"created index {AGG_MDS_INFO_INDEX}: {res}")


def normalize_string_or_object(doc, key):
Expand All @@ -65,8 +69,8 @@ async def update_metadata(
info: Dict[str, str],
):
elastic_search_client.index(
index=agg_mds_info_index,
doc_type=agg_mds_info_type,
index=AGG_MDS_INFO_INDEX,
doc_type=AGG_MDS_INFO_TYPE,
id=name,
body=info,
)
Expand All @@ -76,17 +80,18 @@ async def update_metadata(
# Flatten out this structure
doc = doc[key]["gen3_discovery"]

normalize_string_or_object(doc, "__manifest")
# TODO WFH Why do we have this redundant field? Which commons has this?
normalize_string_or_object(doc, "_file_manifest")
normalize_string_or_object(doc, "advSearchFilters")
for field in FIELDS_TO_NORMALIZE:
normalize_string_or_object(doc, field)

elastic_search_client.index(
index=agg_mds_index, doc_type=agg_mds_type, id=key, body=doc
index=AGG_MDS_INDEX, doc_type=AGG_MDS_TYPE, id=key, body=doc
)


async def get_status():
return elastic_search_client.cluster.health()
if not elastic_search_client.ping():
raise ValueError("Connection failed")
return "OK"


async def close():
Expand All @@ -96,7 +101,7 @@ async def close():
async def get_commons():
try:
res = elastic_search_client.search(
index=agg_mds_index,
index=AGG_MDS_INDEX,
body={
"size": 0,
"aggs": {"commons_names": {"terms": {"field": "commons_name.keyword"}}},
Expand All @@ -115,7 +120,7 @@ async def get_commons():
async def get_all_metadata(limit, offset):
try:
res = elastic_search_client.search(
index=agg_mds_index,
index=AGG_MDS_INDEX,
body={"size": limit, "from": offset, "query": {"match_all": {}}},
)
byCommons = {}
Expand All @@ -141,7 +146,7 @@ async def get_all_metadata(limit, offset):
async def get_all_named_commons_metadata(name):
try:
return elastic_search_client.search(
index=agg_mds_index,
index=AGG_MDS_INDEX,
body={"query": {"match": {"commons_name.keyword": name}}},
)
except Exception as error:
Expand All @@ -152,7 +157,7 @@ async def get_all_named_commons_metadata(name):
async def metadata_tags(name):
try:
return elastic_search_client.search(
index=agg_mds_index,
index=AGG_MDS_INDEX,
body={
"size": 0,
"aggs": {
Expand Down Expand Up @@ -182,7 +187,7 @@ async def metadata_tags(name):
async def get_commons_attribute(name, what):
try:
data = elastic_search_client.search(
index=agg_mds_info_index,
index=AGG_MDS_INFO_INDEX,
body={
"query": {
"terms": {
Expand All @@ -200,7 +205,7 @@ async def get_commons_attribute(name, what):
async def get_aggregations(name):
try:
res = elastic_search_client.search(
index=agg_mds_index,
index=AGG_MDS_INDEX,
body={
"size": 0,
"query": {
Expand All @@ -224,11 +229,29 @@ async def get_aggregations(name):
async def get_by_guid(guid):
try:
data = elastic_search_client.get(
index=agg_mds_index,
doc_type=agg_mds_type,
index=AGG_MDS_INDEX,
doc_type=AGG_MDS_TYPE,
id=guid,
)
return data["_source"]
except Exception as error:
logger.error(error)
return None


async def search(term):
try:
res = elastic_search_client.search(
index=AGG_MDS_INDEX,
body={
"query": {
"query_string": {
"query": term,
}
}
},
)
return [x["_source"] for x in res["hits"]["hits"]]
except Exception as error:
logger.error(error)
return []
5 changes: 5 additions & 0 deletions src/mds/agg_mds/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ async def metadata_name_guid(guid: str):
)


@mod.get("/aggregate/search/{term}")
async def metadata_search(term):
return await datastore.search(term)


def init_app(app):
if config.USE_AGG_MDS:
app.include_router(mod, tags=["Query"])
9 changes: 8 additions & 1 deletion src/mds/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,18 @@ def get_version():
@router.get("/_status")
async def get_status():
"""
Returns the status of all the cached commons. There are two fields per common:
Returns the status of the MDS:
* error: if there was no error this will be "none"
* last_update: timestamp of the last data pull from the commons
* count: number of entries
:return:
"""
now = await db.scalar("SELECT now()")

try:
await aggregate_datastore.get_status()
except Exception as error:
logger.error("error with aggregate datastore connection: %s", error)
return dict(error="aggregate datastore offline")

return dict(status="OK", timestamp=now)
4 changes: 2 additions & 2 deletions tests/test_agg_mds_elasticsearch_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
import mds
from mds.agg_mds.datastore import elasticsearch_dao
from mds.agg_mds.datastore.elasticsearch_dao import mapping
from mds.agg_mds.datastore.elasticsearch_dao import MAPPING
import nest_asyncio


Expand All @@ -27,7 +27,7 @@ async def test_drop_all():
mock_indices.delete.assert_called_with(index="_all", ignore=[400, 404])
mock_indices.create.assert_has_calls(
[
call(body=mapping, index="commons-index"),
call(body=MAPPING, index="commons-index"),
call(index="commons-info-index"),
],
any_order=True,
Expand Down

0 comments on commit dc1f17d

Please sign in to comment.