From 994d4203ead5a994d6fd7f6d1baf08c16fe97040 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 20 Mar 2023 13:37:33 -0400 Subject: [PATCH 1/2] Add management command to re-extract for asset metadata --- .../management/commands/extract_metadata.py | 121 ++++++++++++++++++ setup.py | 2 + 2 files changed, 123 insertions(+) create mode 100644 dandiapi/api/management/commands/extract_metadata.py diff --git a/dandiapi/api/management/commands/extract_metadata.py b/dandiapi/api/management/commands/extract_metadata.py new file mode 100644 index 000000000..55c110667 --- /dev/null +++ b/dandiapi/api/management/commands/extract_metadata.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import logging +from pathlib import Path + +from dandi.dandiapi import RemoteReadableAsset +from dandi.metadata.nwb import nwb2asset +from dandi.misctypes import Digest, DigestType +from dandischema.models import get_schema_version +from django.contrib.auth.models import User +from django.db import transaction +import djclick as click +from tqdm import tqdm + +from dandiapi.api.models import Asset, Dandiset, Version +from dandiapi.api.services.asset import change_asset + +logger = logging.getLogger(__name__) + + +@click.group() +def group(): + pass + + +def get_asset_digest(asset: Asset) -> Digest: + if asset.zarr is not None: + return Digest(algorithm=DigestType.dandi_zarr_checksum, value=asset.zarr.checksum) + if asset.sha256 is not None: + return Digest(algorithm=DigestType.sha2_256, value=asset.sha256) + + # Default to Etag + if asset.blob is not None: + return Digest(algorithm=DigestType.dandi_etag, value=asset.blob.etag) + + raise Exception('Unsupported asset type') # noqa: TRY002 + + +def extract_asset_metadata(asset: Asset, draft_version: Version): + readable_asset = RemoteReadableAsset( + asset.s3_url, size=asset.size, mtime=asset.modified, name=Path(asset.path).name + ) + + if not asset.path.lower().endswith('.nwb'): + logger.info('Asset %s: Not an NWB file, skipping...', asset.path) + return + + new_metadata = nwb2asset( + readable_asset, digest=get_asset_digest(asset), schema_version=get_schema_version() + ).json_dict() + + # Use dandiset owner, default to some admin user + user = ( + draft_version.dandiset.owners.first() + or User.objects.filter(is_superuser=True, is_staff=True).first() + ) + + # Lock asset and version to ensure they don't change while performing this operation + with transaction.atomic(): + locked_asset = Asset.objects.select_for_update().get(id=asset.id) + locked_version = Version.objects.select_for_update().get(id=draft_version.id) + + # Ensure asset hasn't already been removed from the version + if not locked_version.assets.filter(id=locked_asset.id).exists(): + raise Exception(f'Asset {locked_asset} no longer exists in version {locked_version}') # noqa: TRY002 + + # Replace old asset with new asset containing updated metadata + change_asset( + user=user, + asset=locked_asset, + version=locked_version, + new_asset_blob=asset.blob, + new_zarr_archive=asset.zarr, + new_metadata=new_metadata, + ) + + +def extract_dandiset_assets(dandiset: Dandiset): + # Only update NWB assets which are out of date and do not belong to a published version + assets = dandiset.draft_version.assets.filter( + published=False, + path__iendswith='.nwb', + metadata__schemaVersion__lt=get_schema_version(), + ).select_related('blob', 'zarr') + if not assets: + logger.info('No old draft NWB assets found in dandiset %s. Skipping...', dandiset) + return + + for asset in tqdm(assets): + extract_asset_metadata(asset=asset, draft_version=dandiset.draft_version) + + +@group.command(help='Re-extracts the metadata of this asset') +@click.argument('asset_id') +def asset(asset_id: str): + asset = Asset.objects.get(asset_id=asset_id) + draft_versions = asset.versions.filter(version='draft') + if not draft_versions.exists(): + raise click.ClickException( + 'Cannot re-extract metadata of asset that has no associated draft versions.' + ) + + # Re-extract for every draft version + for version in draft_versions: + extract_asset_metadata(asset=asset, draft_version=version) + + +@group.command( + help='Re-extracts the metadata of all assets in the draft version of the provided dandiset' +) +@click.argument('dandiset_id') +def dandiset(dandiset_id: str): + dandiset = Dandiset.objects.get(id=int(dandiset_id)) + extract_dandiset_assets(dandiset) + + +@group.command(name='all', help='Re-extracts the metadata of all assets in all draft versions') +def all_dandisets(): + for dandiset in Dandiset.objects.all(): + logger.info('DANDISET: %s', dandiset.identifier) + extract_dandiset_assets(dandiset) diff --git a/setup.py b/setup.py index e7db9cc0a..7c1023a96 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ include_package_data=True, install_requires=[ 'celery', + 'dandi', # Pin dandischema to exact version to make explicit which schema version is being used 'dandischema==0.10.2', # schema version 0.6.8 'django~=4.1.0', @@ -59,6 +60,7 @@ 'djangorestframework-yaml', 'drf-extensions', 'drf-yasg', + 'fsspec[http]', 'jsonschema', 'boto3[s3]', 'more_itertools', From 637d2b7635569ece3b33ea3f3038a27992706b0f Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 15 Oct 2024 14:59:18 -0400 Subject: [PATCH 2/2] Use QuerySet.iterator to reduce memory usage --- dandiapi/api/management/commands/extract_metadata.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dandiapi/api/management/commands/extract_metadata.py b/dandiapi/api/management/commands/extract_metadata.py index 55c110667..0a9a9ba79 100644 --- a/dandiapi/api/management/commands/extract_metadata.py +++ b/dandiapi/api/management/commands/extract_metadata.py @@ -2,6 +2,7 @@ import logging from pathlib import Path +from typing import TYPE_CHECKING from dandi.dandiapi import RemoteReadableAsset from dandi.metadata.nwb import nwb2asset @@ -15,6 +16,9 @@ from dandiapi.api.models import Asset, Dandiset, Version from dandiapi.api.services.asset import change_asset +if TYPE_CHECKING: + from django.db.models import QuerySet + logger = logging.getLogger(__name__) @@ -77,7 +81,7 @@ def extract_asset_metadata(asset: Asset, draft_version: Version): def extract_dandiset_assets(dandiset: Dandiset): # Only update NWB assets which are out of date and do not belong to a published version - assets = dandiset.draft_version.assets.filter( + assets: QuerySet[Asset] = dandiset.draft_version.assets.filter( published=False, path__iendswith='.nwb', metadata__schemaVersion__lt=get_schema_version(), @@ -86,7 +90,7 @@ def extract_dandiset_assets(dandiset: Dandiset): logger.info('No old draft NWB assets found in dandiset %s. Skipping...', dandiset) return - for asset in tqdm(assets): + for asset in tqdm(assets.iterator(), total=assets.count()): extract_asset_metadata(asset=asset, draft_version=dandiset.draft_version) @@ -101,7 +105,7 @@ def asset(asset_id: str): ) # Re-extract for every draft version - for version in draft_versions: + for version in draft_versions.iterator(): extract_asset_metadata(asset=asset, draft_version=version) @@ -116,6 +120,6 @@ def dandiset(dandiset_id: str): @group.command(name='all', help='Re-extracts the metadata of all assets in all draft versions') def all_dandisets(): - for dandiset in Dandiset.objects.all(): + for dandiset in Dandiset.objects.iterator(): logger.info('DANDISET: %s', dandiset.identifier) extract_dandiset_assets(dandiset)