feat(api-updates): update mappings (#25)
williamhaley authored Jun 17, 2021
1 parent ebd1854 commit 4fb4cd0
Showing 14 changed files with 369 additions and 176 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
@@ -10,9 +10,9 @@ jobs:
       postgres:
         image: postgres:9.6
         env:
-          POSTGRES_USER: mds
-          POSTGRES_PASSWORD: mds
-          POSTGRES_DB: test_mds
+          POSTGRES_USER: metadata_user
+          POSTGRES_PASSWORD: metadata_pass
+          POSTGRES_DB: test_metadata
         ports:
           - 5432:5432
         options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
@@ -40,8 +40,8 @@ jobs:
         env:
           DB_HOST: localhost
           DB_PORT: ${{ job.services.postgres.ports[5432] }}
-          DB_USER: mds
-          DB_PASSWORD: mds
+          DB_USER: metadata_user
+          DB_PASSWORD: metadata_pass
           USE_AGG_MDS: true
         run: |
           $HOME/.poetry/bin/poetry run pytest --cov=src --cov=migrations/versions --cov-fail-under=94 --cov-report xml
6 changes: 3 additions & 3 deletions docker-compose.yml
@@ -11,7 +11,7 @@ services:
       - redis_migration
     environment:
       - DB_HOST=db
-      - DB_USER=mds
+      - DB_USER=metadata_user
       - USE_AGG_MDS=true
       - REDIS_DB_HOST=redis
     command: /env/bin/uvicorn --host 0.0.0.0 --port 80 mds.asgi:app --reload
@@ -24,7 +24,7 @@ services:
       - db
     environment:
       - DB_HOST=db
-      - DB_USER=mds
+      - DB_USER=metadata_user
     command: /env/bin/alembic upgrade head
   redis_migration:
     build: .
@@ -38,7 +38,7 @@ services:
   db:
     image: postgres
     environment:
-      - POSTGRES_USER=mds
+      - POSTGRES_USER=metadata_user
       - POSTGRES_HOST_AUTH_METHOD=trust
     volumes:
       - ./postgres-data:/var/lib/postgresql/data
2 changes: 1 addition & 1 deletion postgres-init/init.sh
@@ -1,3 +1,3 @@
 #!/usr/bin/env sh

-createdb -U mds test_mds
+createdb -U metadata_user test_metadata
45 changes: 45 additions & 0 deletions src/mds/agg_mds/datastore/__init__.py
@@ -0,0 +1,45 @@
from .redis_cache import redis_cache as redis_client


async def init(hostname, port):
await redis_client.init_cache(hostname, port)


async def drop_all():
await redis_client.json_sets("commons", [])


async def close():
await redis_client.close()


async def get_status():
return await redis_client.get_status()


async def update_metadata(*args):
await redis_client.update_metadata(*args)


async def get_commons_metadata(*args):
return await redis_client.get_commons_metadata(*args)


async def get_all_named_commons_metadata(*args):
return await redis_client.get_all_named_commons_metadata(*args)


async def get_commons_metadata_guid(*args):
return await redis_client.get_commons_metadata_guid(*args)


async def get_commons_attribute(*args):
return await redis_client.get_commons_attribute(*args)


async def get_commons():
return await redis_client.get_commons()


async def get_all_metadata(*args):
return await redis_client.get_all_metadata(*args)
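
The new module above is a thin facade that re-exports the Redis-backed cache behind a storage-neutral name, so callers depend on mds.agg_mds.datastore rather than on Redis directly. A minimal usage sketch follows (not part of this commit); it assumes a Redis instance reachable on localhost:6379 and only calls functions defined in the facade.

import asyncio

from mds.agg_mds import datastore


async def demo():
    await datastore.init("localhost", 6379)   # open the underlying Redis pool
    print(await datastore.get_status())       # ping the backing store
    print(await datastore.get_commons())      # list the registered commons
    await datastore.close()                   # release the connection pool


asyncio.run(demo())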
src/mds/agg_mds/redis_cache.py
@@ -10,7 +10,6 @@ def __init__(self):
         self.redis_cache: Optional[Redis] = None

     async def init_cache(self, hostname: str = "0.0.0.0", port: int = 6379):
-        print(create_redis_pool)
         self.redis_cache = await create_redis_pool(
             f"redis://{hostname}:{port}/0?encoding=utf-8"
         )
@@ -48,16 +47,14 @@ async def close(self):
     async def update_metadata(
         self,
         name: str,
-        data: dict,
-        mapping: dict,
+        data: List[Dict],
         guid_arr: List[str],
         tags: Dict[str, List[str]],
         info: Dict[str, str],
         aggregations: Dict[str, Dict[str, str]],
     ):
         await self.json_sets(f"{name}", {})
         await self.json_sets(name, data, ".metadata")
-        await self.json_sets(name, mapping, ".field_mapping")
         await self.json_sets(name, guid_arr, ".guids")
         await self.json_sets(name, tags, ".tags")
         await self.json_sets(name, info, ".info")
37 changes: 12 additions & 25 deletions src/mds/agg_mds/query.py
@@ -1,6 +1,6 @@
 from fastapi import HTTPException, Query, APIRouter, Request
 from starlette.status import HTTP_404_NOT_FOUND
-from mds.agg_mds.redis_cache import redis_cache
+from mds.agg_mds import datastore
 from mds import config

 mod = APIRouter()
@@ -12,7 +12,7 @@ async def get_commons():
     Returns a list of all registered commons
     :return:
     """
-    return await redis_cache.get_commons()
+    return await datastore.get_commons()


 @mod.get("/aggregate/metadata")
@@ -35,16 +35,15 @@ async def metadata(
         ...
     }
     """
-
-    return await redis_cache.get_all_metadata(limit, offset)
+    return await datastore.get_all_metadata(limit, offset)


 @mod.get("/aggregate/metadata/{name}")
-async def metdata_name(name: str):
+async def metadata_name(name: str):
     """
     Returns the all the metadata from the named commons.
     """
-    res = await redis_cache.get_all_named_commons_metadata(name)
+    res = await datastore.get_all_named_commons_metadata(name)
     if res:
         return res
     else:
@@ -55,11 +54,11 @@ async def metdata_name(name: str):


 @mod.get("/aggregate/metadata/{name}/tags")
-async def metdata_tags(name: str):
+async def metadata_tags(name: str):
     """
     Returns the tags associated with the named commons.
     """
-    res = await redis_cache.get_commons_attribute(name, "tags")
+    res = await datastore.get_commons_attribute(name, "tags")
     if res:
         return res
     else:
@@ -70,23 +69,11 @@ async def metdata_tags(name: str):


 @mod.get("/aggregate/metadata/{name}/info")
-async def metdata_info(name: str):
+async def metadata_info(name: str):
     """
     Returns information from the named commons.
     """
-    res = await redis_cache.get_commons_attribute(name, "info")
-    if res:
-        return res
-    else:
-        raise HTTPException(
-            HTTP_404_NOT_FOUND,
-            {"message": f"no common exists with the given: {name}", "code": 404},
-        )
-
-
-@mod.get("/aggregate/metadata/{name}/columns_to_fields")
-async def metadata_columns_to_fields(name: str):
-    res = await redis_cache.get_commons_attribute(name, "field_mapping")
+    res = await datastore.get_commons_attribute(name, "info")
     if res:
         return res
     else:
@@ -97,8 +84,8 @@ async def metadata_columns_to_fields(name: str):


 @mod.get("/aggregate/metadata/{name}/aggregations")
-async def metadtata_aggregations(name: str):
-    res = await redis_cache.get_commons_attribute(name, "aggregations")
+async def metadata_aggregations(name: str):
+    res = await datastore.get_commons_attribute(name, "aggregations")
     if res:
         return res
     else:
@@ -111,7 +98,7 @@ async def metadtata_aggregations(name: str):
 @mod.get("/aggregate/metadata/{name}/guid/{guid}:path")
 async def metadata_name_guid(name: str, guid: str):
     """Get the metadata of the GUID in the named commons."""
-    res = await redis_cache.get_commons_metadata_guid(name, guid)
+    res = await datastore.get_commons_metadata_guid(name, guid)
     if res:
         return res
     else:
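
The handlers above swap the direct Redis cache for the datastore facade, drop the columns_to_fields endpoint, and fix the misspelled route function names. A short client-side sketch of hitting the affected routes follows (not part of this commit); the base URL, the commons name, and the limit/offset query parameters are assumptions for illustration.

import asyncio

import httpx


async def demo():
    async with httpx.AsyncClient(base_url="http://localhost:80") as client:
        # Paged listing of all aggregate metadata.
        listing = await client.get("/aggregate/metadata", params={"limit": 20, "offset": 0})
        print(listing.json())

        # Per-commons views served by the renamed handlers; an unknown commons
        # name comes back as a 404 with a "no common exists" message.
        for suffix in ("", "/tags", "/info", "/aggregations"):
            resp = await client.get(f"/aggregate/metadata/example_commons{suffix}")
            print(resp.status_code, resp.json())


asyncio.run(demo())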
2 changes: 1 addition & 1 deletion src/mds/config.py
@@ -31,7 +31,7 @@ def __init__(self, value):
 REDIS_PORT = config("REDIS_DB_PORT", cast=int, default=6379)

 if TESTING:
-    DB_DATABASE = "test_" + (DB_DATABASE or "mds")
+    DB_DATABASE = "test_" + (DB_DATABASE or "metadata")
     TEST_KEEP_DB = config("TEST_KEEP_DB", cast=bool, default=False)

 DB_DSN = config(
11 changes: 5 additions & 6 deletions src/mds/main.py
@@ -5,7 +5,7 @@
 from fastapi import FastAPI, APIRouter
 import httpx

-from mds.agg_mds.redis_cache import redis_cache
+from mds.agg_mds import datastore as aggregate_datastore

 try:
     from importlib.metadata import entry_points
@@ -32,17 +32,16 @@ def get_app():
     @app.on_event("shutdown")
     async def shutdown_event():
         if config.USE_AGG_MDS:
-            logger.info("Closing redis cache.")
-            await redis_cache.close()
+            logger.info("Closing aggregate datastore.")
+            await aggregate_datastore.close()
         logger.info("Closing async client.")
-        await redis_cache.close()
         await app.async_client.aclose()

     @app.on_event("startup")
     async def startup_event():
         if config.USE_AGG_MDS:
-            logger.info("Starting redis cache.")
-            await redis_cache.init_cache(
+            logger.info("Creating aggregate datastore.")
+            await aggregate_datastore.init(
                 hostname=config.REDIS_HOST, port=config.REDIS_PORT
             )

28 changes: 20 additions & 8 deletions src/mds/populate.py
@@ -2,9 +2,9 @@
 from argparse import Namespace
 from typing import Any, Dict, List
 from collections import Counter
+from mds.agg_mds import datastore
 from mds.agg_mds.mds import pull_mds
 from mds.agg_mds.commons import MDSInstance, Commons, parse_config_from_file
-from mds.agg_mds.redis_cache import redis_cache
 from mds import config
 from pathlib import Path
 import argparse
@@ -47,8 +47,8 @@ async def main(commons_config: Commons, hostname: str, port: int) -> None:
         print("aggregate MDS disabled")
         exit(1)

-    await redis_cache.init_cache(hostname, port)
-    await redis_cache.json_sets("commons", [])
+    await datastore.init(hostname, port)
+    await datastore.drop_all()

     for name, common in commons_config.commons.items():
         results = pull_mds(common.mds_url)
@@ -103,17 +103,29 @@ async def main(commons_config: Commons, hostname: str, port: int) -> None:
         for k, v in tags.items():
             tags[k] = list(tags[k])

+        def normalize(entry: Dict[Any, Any]):
+            # The entry is an object with one top-level key. Its own id.
+            id = list(entry.keys())[0]
+            for column, field in common.columns_to_fields.items():
+                if field == column:
+                    continue
+                if column in entry[id]["gen3_discovery"]:
+                    entry[id]["gen3_discovery"][field] = entry[id]["gen3_discovery"][
+                        column
+                    ]
+            return entry
+
+        data = [normalize(x) for x in mds_arr]
+
         # build index of keys. which is used to compute the index into the .metadata array
         # Admittedly a hack but will be faster than using json path, until the release of RedisJson v1.2
         keys = list(results.keys())
         info = {"commons_url": common.commons_url}
-        await redis_cache.update_metadata(
-            name, mds_arr, common.columns_to_fields, keys, tags, info, aggregations
-        )
+        await datastore.update_metadata(name, mds_arr, keys, tags, info, aggregations)

-    res = await redis_cache.get_status()
+    res = await datastore.get_status()
     print(res)
-    await redis_cache.close()
+    await datastore.close()


 async def filter_entries(
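
The new normalize() step copies each configured column's value onto its mapped field name inside the entry's gen3_discovery block, so the raw columns_to_fields mapping no longer needs to be stored in the datastore. A small worked example follows (not part of this commit); the GUID, column names, and mapping are made up for illustration.

columns_to_fields = {"study_name": "name", "project_id": "project_id"}

entry = {
    "guid-123": {
        "gen3_discovery": {"study_name": "Example Study", "project_id": "P-1"}
    }
}

# Same logic as normalize() above, applied to one entry.
guid = list(entry.keys())[0]
for column, field in columns_to_fields.items():
    if field == column:
        continue  # nothing to copy when the names already match
    if column in entry[guid]["gen3_discovery"]:
        entry[guid]["gen3_discovery"][field] = entry[guid]["gen3_discovery"][column]

# The original column is kept, and its value is now also available under the
# mapped field name:
assert entry["guid-123"]["gen3_discovery"]["name"] == "Example Study"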