Add basic indexing for collections. (#2080)
* Add basic indexing for collections.

* Add script to reload collections db.

* Update collections db reload script with options.

* Add pagination to dynamodb results.
DailyDreaming authored May 2, 2019
1 parent 8417a46 commit 18903af
Showing 13 changed files with 466 additions and 150 deletions.
40 changes: 14 additions & 26 deletions dss-api.yml
@@ -997,7 +997,7 @@ paths:
             - code
   /collections:
     get:
-      operationId: dss.api.collections.listcollections
+      operationId: dss.api.collections.list_collections
       security:
         - dcpAuth: []
       summary: Retrieve a user's collections.
@@ -1013,21 +1013,15 @@ paths:
         Collections are replicated across storage replicas similarly to files and bundles.
       parameters:
-        - name: replica
-          in: query
-          description: Replica to fetch from.
-          required: true
-          type: string
-          enum: [aws, gcp]
         - name: per_page
          in: query
           description: Max number of results to return per page.
           required: false
           type: integer
           format: int32
-          minimum: 50
-          maximum: 100
-          default: 100
+          minimum: 10
+          maximum: 500
+          default: 500
        - name: start_at
          in: query
          description: >
@@ -1043,7 +1037,7 @@ paths:
             type: object
             properties:
               collections:
-                description: A user's collections.
+                description: A user's collection UUIDs and versions.
                 type: array
                 items:
                   $ref: '#/definitions/CollectionOfCollectionsItem'
@@ -1053,7 +1047,7 @@ paths:
             type: object
             properties:
               collections:
-                description: A user's collections.
+                description: A user's collection UUIDs and versions.
                 type: array
                 items:
                   $ref: '#/definitions/CollectionOfCollectionsItem'
@@ -1115,9 +1109,7 @@ paths:
           pattern: "[A-Za-z0-9]{8}-[A-Za-z0-9]{4}-[A-Za-z0-9]{4}-[A-Za-z0-9]{4}-[A-Za-z0-9]{12}"
         - name: version
           in: query
-          description: >
-            Timestamp of collection creation in DSS_VERSION format format.
-            generated.
+          description: Timestamp of collection creation in DSS_VERSION format.
           required: true
           type: string
           format: DSS_VERSION
@@ -1171,8 +1163,7 @@ paths:
       security:
         - dcpAuth: []
       summary: Retrieve a collection given a UUID.
-      description: >
-        Given a collection UUID, return the associated collection object.
+      description: Given a collection UUID, return the associated collection object.
       parameters:
         - name: uuid
           in: path
@@ -1748,7 +1739,7 @@ paths:
         Add or remove files from a bundle. A specific version of the bundle to update must be provided, and a
         new version will be written.
 
-        Bundles manifests exceeding 20,000 files will not be included in the Elasticsearch index document.
+        Bundle manifests exceeding 20,000 files will not be included in the Elasticsearch index document.
       parameters:
         - name: uuid
           in: path
@@ -2456,19 +2447,16 @@ definitions:
   CollectionOfCollectionsItem:
     type: object
     properties:
-      collection_uuid:
+      uuid:
         type: string
         description: A UUID identifying the collection.
         pattern: "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}"
-      collection_version:
-        type: string
-      collection:
-        $ref: '#/definitions/Collection'
+      version:
+        description: The version of the UUID identifying the collection.
+        type: string
     required:
-      - collection_uuid
-      - collection_version
-      - collection
+      - uuid
+      - version
   Collection:
     type: object
     properties:
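As an aside on the spec change above: the GET /collections endpoint now pages its results. Each partial page comes back as 206 with a Link: <...>; rel='next' header carrying the next start_at offset, and the final page as 200. A minimal client-side sketch of that loop, assuming a hypothetical deployment URL and bearer token (neither appears in this commit):

import requests

DSS_URL = "https://dss.example.com/v1"  # hypothetical deployment URL
TOKEN = "<oauth2-bearer-token>"         # hypothetical credential


def iter_collections(per_page: int = 500):
    """Yield {'uuid': ..., 'version': ...} dicts, following rel='next' links until exhausted."""
    url = f"{DSS_URL}/collections?per_page={per_page}"
    while url:
        resp = requests.get(url, headers={"Authorization": f"Bearer {TOKEN}"})
        resp.raise_for_status()  # 206 (partial page) and 200 (final page) both pass
        yield from resp.json()["collections"]
        # requests parses the Link header into resp.links; no 'next' key means the last page
        url = resp.links.get("next", {}).get("url")


for collection in iter_collections():
    print(collection["uuid"], collection["version"])

Note that per_page is now bounded at 10-500 (default 500) rather than 50-100, and each list item carries only uuid and version; the collection body itself is fetched separately via GET /collections/{uuid}.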
67 changes: 39 additions & 28 deletions dss/api/collections.py
@@ -13,17 +13,18 @@
 from dss.error import DSSException, dss_handler
 from dss.storage.blobstore import test_object_exists
 from dss.storage.hcablobstore import BlobStore, compose_blob_key
-from dss.storage.identifiers import CollectionFQID, CollectionTombstoneID
+from dss.storage.identifiers import CollectionFQID, CollectionTombstoneID, COLLECTION_PREFIX
 from dss.util import security, hashabledict, UrlBuilder
 from dss.util.version import datetime_to_version_format
 from dss.storage.blobstore import idempotent_save
 
+from dss.collections import owner_lookup
 from cloud_blobstore import BlobNotFoundError
 
 MAX_METADATA_SIZE = 1024 * 1024
 
 logger = logging.getLogger(__name__)
 
 
 def get_impl(uuid: str, replica: str, version: str = None):
     uuid = uuid.lower()
     bucket = Replica[replica].bucket
@@ -46,43 +47,41 @@ def get_impl(uuid: str, replica: str, version: str = None):
         raise DSSException(404, "not_found", "Could not find collection for UUID {}".format(uuid))
     return json.loads(collection_blob)
 
-def fetch_collections(handle, bucket, collection_keys):
-    authenticated_user_email = security.get_token_email(request.token_info)
-
-    all_collections = []
-    for key in collection_keys:
-        uuid, version = key[len('collections/'):].split('.', 1)
-        assert version != 'dead'
-        collection = json.loads(handle.get(bucket, key))
-        if collection['owner'] == authenticated_user_email:
-            all_collections.append({'collection_uuid': uuid,
-                                    'collection_version': version,
-                                    'collection': collection})
-    return all_collections
-
 @dss_handler
 @security.authorized_group_required(['hca'])
-def listcollections(replica: str, per_page: int, start_at: int = 0):
-    bucket = Replica[replica].bucket
-    handle = Config.get_blobstore_handle(Replica[replica])
+def list_collections(per_page: int, start_at: int = 0):
+    """
+    Return a list of a user's collections.
+
+    Collection uuids are indexed and called by the user's email in a dynamoDB table.
+
+    :param int per_page: # of collections returned per paged response.
+    :param int start_at: Where the next chunk of paged response should start at.
+    :return: A dictionary containing a list of dictionaries looking like:
+             {'collections': [{'uuid': uuid, 'version': version}, {'uuid': uuid, 'version': version}, ... , ...]}
+    """
+    # TODO: Replica is unused, so this does not use replica. Appropriate?
+    owner = security.get_token_email(request.token_info)
 
-    # expensively list every collection file in the bucket, even those not belonging to the user (possibly 1000's... )
-    collection_keys = [i for i in handle.list(bucket, prefix='collections') if not i.endswith('dead')]
+    collections = []
+    for collection in owner_lookup.get_collection_fqids_for_owner(owner):
+        fqid = CollectionFQID.from_key(f'{COLLECTION_PREFIX}/{collection}')
+        collections.append({'uuid': fqid.uuid, 'version': fqid.version})
 
     # paged response
-    if len(collection_keys) - start_at > per_page:
+    if len(collections) - start_at > per_page:
         next_url = UrlBuilder(request.url)
         next_url.replace_query("start_at", str(start_at + per_page))
-        # each chunk will be searched for collections belonging to that user (even more expensive; per bucket file)
-        # hits returned will vary between zero and the "per_page" size of the chunk
-        collections = fetch_collections(handle, bucket, collection_keys[start_at:start_at + per_page])
-        response = make_response(jsonify({'collections': collections}), requests.codes.partial)
+        collection_page = collections[start_at:start_at + per_page]
+        response = make_response(jsonify({'collections': collection_page}), requests.codes.partial)
         response.headers['Link'] = f"<{next_url}>; rel='next'"
         return response
     # single response returning all collections (or those remaining)
     else:
-        collections = fetch_collections(handle, bucket, collection_keys[start_at:])
-        return jsonify({'collections': collections}), requests.codes.ok
+        collection_page = collections[start_at:]
+        return jsonify({'collections': collection_page}), requests.codes.ok
 
 
 @dss_handler
 @security.authorized_group_required(['hca'])
@@ -93,6 +92,7 @@ def get(uuid: str, replica: str, version: str = None):
         raise DSSException(requests.codes.forbidden, "forbidden", f"Collection access denied")
     return collection_body
 
+
 @dss_handler
 @security.authorized_group_required(['hca'])
 def put(json_request_body: dict, replica: str, uuid: str, version: str):
@@ -107,11 +107,16 @@ def put(json_request_body: dict, replica: str, uuid: str, version: str):
         timestamp = datetime.datetime.utcnow()
         version = datetime_to_version_format(timestamp)
     collection_version = version
+    # update dynamoDB; used to speed up lookup time; will not update if owner already associated w/uuid
+    owner_lookup.put_collection(owner=authenticated_user_email,
+                                collection_fqid=str(CollectionFQID(collection_uuid, collection_version)))
+    # add the collection file to the bucket
     handle.upload_file_handle(Replica[replica].bucket,
                               CollectionFQID(collection_uuid, collection_version).to_key(),
                               io.BytesIO(json.dumps(collection_body).encode("utf-8")))
     return jsonify(dict(uuid=collection_uuid, version=collection_version)), requests.codes.created
 
+
 @dss_handler
 @security.authorized_group_required(['hca'])
 def patch(uuid: str, json_request_body: dict, replica: str, version: str):
@@ -143,12 +148,14 @@ def patch(uuid: str, json_request_body: dict, replica: str, version: str):
                               io.BytesIO(json.dumps(collection).encode("utf-8")))
     return jsonify(dict(uuid=uuid, version=new_collection_version)), requests.codes.ok
 
+
 def _dedpuplicate_contents(contents: List) -> List:
     dedup_collection: OrderedDict[int, dict] = OrderedDict()
     for item in contents:
         dedup_collection[hash(tuple(sorted(item.items())))] = item
     return list(dedup_collection.values())
 
+
 @dss_handler
 @security.authorized_group_required(['hca'])
 def delete(uuid: str, replica: str):
@@ -175,9 +182,11 @@ def delete(uuid: str, replica: str):
                            f"collection tombstone with UUID {uuid} already exists")
     status_code = requests.codes.ok
     response_body = dict()  # type: dict
 
+    # update dynamoDB
+    owner_lookup.delete_collection_uuid(owner=authenticated_user_email, uuid=uuid)
     return jsonify(response_body), status_code
 
 
 @functools.lru_cache(maxsize=64)
 def get_json_metadata(entity_type: str, uuid: str, version: str, replica: Replica, blobstore_handle: BlobStore):
     try:
@@ -198,6 +207,7 @@ def get_json_metadata(entity_type: str, uuid: str, version: str, replica: Replica, blobstore_handle: BlobStore):
                                 "invalid_link",
                                 "Could not find file for UUID {}".format(uuid))
 
+
 def resolve_content_item(replica: Replica, blobstore_handle: BlobStore, item: dict):
     try:
         if item["type"] in {"file", "bundle", "collection"}:
@@ -221,6 +231,7 @@ def resolve_content_item(replica: Replica, blobstore_handle: BlobStore, item: dict):
             'Error while parsing the link "{}": {}: {}'.format(item, type(e).__name__, e)
         )
 
+
 def verify_collection(contents: List[dict], replica: Replica, blobstore_handle: BlobStore, batch_size=64):
     """
     Given user-supplied collection contents that pass schema validation, resolve all entities in the collection and
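list_collections above rebuilds each item from a DynamoDB sort key with CollectionFQID.from_key; the identifiers module is not part of this diff. A minimal sketch of the parsing it implies, assuming the collections/{uuid}.{version} key layout visible in the removed fetch_collections helper (an illustration, not the real dss.storage.identifiers):

COLLECTION_PREFIX = 'collections'


class CollectionFQID:
    """Sketch of a collection's fully-qualified ID: a uuid plus a version."""

    def __init__(self, uuid: str, version: str) -> None:
        self.uuid = uuid
        self.version = version

    @classmethod
    def from_key(cls, key: str) -> 'CollectionFQID':
        # 'collections/{uuid}.{version}' -> (uuid, version); maxsplit=1 keeps dots in the version
        uuid, version = key[len(COLLECTION_PREFIX) + 1:].split('.', 1)
        return cls(uuid, version)

    def to_key(self) -> str:
        return f'{COLLECTION_PREFIX}/{self.uuid}.{self.version}'

    def __str__(self) -> str:
        return f'{self.uuid}.{self.version}'

The design change this supports: the old listcollections listed every collections/ key in the bucket and fetched each blob just to check its owner, while the new list_collections answers a single DynamoDB query keyed on the caller's email and pages with a plain list slice.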
Empty file added dss/collections/__init__.py
54 changes: 54 additions & 0 deletions dss/collections/owner_lookup.py
@@ -0,0 +1,54 @@
import os
from botocore.exceptions import ClientError

from dss import dynamodb  # type: ignore


collection_db_table = f"dss-collections-db-{os.environ['DSS_DEPLOYMENT_STAGE']}"


def put_collection(owner: str, collection_fqid: str, permission_level: str = 'owner'):
    try:
        dynamodb.put_item(table=collection_db_table,
                          hash_key=owner,
                          sort_key=collection_fqid,
                          value=permission_level,
                          dont_overwrite='sort_key')
    except ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise


def get_collection(owner: str, collection_fqid: str):
    return dynamodb.get_item(table=collection_db_table,
                             hash_key=owner,
                             sort_key=collection_fqid,
                             return_key='sort_key')


def get_collection_fqids_for_owner(owner: str):
    """Returns an Iterator of uuid strings."""
    return dynamodb.get_primary_key_items(table=collection_db_table,
                                          key=owner,
                                          return_key='sort_key')


def get_all_collection_keys():
    """Returns an Iterator of (owner, uuid) for all items in the collections db table."""
    return dynamodb.get_all_table_items(table=collection_db_table, both_keys=True)


def delete_collection(owner: str, collection_fqid: str):
    """Deletes one collection item from a database."""
    dynamodb.delete_item(table=collection_db_table,
                         hash_key=owner,
                         sort_key=collection_fqid)


def delete_collection_uuid(owner: str, uuid: str):
    """Deletes all versions of a uuid in the database."""
    for collection_fqid in get_collection_fqids_for_owner(owner):
        if collection_fqid.startswith(uuid):
            dynamodb.delete_item(table=collection_db_table,
                                 hash_key=owner,
                                 sort_key=collection_fqid)
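The dss.dynamodb helper module called throughout this file is not shown in the commit. A rough sketch of three of the calls owner_lookup relies on, assuming boto3 and an item layout of hash_key/sort_key/body string attributes (the attribute names and signatures here are assumptions, not the module's confirmed API):

import boto3

client = boto3.client('dynamodb')


def put_item(table: str, hash_key: str, sort_key: str, value: str, dont_overwrite: str = None):
    args = dict(TableName=table,
                Item={'hash_key': {'S': hash_key},
                      'sort_key': {'S': sort_key},
                      'body': {'S': value}})
    if dont_overwrite:
        # conditional write: boto3 raises ClientError with code ConditionalCheckFailedException
        # if an item with this attribute already exists
        args['ConditionExpression'] = f'attribute_not_exists({dont_overwrite})'
    client.put_item(**args)


def get_primary_key_items(table: str, key: str, return_key: str = 'sort_key'):
    # paginate so result sets beyond DynamoDB's 1 MB query limit are fully returned
    paginator = client.get_paginator('query')
    for page in paginator.paginate(TableName=table,
                                   KeyConditionExpression='hash_key = :key',
                                   ExpressionAttributeValues={':key': {'S': key}}):
        for item in page['Items']:
            yield item[return_key]['S']


def delete_item(table: str, hash_key: str, sort_key: str):
    client.delete_item(TableName=table,
                       Key={'hash_key': {'S': hash_key},
                            'sort_key': {'S': sort_key}})

The conditional write is what lets put_collection treat a duplicate owner/FQID pair as a no-op (it swallows ConditionalCheckFailedException), and the query paginator is presumably the "pagination to dynamodb results" named in the commit message; without it, a query would stop silently at DynamoDB's 1 MB response cap.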