From 18903af7ef09dbeea4c68ed2c91d83eb94652f2d Mon Sep 17 00:00:00 2001 From: Lon Blauvelt Date: Wed, 1 May 2019 18:50:06 -0700 Subject: [PATCH] Add basic indexing for collections. (#2080) * Add basic indexing for collections. * Add script to reload collections db. * Update collections db reload script with options. * Add pagination to dynamodb results. --- dss-api.yml | 40 +++---- dss/api/collections.py | 67 +++++++----- dss/collections/__init__.py | 0 dss/collections/owner_lookup.py | 54 ++++++++++ dss/dynamodb/__init__.py | 61 ++++++----- dss/storage/identifiers.py | 2 + dss/subscriptions_v2/__init__.py | 11 +- dss/test.log | 0 dss/util/async_state.py | 8 +- iam/policy-templates/ci-cd.json | 3 +- infra/collections_db/main.tf | 31 ++++++ scripts/update_collection_db.py | 180 +++++++++++++++++++++++++++++++ tests/test_collections.py | 159 ++++++++++++++++----------- 13 files changed, 466 insertions(+), 150 deletions(-) create mode 100644 dss/collections/__init__.py create mode 100644 dss/collections/owner_lookup.py create mode 100644 dss/test.log create mode 100644 infra/collections_db/main.tf create mode 100644 scripts/update_collection_db.py diff --git a/dss-api.yml b/dss-api.yml index 1759533157..901e58d111 100644 --- a/dss-api.yml +++ b/dss-api.yml @@ -997,7 +997,7 @@ paths: - code /collections: get: - operationId: dss.api.collections.listcollections + operationId: dss.api.collections.list_collections security: - dcpAuth: [] summary: Retrieve a user's collections. @@ -1013,21 +1013,15 @@ paths: Collections are replicated across storage replicas similarly to files and bundles. parameters: - - name: replica - in: query - description: Replica to fetch from. - required: true - type: string - enum: [aws, gcp] - name: per_page in: query description: Max number of results to return per page. required: false type: integer format: int32 - minimum: 50 - maximum: 100 - default: 100 + minimum: 10 + maximum: 500 + default: 500 - name: start_at in: query description: > @@ -1043,7 +1037,7 @@ paths: type: object properties: collections: - description: A user's collections. + description: A user's collection UUIDs and versions. type: array items: $ref: '#/definitions/CollectionOfCollectionsItem' @@ -1053,7 +1047,7 @@ paths: type: object properties: collections: - description: A user's collections. + description: A user's collection UUIDs and versions. type: array items: $ref: '#/definitions/CollectionOfCollectionsItem' @@ -1115,9 +1109,7 @@ paths: pattern: "[A-Za-z0-9]{8}-[A-Za-z0-9]{4}-[A-Za-z0-9]{4}-[A-Za-z0-9]{4}-[A-Za-z0-9]{12}" - name: version in: query - description: > - Timestamp of collection creation in DSS_VERSION format format. - generated. + description: Timestamp of collection creation in DSS_VERSION format. required: true type: string format: DSS_VERSION @@ -1171,8 +1163,7 @@ paths: security: - dcpAuth: [] summary: Retrieve a collection given a UUID. - description: > - Given a collection UUID, return the associated collection object. + description: Given a collection UUID, return the associated collection object. parameters: - name: uuid in: path @@ -1748,7 +1739,7 @@ paths: Add or remove files from a bundle. A specific version of the bundle to update must be provided, and a new version will be written. - Bundles manifests exceeding 20,000 files will not be included in the Elasticsearch index document. + Bundle manifests exceeding 20,000 files will not be included in the Elasticsearch index document. 
parameters: - name: uuid in: path @@ -2456,19 +2447,16 @@ definitions: CollectionOfCollectionsItem: type: object properties: - collection_uuid: + uuid: type: string description: A UUID identifying the collection. pattern: "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}" - collection_version: - type: string + version: description: The version of the UUID identifying the collection. - collection: - $ref: '#/definitions/Collection' + type: string required: - - collection_uuid - - collection_version - - collection + - uuid + - version Collection: type: object properties: diff --git a/dss/api/collections.py b/dss/api/collections.py index 640becc8bc..70ab1a231e 100644 --- a/dss/api/collections.py +++ b/dss/api/collections.py @@ -13,17 +13,18 @@ from dss.error import DSSException, dss_handler from dss.storage.blobstore import test_object_exists from dss.storage.hcablobstore import BlobStore, compose_blob_key -from dss.storage.identifiers import CollectionFQID, CollectionTombstoneID +from dss.storage.identifiers import CollectionFQID, CollectionTombstoneID, COLLECTION_PREFIX from dss.util import security, hashabledict, UrlBuilder from dss.util.version import datetime_to_version_format from dss.storage.blobstore import idempotent_save - +from dss.collections import owner_lookup from cloud_blobstore import BlobNotFoundError MAX_METADATA_SIZE = 1024 * 1024 logger = logging.getLogger(__name__) + def get_impl(uuid: str, replica: str, version: str = None): uuid = uuid.lower() bucket = Replica[replica].bucket @@ -46,43 +47,41 @@ def get_impl(uuid: str, replica: str, version: str = None): raise DSSException(404, "not_found", "Could not find collection for UUID {}".format(uuid)) return json.loads(collection_blob) -def fetch_collections(handle, bucket, collection_keys): - authenticated_user_email = security.get_token_email(request.token_info) - - all_collections = [] - for key in collection_keys: - uuid, version = key[len('collections/'):].split('.', 1) - assert version != 'dead' - collection = json.loads(handle.get(bucket, key)) - if collection['owner'] == authenticated_user_email: - all_collections.append({'collection_uuid': uuid, - 'collection_version': version, - 'collection': collection}) - return all_collections @dss_handler @security.authorized_group_required(['hca']) -def listcollections(replica: str, per_page: int, start_at: int = 0): - bucket = Replica[replica].bucket - handle = Config.get_blobstore_handle(Replica[replica]) +def list_collections(per_page: int, start_at: int = 0): + """ + Return a list of a user's collections. + + Collection uuids are indexed and called by the user's email in a dynamoDB table. + + :param int per_page: # of collections returned per paged response. + :param int start_at: Where the next chunk of paged response should start at. + :return: A dictionary containing a list of dictionaries looking like: + {'collections': [{'uuid': uuid, 'version': version}, {'uuid': uuid, 'version': version}, ... , ...]} + """ + # TODO: Replica is unused, so this does not use replica. Appropriate? + owner = security.get_token_email(request.token_info) - # expensively list every collection file in the bucket, even those not belonging to the user (possibly 1000's... 
) - collection_keys = [i for i in handle.list(bucket, prefix='collections') if not i.endswith('dead')] + collections = [] + for collection in owner_lookup.get_collection_fqids_for_owner(owner): + fqid = CollectionFQID.from_key(f'{COLLECTION_PREFIX}/{collection}') + collections.append({'uuid': fqid.uuid, 'version': fqid.version}) # paged response - if len(collection_keys) - start_at > per_page: + if len(collections) - start_at > per_page: next_url = UrlBuilder(request.url) next_url.replace_query("start_at", str(start_at + per_page)) - # each chunk will be searched for collections belonging to that user (even more expensive; per bucket file) - # hits returned will vary between zero and the "per_page" size of the chunk - collections = fetch_collections(handle, bucket, collection_keys[start_at:start_at + per_page]) - response = make_response(jsonify({'collections': collections}), requests.codes.partial) + collection_page = collections[start_at:start_at + per_page] + response = make_response(jsonify({'collections': collection_page}), requests.codes.partial) response.headers['Link'] = f"<{next_url}>; rel='next'" return response # single response returning all collections (or those remaining) else: - collections = fetch_collections(handle, bucket, collection_keys[start_at:]) - return jsonify({'collections': collections}), requests.codes.ok + collection_page = collections[start_at:] + return jsonify({'collections': collection_page}), requests.codes.ok + @dss_handler @security.authorized_group_required(['hca']) @@ -93,6 +92,7 @@ def get(uuid: str, replica: str, version: str = None): raise DSSException(requests.codes.forbidden, "forbidden", f"Collection access denied") return collection_body + @dss_handler @security.authorized_group_required(['hca']) def put(json_request_body: dict, replica: str, uuid: str, version: str): @@ -107,11 +107,16 @@ def put(json_request_body: dict, replica: str, uuid: str, version: str): timestamp = datetime.datetime.utcnow() version = datetime_to_version_format(timestamp) collection_version = version + # update dynamoDB; used to speed up lookup time; will not update if owner already associated w/uuid + owner_lookup.put_collection(owner=authenticated_user_email, + collection_fqid=str(CollectionFQID(collection_uuid, collection_version))) + # add the collection file to the bucket handle.upload_file_handle(Replica[replica].bucket, CollectionFQID(collection_uuid, collection_version).to_key(), io.BytesIO(json.dumps(collection_body).encode("utf-8"))) return jsonify(dict(uuid=collection_uuid, version=collection_version)), requests.codes.created + @dss_handler @security.authorized_group_required(['hca']) def patch(uuid: str, json_request_body: dict, replica: str, version: str): @@ -143,12 +148,14 @@ def patch(uuid: str, json_request_body: dict, replica: str, version: str): io.BytesIO(json.dumps(collection).encode("utf-8"))) return jsonify(dict(uuid=uuid, version=new_collection_version)), requests.codes.ok + def _dedpuplicate_contents(contents: List) -> List: dedup_collection: OrderedDict[int, dict] = OrderedDict() for item in contents: dedup_collection[hash(tuple(sorted(item.items())))] = item return list(dedup_collection.values()) + @dss_handler @security.authorized_group_required(['hca']) def delete(uuid: str, replica: str): @@ -175,9 +182,11 @@ def delete(uuid: str, replica: str): f"collection tombstone with UUID {uuid} already exists") status_code = requests.codes.ok response_body = dict() # type: dict - + # update dynamoDB + 
owner_lookup.delete_collection_uuid(owner=authenticated_user_email, uuid=uuid) return jsonify(response_body), status_code + @functools.lru_cache(maxsize=64) def get_json_metadata(entity_type: str, uuid: str, version: str, replica: Replica, blobstore_handle: BlobStore): try: @@ -198,6 +207,7 @@ def get_json_metadata(entity_type: str, uuid: str, version: str, replica: Replic "invalid_link", "Could not find file for UUID {}".format(uuid)) + def resolve_content_item(replica: Replica, blobstore_handle: BlobStore, item: dict): try: if item["type"] in {"file", "bundle", "collection"}: @@ -221,6 +231,7 @@ def resolve_content_item(replica: Replica, blobstore_handle: BlobStore, item: di 'Error while parsing the link "{}": {}: {}'.format(item, type(e).__name__, e) ) + def verify_collection(contents: List[dict], replica: Replica, blobstore_handle: BlobStore, batch_size=64): """ Given user-supplied collection contents that pass schema validation, resolve all entities in the collection and diff --git a/dss/collections/__init__.py b/dss/collections/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dss/collections/owner_lookup.py b/dss/collections/owner_lookup.py new file mode 100644 index 0000000000..b8d4577ca9 --- /dev/null +++ b/dss/collections/owner_lookup.py @@ -0,0 +1,54 @@ +import os +from botocore.exceptions import ClientError + +from dss import dynamodb # type: ignore + + +collection_db_table = f"dss-collections-db-{os.environ['DSS_DEPLOYMENT_STAGE']}" + + +def put_collection(owner: str, collection_fqid: str, permission_level: str = 'owner'): + try: + dynamodb.put_item(table=collection_db_table, + hash_key=owner, + sort_key=collection_fqid, + value=permission_level, + dont_overwrite='sort_key') + except ClientError as e: + if e.response['Error']['Code'] != 'ConditionalCheckFailedException': + raise + + +def get_collection(owner: str, collection_fqid: str): + return dynamodb.get_item(table=collection_db_table, + hash_key=owner, + sort_key=collection_fqid, + return_key='sort_key') + + +def get_collection_fqids_for_owner(owner: str): + """Returns an Iterator of uuid strings.""" + return dynamodb.get_primary_key_items(table=collection_db_table, + key=owner, + return_key='sort_key') + + +def get_all_collection_keys(): + """Returns an Iterator of (owner, uuid) for all items in the collections db table.""" + return dynamodb.get_all_table_items(table=collection_db_table, both_keys=True) + + +def delete_collection(owner: str, collection_fqid: str): + """Deletes one collection item from a database.""" + dynamodb.delete_item(table=collection_db_table, + hash_key=owner, + sort_key=collection_fqid) + + +def delete_collection_uuid(owner: str, uuid: str): + """Deletes all versions of a uuid in the database.""" + for collection_fqid in get_collection_fqids_for_owner(owner): + if collection_fqid.startswith(uuid): + dynamodb.delete_item(table=collection_db_table, + hash_key=owner, + sort_key=collection_fqid) diff --git a/dss/dynamodb/__init__.py b/dss/dynamodb/__init__.py index 0ec3f071ef..99cff097e3 100644 --- a/dss/dynamodb/__init__.py +++ b/dss/dynamodb/__init__.py @@ -3,6 +3,10 @@ from dss.util.aws.clients import dynamodb as db # type: ignore +class DynamoDBItemNotFound(Exception): + pass + + def _format_item(hash_key, sort_key, value): item = {'hash_key': {'S': hash_key}} if value: @@ -12,7 +16,7 @@ def _format_item(hash_key, sort_key, value): return item -def put_item(*, table: str, hash_key: str, sort_key: str=None, value: str, overwrite: str=None): +def put_item(*, table: str, 
hash_key: str, sort_key: str=None, value: str, dont_overwrite: str=None): """ Put an item into a dynamoDB table. @@ -24,18 +28,18 @@ def put_item(*, table: str, hash_key: str, sort_key: str=None, value: str, overw :param str hash_key: 1st primary key that can be used to fetch associated sort_keys and values. :param str sort_key: 2nd primary key, used with hash_key to fetch a specific value. Note: If not specified, this will PUT only 1 key (hash_key) and 1 value. - :param str overwrite: Don't overwrite if this parameter exists. For example, setting this - to 'sort_key' won't overwrite if that sort_key already exists in the table. + :param str dont_overwrite: Don't overwrite if this parameter exists. For example, setting this + to 'sort_key' won't overwrite if that sort_key already exists in the table. :return: None """ query = {'TableName': table, 'Item': _format_item(hash_key=hash_key, sort_key=sort_key, value=value)} - if overwrite: - query['ConditionExpression'] = f'attribute_not_exists({overwrite})' + if dont_overwrite: + query['ConditionExpression'] = f'attribute_not_exists({dont_overwrite})' db.put_item(**query) -def get_item(*, table: str, hash_key: str, sort_key: str=None): +def get_item(*, table: str, hash_key: str, sort_key: str=None, return_key: str='body'): """ Get associated value for a given set of keys from a dynamoDB table. @@ -46,14 +50,15 @@ def get_item(*, table: str, hash_key: str, sort_key: str=None): :param str hash_key: 1st primary key that can be used to fetch associated sort_keys and values. :param str sort_key: 2nd primary key, used with hash_key to fetch a specific value. Note: If not specified, this will GET only 1 key (hash_key) and 1 value. + :param str return_key: Either "body" (to return all values) or "sort_key" (to return all 2nd primary keys). :return: None or str """ query = {'TableName': table, 'Key': _format_item(hash_key=hash_key, sort_key=sort_key, value=None)} item = db.get_item(**query).get('Item') - if item is not None: - return item['body']['S'] - return item + if item is None: + raise DynamoDBItemNotFound(f'Query failed to fetch item from database: {query}') + return item[return_key]['S'] def get_primary_key_items(*, table: str, key: str, return_key: str='body') -> Generator[str, None, None]: @@ -65,18 +70,17 @@ def get_primary_key_items(*, table: str, key: str, return_key: str='body') -> Ge :param str return_key: Either "body" (to return all values) or "sort_key" (to return all 2nd primary keys). :return: Iterable (str) """ - db_resp = db.query( - TableName=table, - ScanIndexForward=False, # True = ascending, False = descending - KeyConditionExpression="#hash_key=:key", - ExpressionAttributeNames={'#hash_key': "hash_key"}, - ExpressionAttributeValues={':key': {'S': key}} - ) - for item in db_resp.get('Items', []): - yield item[return_key]['S'] - - -def get_all_table_items(*, table: str, return_key: str='body') -> Generator[str, None, None]: + paginator = db.get_paginator('query') + for db_resp in paginator.paginate(TableName=table, + ScanIndexForward=False, # True = ascending, False = descending + KeyConditionExpression="#hash_key=:key", + ExpressionAttributeNames={'#hash_key': "hash_key"}, + ExpressionAttributeValues={':key': {'S': key}}): + for item in db_resp.get('Items', []): + yield item[return_key]['S'] + + +def get_all_table_items(*, table: str, both_keys: bool=False): """ Return all items from a dynamoDB table. 
@@ -84,9 +88,13 @@ def get_all_table_items(*, table: str, return_key: str='body') -> Generator[str, :param str return_key: Either "body" (to return all values) or "sort_key" (to return all 2nd primary keys). :return: Iterable (str) """ - db_resp = db.scan(TableName=table) - for item in db_resp.get('Items', []): - yield item[return_key]['S'] + paginator = db.get_paginator('scan') + for db_resp in paginator.paginate(TableName=table): + for item in db_resp.get('Items', []): + if both_keys: + yield item['hash_key']['S'], item['sort_key']['S'] + else: + yield item['body']['S'] def delete_item(*, table: str, hash_key: str, sort_key: str=None): @@ -96,6 +104,11 @@ def delete_item(*, table: str, hash_key: str, sort_key: str=None): Will determine the type of db this is being called on by the number of keys provided (omit sort_key to DELETE from a db with only 1 primary key). + NOTE: + Unless you specify conditions, DeleteItem is an idempotent operation; running it multiple times + on the same item or attribute does not result in an error response: + https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_DeleteItem.html + :param table: Name of the table in AWS. :param str hash_key: 1st primary key that can be used to fetch associated sort_keys and values. :param str sort_key: 2nd primary key, used with hash_key to fetch a specific value. diff --git a/dss/storage/identifiers.py b/dss/storage/identifiers.py index c08ec12b74..3251da65b9 100644 --- a/dss/storage/identifiers.py +++ b/dss/storage/identifiers.py @@ -116,9 +116,11 @@ class BundleTombstoneID(TombstoneID): prefix = BUNDLE_PREFIX subject_identity_cls = BundleFQID + class CollectionFQID(ObjectIdentifier): prefix = COLLECTION_PREFIX + class CollectionTombstoneID(TombstoneID): prefix = COLLECTION_PREFIX subject_identity_cls = CollectionFQID diff --git a/dss/subscriptions_v2/__init__.py b/dss/subscriptions_v2/__init__.py index bc05163af4..17a202c61e 100644 --- a/dss/subscriptions_v2/__init__.py +++ b/dss/subscriptions_v2/__init__.py @@ -29,10 +29,13 @@ def put_subscription(doc: dict): def get_subscription(replica: Replica, owner: str, uuid: str): - item = dynamodb.get_item(table=subscription_db_table.format(replica.name), - hash_key=owner, - sort_key=uuid) - return json.loads(item) if item else None + try: + item = dynamodb.get_item(table=subscription_db_table.format(replica.name), + hash_key=owner, + sort_key=uuid) + return json.loads(item) + except dynamodb.DynamoDBItemNotFound: + return None def get_subscriptions_for_owner(replica: Replica, owner: str) -> list: diff --git a/dss/test.log b/dss/test.log new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dss/util/async_state.py b/dss/util/async_state.py index f3e7f67642..e9e649f1d1 100644 --- a/dss/util/async_state.py +++ b/dss/util/async_state.py @@ -2,7 +2,7 @@ import json import typing -from dss.dynamodb import get_item, put_item, delete_item +from dss.dynamodb import get_item, put_item, delete_item, DynamoDBItemNotFound class AsyncStateItem: @@ -36,11 +36,13 @@ def put(cls, key: str, body: dict = None) -> typing.Any: @classmethod def get(cls, key: str) -> typing.Any: - item = get_item(table=cls.table, hash_key=key) - if item: + try: + item = get_item(table=cls.table, hash_key=key) body = json.loads(item) item_class = _all_subclasses(cls)[body['_type']] return item_class(key, body) + except DynamoDBItemNotFound: + return None @classmethod def delete(cls, key: str) -> None: diff --git a/iam/policy-templates/ci-cd.json b/iam/policy-templates/ci-cd.json index 
43e6a44a29..9e32e909f4 100644 --- a/iam/policy-templates/ci-cd.json +++ b/iam/policy-templates/ci-cd.json @@ -111,7 +111,8 @@ "arn:aws:dynamodb:*:$account_id:table/dss-async-state-staging", "arn:aws:dynamodb:*:$account_id:table/dss-subscriptions-v2-*-dev", "arn:aws:dynamodb:*:$account_id:table/dss-subscriptions-v2-*-integration", - "arn:aws:dynamodb:*:$account_id:table/dss-subscriptions-v2-*-staging" + "arn:aws:dynamodb:*:$account_id:table/dss-subscriptions-v2-*-staging", + "arn:aws:dynamodb:*:$account_id:table/dss-collections-db-*" ] }, { diff --git a/infra/collections_db/main.tf b/infra/collections_db/main.tf new file mode 100644 index 0000000000..92415bdabd --- /dev/null +++ b/infra/collections_db/main.tf @@ -0,0 +1,31 @@ +data "aws_caller_identity" "current" {} +locals { + common_tags = "${map( + "managedBy" , "terraform", + "Name" , "${var.DSS_INFRA_TAG_PROJECT}-${var.DSS_DEPLOYMENT_STAGE}-${var.DSS_INFRA_TAG_SERVICE}", + "project" , "${var.DSS_INFRA_TAG_PROJECT}", + "env" , "${var.DSS_DEPLOYMENT_STAGE}", + "service" , "${var.DSS_INFRA_TAG_SERVICE}", + "owner" , "${var.DSS_INFRA_TAG_OWNER}" + )}", + replicas = ["aws", "gcp"] +} + +resource "aws_dynamodb_table" "collections-db-aws" { + name = "dss-collections-db-${var.DSS_DEPLOYMENT_STAGE}" + billing_mode = "PAY_PER_REQUEST" + hash_key = "hash_key" + range_key = "sort_key" + + attribute { + name = "hash_key" + type = "S" + } + + attribute { + name = "sort_key" + type = "S" + } + + tags = "${local.common_tags}" +} diff --git a/scripts/update_collection_db.py b/scripts/update_collection_db.py new file mode 100644 index 0000000000..20cdfd30e9 --- /dev/null +++ b/scripts/update_collection_db.py @@ -0,0 +1,180 @@ +""" +Updates the dynamoDB table that tracks collections. + +To update the table run (slow): + scripts/update_collection_db.py + +CAUTION: Doing a hard-reset will break some collections usage during the time it is updating. +To run a hard reset on the table (delete table and repopulate from bucket): + scripts/update_collection_db.py hard-reset + +Tries to be efficient, since this potentially opens 1000's of files one by one and takes a long time. 
+""" +import os +import sys +import json +import time +import argparse +import textwrap +from cloud_blobstore import BlobNotFoundError + +pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa +sys.path.insert(0, pkg_root) # noqa + +from dss import BucketConfig, Config, Replica +from dss.storage.identifiers import TOMBSTONE_SUFFIX, COLLECTION_PREFIX, CollectionFQID +from dss.collections import owner_lookup +from dss.dynamodb import DynamoDBItemNotFound + + +Config.set_config(BucketConfig.NORMAL) + + +def heredoc(template, indent=''): + template = textwrap.dedent(template) + return template.replace('\n', '\n' + indent) + '\n' + + +class CollectionDatabaseTools(object): + def __init__(self, replica='aws'): + self.replica = replica + self.bucket = Replica[replica].bucket + self.handle = Config.get_blobstore_handle(Replica[replica]) + self.bucket_name = f'{Replica[replica].storage_schema}://{self.bucket}' + self.tombstone_cache = dict() + self.tombstone_cache_max_len = 100000 + + self.total_database_collection_items = 0 + self.total_bucket_collection_items = 0 + self.total_tombstoned_bucket_collection_items = 0 + + def _is_uuid_tombstoned(self, uuid: str): + if len(self.tombstone_cache) >= self.tombstone_cache_max_len: + self.tombstone_cache.popitem() + + if uuid not in self.tombstone_cache: + try: + self.tombstone_cache[uuid] = self.handle.get(self.bucket, + key=f'{COLLECTION_PREFIX}/{uuid}.{TOMBSTONE_SUFFIX}') + except BlobNotFoundError: + self.tombstone_cache[uuid] = None + return self.tombstone_cache[uuid] + + def _collections_in_database_but_not_in_bucket(self): + """ + Determines collection items in the table that: + 1. No longer exist in the bucket. + 2. Are tombstoned in the bucket. + 3. Have an owner that doesn't match the owner found in the bucket's collection file. + + Returns an iterable tuple of strings: (owner, collection_fqid) representing the item's key pair. + + The returned keys can then be removed from the collections dynamodb table. + """ + for owner, collection_fqid in owner_lookup.get_all_collection_keys(): + self.total_database_collection_items += 1 + collection = CollectionFQID.from_key(f'{COLLECTION_PREFIX}/{collection_fqid}') + try: + collection_owner = json.loads(self.handle.get(self.bucket, collection.to_key()))['owner'] + + assert not self._is_uuid_tombstoned(collection.uuid) + assert collection_owner == owner + + except BlobNotFoundError: + yield owner, collection_fqid + + except AssertionError: + yield owner, collection_fqid + + def _collections_in_bucket_but_not_in_database(self): + """ + Returns any (owner, collection_fqid) present in the bucket but not in the collections table. + + Returns an iterable tuple of strings: (owner, collection_fqid) representing the item's key pair. + + The returned keys can then be added to the collections dynamodb table. 
+ """ + for collection_key in self.handle.list(self.bucket, prefix=f'{COLLECTION_PREFIX}/'): + self.total_bucket_collection_items += 1 + collection_fqid = CollectionFQID.from_key(collection_key) + if not self._is_uuid_tombstoned(collection_fqid.uuid): + try: + collection = json.loads(self.handle.get(self.bucket, collection_key)) + try: + owner_lookup.get_collection(owner=collection['owner'], collection_fqid=str(collection_fqid)) + except DynamoDBItemNotFound: + yield collection['owner'], str(collection_fqid) + except BlobNotFoundError: + pass # if deleted from bucket while being listed + except KeyError: + pass # unexpected response + else: + self.total_tombstoned_bucket_collection_items += 1 + + def remove_collections_from_database(self, all: bool=False): + if all: + collections = owner_lookup.get_all_collection_keys() + text = 'ALL' + else: + collections = self._collections_in_database_but_not_in_bucket() + text = 'INVALID' + + print(f'\nRemoving {text} user-collection associations from database: {owner_lookup.collection_db_table}\n') + removed = 0 + for owner, collection_fqid in collections: + print(f"Removing {owner}'s collection: {collection_fqid}") + owner_lookup.delete_collection(owner=owner, collection_fqid=collection_fqid) + removed += 1 + print(f'{removed} collection items removed from: {owner_lookup.collection_db_table}') + return removed + + def add_missing_collections_to_database(self): + print(f'\nAdding missing user-collection associations to database from: {self.bucket_name}\n') + added = 0 + for owner, collection_fqid in self._collections_in_bucket_but_not_in_database(): + print(f"Adding {owner}'s collection: {collection_fqid}") + owner_lookup.put_collection(owner=owner, collection_fqid=collection_fqid) + added += 1 + print(f'{added} collection items added from: {self.bucket_name}') + return added + + +def main(argv=sys.argv[1:]): + parser = argparse.ArgumentParser(description=heredoc(""" + Updates the dynamoDB table that tracks collections. It's fairly slow, though it tries to be efficient. + + The current bottleneck in speed is that in order to repopulate from a bucket, each file needs to be + opened one by one to determine ownership. + + To update the table run: + scripts/update_collection_db.py + + CAUTION: See the "--hard-reset" option below for the full purge option. 
+ """), formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument('--hard-reset', dest="hard_reset", default=False, required=False, action='store_true', + help='CAUTION: Breaks some collections usage during the time it is updating.\n' + 'This deletes the entire table and then repopulates from the bucket files.') + o = parser.parse_args(argv) + start = time.time() + c = CollectionDatabaseTools(replica='aws') + + if o.hard_reset: + removed = c.remove_collections_from_database(all=True) + else: + removed = c.remove_collections_from_database() + + print(f'Removal took: {time.time() - start} seconds.') + + added = c.add_missing_collections_to_database() + + print(f'Database had: {c.total_database_collection_items} items.') + print(f'Bucket had : {c.total_bucket_collection_items} items.') + print(f'Of which : {c.total_tombstoned_bucket_collection_items} items were tombstoned.') + print(f'From bucket: {c.bucket_name} to dynamodb table: {owner_lookup.collection_db_table}') + print(f'{removed} collection items were removed.') + print(f'{added} collection items were added.') + print(f'Collections Database Updated Successfully in {time.time() - start} seconds.') + + +if __name__ == '__main__': + main() diff --git a/tests/test_collections.py b/tests/test_collections.py index 9fe8706b0e..2139fddfc6 100755 --- a/tests/test_collections.py +++ b/tests/test_collections.py @@ -13,7 +13,7 @@ from requests.utils import parse_header_links from botocore.vendored import requests from dcplib.s3_multipart import get_s3_multipart_chunk_size -from urllib.parse import parse_qsl, urlparse, urlsplit +from urllib.parse import parse_qsl, urlsplit pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa sys.path.insert(0, pkg_root) # noqa @@ -24,8 +24,11 @@ from tests.fixtures.cloud_uploader import ChecksummingSink from dss.util.version import datetime_to_version_format from dss.util import UrlBuilder +from dss.collections import owner_lookup +from dss.dynamodb import DynamoDBItemNotFound +# @testmode.integration class TestCollections(unittest.TestCase, DSSAssertMixin, DSSUploadMixin): @classmethod def setUpClass(cls): @@ -41,6 +44,9 @@ def setUpClass(cls): cls.invalid_ptr = dict(type="foo", uuid=cls.file_uuid, version=cls.file_version, fragment="/xyz") cls.replicas = ('aws', 'gcp') + with open(os.environ['GOOGLE_APPLICATION_CREDENTIALS'], "r") as fh: + cls.owner_email = json.loads(fh.read())['client_email'] + @classmethod def teardownClass(cls): cls._delete_collection(cls, cls.uuid) @@ -76,28 +82,14 @@ def upload_file(app, contents): resp_obj.raise_for_status() return file_uuid, resp_obj.json()["version"] - def _test_collection_get_paging(self, codes, replica: str, per_page: int, fetch_all: bool=False): - """ - Attempts to ensure that a GET /collections call responds with a 206. - - If unsuccessful on the first attempt, temp collections will be added for the user to ensure a response - and the GET /collections will be reattempted. 
- """ - paging_res = self._fetch_collection_get_paging_responses(codes, replica, per_page, fetch_all) - - # only create a ton of collections when not enough collections exist in the bucket to elicit a paging response - if not paging_res: - self.create_temp_user_collections(num=per_page + 1) # guarantee a paging response - paging_res = self._fetch_collection_get_paging_responses(codes, replica, per_page, fetch_all) - self.assertTrue(paging_res) - def create_temp_user_collections(self, num: int): - for i in range(num): - contents = [self.col_file_item, self.col_ptr_item] - uuid, _ = self._put(contents) - self.addCleanup(self._delete_collection, uuid) + contents = [self.col_file_item, self.col_ptr_item] + for replica in self.replicas: + for i in range(num): + uuid, _ = self._put(contents, replica=replica) + self.addCleanup(self._delete_collection, uuid, replica) - def _fetch_collection_get_paging_responses(self, codes, replica: str, per_page: int, fetch_all: bool): + def fetch_collection_paging_response(self, codes, replica: str, per_page: int): """ GET /collections and iterate through the paging responses containing all of a user's collections. @@ -107,13 +99,17 @@ def _fetch_collection_get_paging_responses(self, codes, replica: str, per_page: url.add_query("replica", replica) url.add_query("per_page", str(per_page)) resp_obj = self.assertGetResponse(str(url), codes, headers=get_auth_header(authorized=True)) - link_header = resp_obj.response.headers.get('Link') if codes == requests.codes.bad_request: return True - paging_res, normal_res = None, None + link_header = resp_obj.response.headers.get('Link') + paging_response = False + while link_header: + # Make sure we're getting the expected response status code + self.assertEqual(resp_obj.response.status_code, requests.codes.partial) + paging_response = True link = parse_header_links(link_header)[0] self.assertEquals(link['rel'], 'next') parsed = urlsplit(link['url']) @@ -123,56 +119,93 @@ def _fetch_collection_get_paging_responses(self, codes, replica: str, per_page: headers=get_auth_header(authorized=True)) link_header = resp_obj.response.headers.get('Link') - # Make sure we're getting the expected response status code - if link_header: - self.assertEqual(resp_obj.response.status_code, requests.codes.partial) - paging_res = True - if not fetch_all: - return paging_res - else: - self.assertEqual(resp_obj.response.status_code, requests.codes.ok) - normal_res = True - if fetch_all: - self.assertTrue(normal_res) - return paging_res - - @testmode.standalone - def test_get(self): - """GET a list of all collections belonging to the user.""" - res = self.app.get('/v1/collections', - headers=get_auth_header(authorized=True), - params=dict(replica='aws')) - res.raise_for_status() - self.assertIn('collections', res.json()) + self.assertEqual(resp_obj.response.status_code, requests.codes.ok) + return paging_response def test_collection_paging(self): - # seems to take about 15 seconds per page when "per_page" == 100 - # so this scales linearly with the total number of collections in the bucket - # slow because the collection API has to open ALL collections files in the bucket - # since it cannot determine the owner without opening the file - # TODO collections desperately need indexing to run in a reasonable amount of time + min_page = 10 + self.create_temp_user_collections(num=min_page + 1) # guarantee at least one paging response codes = {requests.codes.ok, requests.codes.partial} - for replica in ['aws']: # TODO: change ['aws'] to self.replicas when 
GET collections is faster (indexed) - for per_page in [50, 100]: + for replica in self.replicas: + for per_page in [min_page, 100, 500]: with self.subTest(replica=replica, per_page=per_page): - # only check a full run if per_page == 100 because it takes forever - fetch_all = True if per_page == 100 else False - self._test_collection_get_paging(codes=codes, - replica=replica, - per_page=per_page, - fetch_all=fetch_all) + paging_response = self.fetch_collection_paging_response(codes=codes, + replica=replica, + per_page=per_page) + if per_page == min_page: + self.assertTrue(paging_response) + + def test_collections_db(self): + """Test that the dynamoDB functions work for a collection.""" + fake_uuid = str(uuid4()) + + with self.subTest("Assert uuid is not already among the user's collections."): + for value in owner_lookup.get_collection_fqids_for_owner(owner=self.owner_email): + self.assertNotEqual(fake_uuid, value) + + with self.subTest("Test dynamoDB put_collection."): + owner_lookup.put_collection(owner=self.owner_email, collection_fqid=fake_uuid) + + with self.subTest("Test dynamoDB get_collections_for_owner finds the put collection."): + found = False + for value in owner_lookup.get_collection_fqids_for_owner(owner=self.owner_email): + if fake_uuid == value: + found = True + break + self.assertEqual(found, True) + + with self.subTest("Test dynamoDB get_collection successfully finds collection."): + owner_lookup.get_collection(owner=self.owner_email, collection_fqid=fake_uuid) + + with self.subTest("Test dynamoDB delete_collection."): + owner_lookup.delete_collection(owner=self.owner_email, collection_fqid=fake_uuid) + for value in owner_lookup.get_collection_fqids_for_owner(owner=self.owner_email): + self.assertNotEqual(fake_uuid, value) + + with self.subTest("Test dynamoDB delete_collection silently deletes now non-existent item."): + owner_lookup.delete_collection(owner=self.owner_email, collection_fqid=fake_uuid) + + with self.subTest("Test dynamoDB put_collection (2 versions)."): + owner_lookup.put_collection(owner=self.owner_email, collection_fqid=fake_uuid + '.v1') + owner_lookup.put_collection(owner=self.owner_email, collection_fqid=fake_uuid + '.v2') + versions = 0 + for value in owner_lookup.get_collection_fqids_for_owner(owner=self.owner_email): + if value.startswith(fake_uuid): + versions += 1 + self.assertEqual(versions, 2) + + with self.subTest("Test dynamoDB delete_collection uuid (test 2 versions get deleted using one uuid)."): + owner_lookup.delete_collection_uuid(owner=self.owner_email, uuid=fake_uuid) + for value in owner_lookup.get_collection_fqids_for_owner(owner=self.owner_email): + if value.startswith(fake_uuid): + raise ValueError(f'{fake_uuid} was removed from db, but {value} was found.') + + with self.subTest("Test dynamoDB get_collection does not find deleted versions."): + with self.assertRaises(DynamoDBItemNotFound): + owner_lookup.get_collection(owner=self.owner_email, collection_fqid=fake_uuid + '.v1') + with self.assertRaises(DynamoDBItemNotFound): + owner_lookup.get_collection(owner=self.owner_email, collection_fqid=fake_uuid + '.v2') def test_collection_paging_too_small(self): """Should NOT be able to use a too-small per_page.""" for replica in self.replicas: with self.subTest(replica): - self._test_collection_get_paging(replica=replica, per_page=49, codes=requests.codes.bad_request) + self.fetch_collection_paging_response(replica=replica, per_page=9, codes=requests.codes.bad_request) def test_collection_paging_too_large(self): """Should NOT be able to 
use a too-large per_page.""" for replica in self.replicas: with self.subTest(replica): - self._test_collection_get_paging(replica=replica, per_page=101, codes=requests.codes.bad_request) + self.fetch_collection_paging_response(replica=replica, per_page=501, codes=requests.codes.bad_request) + + @testmode.standalone + def test_get(self): + """GET a list of all collections belonging to the user.""" + res = self.app.get('/v1/collections', + headers=get_auth_header(authorized=True), + params=dict()) + res.raise_for_status() + self.assertIn('collections', res.json()) def test_put(self): """PUT new collection.""" @@ -228,17 +261,15 @@ def test_patch(self): col_file_item = dict(type="file", uuid=self.file_uuid, version=self.file_version) col_ptr_item = dict(type="foo", uuid=self.file_uuid, version=self.file_version, fragment="/foo") contents = [col_file_item] * 8 + [col_ptr_item] * 8 - uuid, version = self._put(contents) - - with open(os.environ['GOOGLE_APPLICATION_CREDENTIALS'], "r") as fh: - owner_email = json.loads(fh.read())['client_email'] + uuid, version = self._put(contents, replica='aws') + self.addCleanup(self._delete_collection, uuid, replica='aws') expected_contents = {'contents': [col_file_item, col_ptr_item], 'description': 'd', 'details': {}, 'name': 'n', - 'owner': owner_email + 'owner': self.owner_email } tests = [(dict(), None), (dict(description="foo", name="cn"), dict(description="foo", name="cn")),