From 6d2bb4dbabda053b4eb935a3ecf68c9299c59235 Mon Sep 17 00:00:00 2001 From: BillClifford Date: Wed, 12 Jul 2023 11:41:59 -0700 Subject: [PATCH] Last v15 changes. Mostly validation. --- bq/utils/publish_dataset/publish_dataset.py | 66 ++++--- .../publish_idc_pdp_staging_datasets.py | 12 +- .../instance_manifest.py | 4 +- .../vALL_instance_manifest.py | 2 +- .../vX_instance_manifest.py | 2 +- gcs/copy_bucket_mp.py | 14 +- ...dev.py => copy_premerge_buckets_to_dev.py} | 12 +- .../delete_premerge_buckets.py | 17 +- gcs/empty_bucket_mp/empty_bucket_mp.py | 29 +-- .../copy_staging_buckets_to_public_buckets.py | 16 +- .../validate_idc_dev_cr.py | 8 +- .../validate_idc_dev_defaced.py | 8 +- .../validate_idc_dev_excluded.py | 2 +- .../validate_idc_dev_open.py | 7 +- .../validate_idc_dev_redacted.py | 2 +- .../validate_idc_dev_cr.py | 45 +++++ .../validate_idc_dev_defaced.py | 46 +++++ .../validate_idc_dev_excluded.py | 44 +++++ .../validate_idc_dev_open.py | 48 +++++ .../validate_idc_dev_redacted.py | 44 +++++ .../public_buckets}/validate_idc_open_cr.py | 7 +- .../public_buckets}/validate_idc_open_idc.py | 2 +- .../public_buckets}/validate_idc_open_idc1.py | 9 +- .../validate_public_datasets_idc.py | 9 +- .../validate_idc_open_cr_staging.py | 0 .../validate_idc_open_idc1_staging.py | 0 .../validate_public_datasets_idc_staging.py | 26 ++- .../validate_bucket.py} | 74 +++++--- gcs/validate_buckets/validate_bucket_mp.py | 169 ++++++++++++++++++ idc/models.py | 2 +- ingestion/instance.py | 74 +++----- ...ulate_idc_metadata_tables_visible_human.py | 8 +- utilities/logging_config.py | 9 +- 33 files changed, 631 insertions(+), 186 deletions(-) rename gcs/{copy_premerge_to_dev_buckets/copy_premerge_to_dev.py => copy_premerge_buckets_to_dev.py} (91%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/dev_buckets_postmerge}/validate_idc_dev_cr.py (89%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/dev_buckets_postmerge}/validate_idc_dev_defaced.py (89%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/dev_buckets_postmerge}/validate_idc_dev_excluded.py (96%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/dev_buckets_postmerge}/validate_idc_dev_open.py (88%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/dev_buckets_postmerge}/validate_idc_dev_redacted.py (96%) create mode 100644 gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_cr.py create mode 100644 gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_defaced.py create mode 100644 gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_excluded.py create mode 100644 gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_open.py create mode 100644 gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_redacted.py rename gcs/{release_gcs_data/validate_bucket => validate_buckets/public_buckets}/validate_idc_open_cr.py (89%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/public_buckets}/validate_idc_open_idc.py (96%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/public_buckets}/validate_idc_open_idc1.py (89%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/public_buckets}/validate_public_datasets_idc.py (85%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/staging_buckets}/validate_idc_open_cr_staging.py (100%) rename gcs/{release_gcs_data/validate_bucket => validate_buckets/staging_buckets}/validate_idc_open_idc1_staging.py (100%) rename gcs/{release_gcs_data/validate_bucket => 
validate_buckets/staging_buckets}/validate_public_datasets_idc_staging.py (78%) rename gcs/{release_gcs_data/validate_bucket/validate_bucket_mp.py => validate_buckets/validate_bucket.py} (51%) create mode 100644 gcs/validate_buckets/validate_bucket_mp.py diff --git a/bq/utils/publish_dataset/publish_dataset.py b/bq/utils/publish_dataset/publish_dataset.py index 34eacc3..be297e5 100644 --- a/bq/utils/publish_dataset/publish_dataset.py +++ b/bq/utils/publish_dataset/publish_dataset.py @@ -64,10 +64,11 @@ def bq_dataset_exists(client, project , target_dataset): def copy_table(client, args, table_id): - try: - table = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{table_id}') - progresslogger.info(f'Table {table} already exists.') - except: + # try: + # table = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{table_id}') + # progresslogger.info(f'Table {table} already exists.') + # except: + if True: src_table_id = f'{args.src_project}.{args.src_dataset}.{table_id}' trg_table_id = f'{args.trg_project}.{args.trg_dataset}.{table_id}' @@ -75,7 +76,7 @@ def copy_table(client, args, table_id): client = bigquery.Client() job_config = bigquery.CopyJobConfig() job_config.operation_type = 'COPY' - job_config.write_disposition = 'WRITE_EMPTY' + job_config.write_disposition = 'WRITE_TRUNCATE' # Construct and run a copy job. job = client.copy_table( @@ -93,32 +94,39 @@ def copy_table(client, args, table_id): def copy_view(client, args, view_id): - try: - view = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}') - progresslogger.info(f'View {view} already exists.') - except: - view = client.get_table(f'{args.src_project}.{args.src_dataset}.{view_id}') - - new_view = bigquery.Table(f'{args.trg_project}.{args.trg_dataset}.{view_id}') - new_view.view_query = view.view_query.replace(args.src_project,args.pdp_project). \ - replace(args.src_dataset,args.trg_dataset) - - new_view.friendly_name = view.friendly_name - new_view.description = view.description - new_view.labels = view.labels - installed_view = client.create_table(new_view) - - installed_view.schema = view.schema - try: - # # Update the schema after creating the view - # installed_view.schema = view.schema - client.update_table(installed_view, ['schema']) - progresslogger.info(f'Copy of view {view_id}: DONE') - except BadRequest as exc: - errlogger.error(f'{exc}') - + view = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}') + progresslogger.info(f'View {view} already exists.') + client.delete_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}', not_found_ok=True) + progresslogger.info(f'Deleted {view}.') + except: + progresslogger.info(f'View {view_id} does not exist.') + + finally: + view = client.get_table(f'{args.src_project}.{args.src_dataset}.{view_id}') + + new_view = bigquery.Table(f'{args.trg_project}.{args.trg_dataset}.{view_id}') + new_view.view_query = view.view_query.replace(args.src_project,args.trg_project). 
\ + replace(args.src_dataset,args.trg_dataset) + + new_view.friendly_name = view.friendly_name + new_view.description = view.description + new_view.labels = view.labels + installed_view = client.create_table(new_view) + + installed_view.schema = view.schema + + try: + # # Update the schema after creating the view + # installed_view.schema = view.schema + client.update_table(installed_view, ['schema']) + progresslogger.info(f'Copy of view {view_id}: DONE') + except BadRequest as exc: + errlogger.error(f'{exc}') + except Exception as exc: + errlogger.error((f'{exc}')) + progresslogger.info((f'Really done')) return def publish_dataset(args): diff --git a/bq/utils/publish_dataset/publish_idc_pdp_staging_datasets.py b/bq/utils/publish_dataset/publish_idc_pdp_staging_datasets.py index 4b85190..2ac9a19 100644 --- a/bq/utils/publish_dataset/publish_idc_pdp_staging_datasets.py +++ b/bq/utils/publish_dataset/publish_idc_pdp_staging_datasets.py @@ -32,8 +32,12 @@ progresslogger.info(f'args: {json.dumps(args.__dict__, indent=2)}') - for src_dataset in ( - f'idc_v{settings.CURRENT_VERSION}', - f'idc_v1{settings.CURRENT_VERSION}_clinical', - ): + for src_dataset in [ + # f'idc_v{settings.CURRENT_VERSION}', + # f'idc_current', + f'idc_v{settings.CURRENT_VERSION}_clinical', + # 'idc_current_clinical' + ]: + args.src_dataset = src_dataset + args.trg_dataset = src_dataset publish_dataset(args) diff --git a/dcf/gen_instance_manifest/instance_manifest.py b/dcf/gen_instance_manifest/instance_manifest.py index 47df3f2..5efb616 100644 --- a/dcf/gen_instance_manifest/instance_manifest.py +++ b/dcf/gen_instance_manifest/instance_manifest.py @@ -58,8 +58,8 @@ def gen_instance_manifest(args): ON 1=1 WHERE - # idc_version IN {args.versions} - # AND + i_rev_idc_version IN {args.versions} + AND ((i_source='tcia' AND tcia_access='Public') OR (i_source='idc' diff --git a/dcf/gen_instance_manifest/vALL_instance_manifest.py b/dcf/gen_instance_manifest/vALL_instance_manifest.py index 0fbd150..bce0af1 100644 --- a/dcf/gen_instance_manifest/vALL_instance_manifest.py +++ b/dcf/gen_instance_manifest/vALL_instance_manifest.py @@ -15,7 +15,7 @@ # -# Generate a manifest of new instance versions in the current (latest) IDC version +# Generate a manifest of new instances in some or all IDC versions import argparse import settings diff --git a/dcf/gen_instance_manifest/vX_instance_manifest.py b/dcf/gen_instance_manifest/vX_instance_manifest.py index 3b3de54..2b0b4b0 100644 --- a/dcf/gen_instance_manifest/vX_instance_manifest.py +++ b/dcf/gen_instance_manifest/vX_instance_manifest.py @@ -34,7 +34,7 @@ parser.add_argument('--manifest_uri', default=f'gs://indexd_manifests/dcf_input/pdp_hosting/idc_v{settings.CURRENT_VERSION}_instance_manifest_*.tsv', help="GCS blob in which to save results") parser.add_argument('--temp_table_bqdataset', default='whc_dev', \ - help='Manifest of temporary table') + help='BQ dataset of temporary table') parser.add_argument('--temp_table', default=f'idc_v{settings.CURRENT_VERSION}_instance_manifest', \ help='Temporary table in which to write query results') args = parser.parse_args() diff --git a/gcs/copy_bucket_mp.py b/gcs/copy_bucket_mp.py index 55cb86d..85a5e28 100644 --- a/gcs/copy_bucket_mp.py +++ b/gcs/copy_bucket_mp.py @@ -80,11 +80,15 @@ def worker(input, args, dones): src_bucket = storage.Bucket(client, args.src_bucket) dst_bucket = storage.Bucket(client, args.dst_bucket) for blob_names, n in iter(input.get, 'STOP'): - blob_names_todo = blob_names - dones - if blob_names_todo: - 
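The copy_table()/copy_view() changes above switch the table copy to WRITE_TRUNCATE and drop-and-recreate the target view with its query rewritten for the target project and dataset. A condensed sketch of that pattern, assuming only the standard google-cloud-bigquery client (the argument names are illustrative, not the script's actual args object):

from google.cloud import bigquery

def copy_table(client, src, trg, table_id):
    # Overwrite the target table if it already exists.
    job_config = bigquery.CopyJobConfig(write_disposition='WRITE_TRUNCATE')
    job = client.copy_table(f'{src}.{table_id}', f'{trg}.{table_id}', job_config=job_config)
    job.result()  # Block until the copy job completes.

def copy_view(client, src_project, src_dataset, trg_project, trg_dataset, view_id):
    # Drop any stale copy of the view, then rebuild it from the source definition.
    client.delete_table(f'{trg_project}.{trg_dataset}.{view_id}', not_found_ok=True)
    view = client.get_table(f'{src_project}.{src_dataset}.{view_id}')
    new_view = bigquery.Table(f'{trg_project}.{trg_dataset}.{view_id}')
    new_view.view_query = (view.view_query
                           .replace(src_project, trg_project)
                           .replace(src_dataset, trg_dataset))
    new_view.description = view.description
    installed = client.create_table(new_view)
    installed.schema = view.schema
    client.update_table(installed, ['schema'])  # Attach the source's column schema after the view exists.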
copy_instances(args, client, src_bucket, dst_bucket, blob_names_todo, n) - else: - progresslogger.info(f'p{args.id}: Blobs {n}:{n+len(blob_names)-1} previously copied') + try: + blob_names_todo = blob_names - dones + if blob_names_todo: + copy_instances(args, client, src_bucket, dst_bucket, blob_names_todo, n) + else: + progresslogger.info(f'p{args.id}: Blobs {n}:{n+len(blob_names)-1} previously copied') + except Exception as exc: + errlogger.error(f'p{args.id}: Error {exc}') + def copy_all_instances(args, dones): diff --git a/gcs/copy_premerge_to_dev_buckets/copy_premerge_to_dev.py b/gcs/copy_premerge_buckets_to_dev.py similarity index 91% rename from gcs/copy_premerge_to_dev_buckets/copy_premerge_to_dev.py rename to gcs/copy_premerge_buckets_to_dev.py index 02834fa..c90e16d 100644 --- a/gcs/copy_premerge_to_dev_buckets/copy_premerge_to_dev.py +++ b/gcs/copy_premerge_buckets_to_dev.py @@ -14,8 +14,8 @@ # limitations under the License. # -# Copy pre-staging buckets populated by ingestion to staging buckets. -# Ingestion copies data into prestaging buckets named by version and +# Copy premerge buckets populated by ingestion to staging buckets. +# Ingestion copies data into premerge buckets named by version and # collection, e.g. idc_v9_idc_tcga_brca. The data in these buckets must be # copied to one of the idc-dev-etl staging buckets: # idc-dev-open, idc-dev-cr, idc-dev-defaced, idc-dev-redacted, idc-dev-excluded. @@ -26,7 +26,7 @@ import settings from google.cloud import storage, bigquery -from gcs.copy_bucket_mp.copy_bucket_mp import copy_all_instances +from copy_bucket_mp import copy_all_instances def get_collection_groups(): client = bigquery.Client() @@ -59,7 +59,7 @@ def preview_copies(args, client, bucket_data): return -def copy_dev_buckets(args): +def copy_premerge_buckets(args): client = storage.Client() bucket_data= get_collection_groups() preview_copies(args, client, bucket_data) @@ -87,9 +87,9 @@ def copy_dev_buckets(args): if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--version', default=settings.CURRENT_VERSION, help='Version to work on') - parser.add_argument('--processes', default=1, help="Number of concurrent processes") + parser.add_argument('--processes', default=32, help="Number of concurrent processes") parser.add_argument('--batch', default=100, help='Size of batch assigned to each process') args = parser.parse_args() args.id = 0 # Default process ID - copy_dev_buckets(args) \ No newline at end of file + copy_premerge_buckets(args) \ No newline at end of file diff --git a/gcs/delete_premerge_buckets/delete_premerge_buckets.py b/gcs/delete_premerge_buckets/delete_premerge_buckets.py index e0a0d25..3aa5d66 100644 --- a/gcs/delete_premerge_buckets/delete_premerge_buckets.py +++ b/gcs/delete_premerge_buckets/delete_premerge_buckets.py @@ -22,15 +22,14 @@ from utilities.logging_config import successlogger, progresslogger import settings from google.cloud import storage, bigquery -from gcs.empty_bucket_mp.empty_bucket_mp import pre_delete +from gcs.empty_bucket_mp.empty_bucket_mp import del_all_instances def get_collection_groups(): client = bigquery.Client() collections = {} - breakpoint() # FROM all_collections instead of all_included_collections? 
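copy_bucket_mp.py's worker now wraps each batch in a try/except so a single bad batch logs an error instead of killing the process, while still skipping blobs recorded as done on a previous run. A stripped-down sketch of that worker loop, with copy_instances reduced to a direct copy_blob call and the loggers replaced by print:

from multiprocessing import Queue
from google.cloud import storage

def worker(input: Queue, args, dones: set):
    # Each process builds its own client; storage clients are not shared across processes.
    client = storage.Client()
    src_bucket = client.bucket(args.src_bucket)
    dst_bucket = client.bucket(args.dst_bucket)
    for blob_names, n in iter(input.get, 'STOP'):
        try:
            todo = set(blob_names) - dones          # Blobs already copied on a previous run are skipped.
            for blob_name in todo:
                src_bucket.copy_blob(src_bucket.blob(blob_name), dst_bucket, blob_name)
        except Exception as exc:
            # Log and keep consuming batches; the failed batch is retried on a rerun
            # because it never reached the done log.
            print(f'p{args.id}: error {exc} on batch starting at {n}')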
query = f""" - SELECT idc_webapp_collection_id, dev_tcia_url, dev_idc_url - FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_included_collections` + SELECT REPLACE(REPLACE(LOWER(tcia_api_collection_id),'-','_'),' ','_') idc_webapp_collection_id, dev_tcia_url, dev_idc_url + FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_collections` """ result = client.query(query).result() @@ -39,7 +38,7 @@ def get_collection_groups(): return collections -def preview_copies(args, client, bucket_data): +def preview_deletes(args, client, bucket_data): progresslogger.info('Deleting the following buckets') for collection_id in bucket_data: if client.bucket(f'idc_v{args.version}_tcia_{collection_id}').exists(): @@ -67,7 +66,7 @@ def delete_buckets(args): client = storage.Client() bucket_data= get_collection_groups() - preview_copies(args, client, bucket_data) + preview_deletes(args, client, bucket_data) for collection_id in bucket_data: if client.bucket(f'idc_v{args.version}_tcia_{collection_id}').exists(): @@ -76,7 +75,7 @@ def delete_buckets(args): args.bucket = prestaging_bucket progresslogger.info(f'Deleting bucket {prestaging_bucket}') # Delete the contents of the bucket - pre_delete(args) + del_all_instances(args) # Delete the bucket itself client.bucket(prestaging_bucket).delete() if client.bucket(f'idc_v{args.version}_idc_{collection_id}').exists(): @@ -85,7 +84,7 @@ def delete_buckets(args): args.bucket = prestaging_bucket progresslogger.info(f'Deleting bucket {prestaging_bucket}') # Delete the contents of the bucket - pre_delete(args) + del_all_instances(args) # Delete the bucket itself client.bucket(prestaging_bucket).delete() @@ -96,7 +95,7 @@ def delete_buckets(args): parser = argparse.ArgumentParser() parser.add_argument('--version', default=settings.CURRENT_VERSION) # parser.add_argument('--prestaging_bucket_prefix', default=[f'idc_v{settings.CURRENT_VERSION}_tcia_', f'idc_v{settings.CURRENT_VERSION}_idc_'], help='Prefix of premerge buckets') - parser.add_argument('--processes', default=32, help="Number of concurrent processes") + parser.add_argument('--processes', default=8, help="Number of concurrent processes") parser.add_argument('--batch', default=100, help='Size of batch assigned to each process') args = parser.parse_args() diff --git a/gcs/empty_bucket_mp/empty_bucket_mp.py b/gcs/empty_bucket_mp/empty_bucket_mp.py index 14aae57..54a028b 100644 --- a/gcs/empty_bucket_mp/empty_bucket_mp.py +++ b/gcs/empty_bucket_mp/empty_bucket_mp.py @@ -43,20 +43,21 @@ def delete_instances(args, client, bucket, blobs, n): - try: - with client.batch(): - for blob in blobs: - bucket.blob(blob[0], generation=blob[1]).delete() - # bucket.blob(blob[0], generation=blob[1]).delete() - - successlogger.info('p%s Delete %s blobs %s:%s ', args.id, args.bucket, n, n+len(blobs)-1) - except ServiceUnavailable: - errlogger.error('p%s Delete %s blobs %s:%s failed', args.id, args.bucket, n, n+len(blobs)-1) - except NotFound: - errlogger.error('p%s Delete %s blobs %s:%s failed, not found', args.id, args.bucket, n, n+len(blobs)-1) - except Exception as exc: - errlogger.error('p%s Exception %s %s:%s', args.id, exc, n, n+len(blobs)-1) - + TRIES = 3 + for i in range(TRIES): + try: + with client.batch(): + for blob in blobs: + bucket.blob(blob[0], generation=blob[1]).delete() + successlogger.info('p%s Delete %s blobs %s:%s ', args.id, args.bucket, n, n+len(blobs)-1) + break + except ServiceUnavailable: + errlogger.error('p%s Delete %s blobs %s:%s failed', args.id, args.bucket, n, n+len(blobs)-1) + except Exception as exc: 
+ errlogger.error('p%s Exception %s %s:%s', args.id, exc, n, n+len(blobs)-1) + except NotFound: + errlogger.error('p%s Delete %s blobs %s:%s failed, not found', args.id, args.bucket, n, n+len(blobs)-1) + break def worker(input, args): diff --git a/gcs/release_gcs_data/copy_staging_buckets_to_public_buckets.py b/gcs/release_gcs_data/copy_staging_buckets_to_public_buckets.py index 30eb25c..64273a6 100644 --- a/gcs/release_gcs_data/copy_staging_buckets_to_public_buckets.py +++ b/gcs/release_gcs_data/copy_staging_buckets_to_public_buckets.py @@ -21,7 +21,7 @@ if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('--processes', default=1, help="Number of concurrent processes") + parser.add_argument('--processes', default=32, help="Number of concurrent processes") parser.add_argument('--batch', default=100, help='Size of batch assigned to each process') parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_bucket_mp') @@ -34,13 +34,13 @@ dones = set([]) - args.src_bucket = 'idc-open-idc1-staging' - args.dst_bucket = 'idc-open-idc1' - copy_all_instances(args, dones) - - args.src_bucket = 'idc-open-cr-staging' - args.dst_bucket = 'idc-open-cr' - copy_all_instances(args, dones) + # args.src_bucket = 'idc-open-idc1-staging' + # args.dst_bucket = 'idc-open-idc1' + # copy_all_instances(args, dones) + # + # args.src_bucket = 'idc-open-cr-staging' + # args.dst_bucket = 'idc-open-cr' + # copy_all_instances(args, dones) args.src_bucket = 'public-datasets-idc-staging' args.dst_bucket = 'public-datasets-idc' diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_cr.py b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_cr.py similarity index 89% rename from gcs/release_gcs_data/validate_bucket/validate_idc_dev_cr.py rename to gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_cr.py index 4facdb8..83aaa6b 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_cr.py +++ b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_cr.py @@ -23,13 +23,15 @@ import json import settings -from gcs.validate_bucket.validate_bucket_mp import check_all_instances - +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=32) parser.add_argument('--bucket', default='idc-dev-cr') parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') parser.add_argument('--premerge', default=False, help='True when performing prior to merging premerge buckets') @@ -40,4 +42,4 @@ args = parser.parse_args() print(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args) + check_all_instances_mp(args) diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_defaced.py b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_defaced.py similarity index 89% rename from gcs/release_gcs_data/validate_bucket/validate_idc_dev_defaced.py rename to gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_defaced.py index 975bf82..15a29fb 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_defaced.py +++ b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_defaced.py @@ -22,14 +22,16 @@ import argparse import json import settings - -from 
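One note on the delete_instances() retry loop added to empty_bucket_mp.py above: google.api_core's NotFound derives from Exception, so with the generic except Exception clause listed before except NotFound the NotFound branch is unreachable and a missing blob triggers a retry rather than a break. A sketch with the more specific handlers first (retry count and logging are illustrative):

from google.api_core.exceptions import NotFound, ServiceUnavailable
from google.cloud import storage

TRIES = 3

def delete_instances(client: storage.Client, bucket, blobs, n):
    # blobs is a list of (name, generation) pairs; delete each batch in one batched request.
    for attempt in range(TRIES):
        try:
            with client.batch():
                for name, generation in blobs:
                    bucket.blob(name, generation=generation).delete()
            break                      # Batch succeeded.
        except NotFound:
            break                      # Already gone; specific handlers must precede Exception.
        except ServiceUnavailable:
            continue                   # Transient; retry up to TRIES times.
        except Exception as exc:
            print(f'attempt {attempt} failed for blobs {n}:{n + len(blobs) - 1}: {exc}')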
gcs.validate_bucket.validate_bucket_mp import check_all_instances +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=32) parser.add_argument('--bucket', default='idc-dev-defaced') parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') parser.add_argument('--premerge', default=False, help='True when performing prior to merging premerge buckets') @@ -41,4 +43,4 @@ args = parser.parse_args() print(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args) \ No newline at end of file + check_all_instances_mp(args) \ No newline at end of file diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_excluded.py b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_excluded.py similarity index 96% rename from gcs/release_gcs_data/validate_bucket/validate_idc_dev_excluded.py rename to gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_excluded.py index ae48291..bf53c05 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_excluded.py +++ b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_excluded.py @@ -23,7 +23,7 @@ import json import settings -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +from gcs.validate_buckets.validate_bucket import check_all_instances if __name__ == '__main__': diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_open.py b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_open.py similarity index 88% rename from gcs/release_gcs_data/validate_bucket/validate_idc_dev_open.py rename to gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_open.py index 1509859..c95138b 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_open.py +++ b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_open.py @@ -22,14 +22,17 @@ import argparse import json import settings +import builtins +builtins.APPEND_PROGRESSLOGGER = True from utilities.logging_config import progresslogger -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=64) parser.add_argument('--bucket', default='idc-dev-open') # parser.add_argument('--src_project', default=settings.DEV_PROJECT) parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') @@ -42,4 +45,4 @@ args = parser.parse_args() progresslogger.info(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args, args.premerge, premerge=args.premerge) \ No newline at end of file + check_all_instances_mp(args, premerge=args.premerge) \ No newline at end of file diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_redacted.py b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_redacted.py similarity index 96% rename from gcs/release_gcs_data/validate_bucket/validate_idc_dev_redacted.py rename to gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_redacted.py index e3fe030..78eca26 100644 --- 
a/gcs/release_gcs_data/validate_bucket/validate_idc_dev_redacted.py +++ b/gcs/validate_buckets/dev_buckets_postmerge/validate_idc_dev_redacted.py @@ -23,7 +23,7 @@ import json import settings -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +from gcs.validate_buckets.validate_bucket import check_all_instances if __name__ == '__main__': diff --git a/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_cr.py b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_cr.py new file mode 100644 index 0000000..ac5396d --- /dev/null +++ b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_cr.py @@ -0,0 +1,45 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Multiprocess script to validate that the idc-dev-cr bucket +contains the expected set of blobs. +""" + +import argparse +import json +import settings + +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=32) + parser.add_argument('--bucket', default='idc-dev-cr') + parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') + parser.add_argument('--premerge', default=True, help='True when performing prior to merging premerge buckets') + parser.add_argument('--expected_blobs', default=f'{settings.LOG_DIR}/expected_blobs.txt', help='List of blobs names expected to be in above collections') + parser.add_argument('--found_blobs', default=f'{settings.LOG_DIR}/found_blobs.txt', help='List of blobs names found in bucket') + parser.add_argument('--batch', default=10000, help='Size of batch assigned to each process') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/validate_open_buckets') + + args = parser.parse_args() + print(f'args: {json.dumps(args.__dict__, indent=2)}') + check_all_instances_mp(args) diff --git a/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_defaced.py b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_defaced.py new file mode 100644 index 0000000..c70343c --- /dev/null +++ b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_defaced.py @@ -0,0 +1,46 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Multiprocess script to validate that the idc-dev-defaced bucket +contains the expected set of blobs. +""" + +import argparse +import json +import settings +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=32) + parser.add_argument('--bucket', default='idc-dev-defaced') + parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') + parser.add_argument('--premerge', default=True, help='True when performing prior to merging premerge buckets') + parser.add_argument('--expected_blobs', default=f'{settings.LOG_DIR}/expected_blobs.txt', help='List of blobs names expected to be in above collections') + parser.add_argument('--found_blobs', default=f'{settings.LOG_DIR}/found_blobs.txt', help='List of blobs names found in bucket') + parser.add_argument('--batch', default=10000, help='Size of batch assigned to each process') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/validate_open_buckets') + + args = parser.parse_args() + print(f'args: {json.dumps(args.__dict__, indent=2)}') + + check_all_instances_mp(args) \ No newline at end of file diff --git a/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_excluded.py b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_excluded.py new file mode 100644 index 0000000..5f88ac8 --- /dev/null +++ b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_excluded.py @@ -0,0 +1,44 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Multiprocess script to validate that the idc-dev-excluded bucket +contains the expected set of blobs. 
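All of the new premerge validators set builtins.APPEND_PROGRESSLOGGER = True before the first import that pulls in utilities.logging_config, so the flag is already visible when the loggers are configured; the point is that a rerun should append to, not truncate, the progress log that records completed series. The logging_config change itself is not reproduced in this excerpt, so the following is only a plausible sketch of how such a flag could be honored (the attribute check and file path are assumptions):

import builtins
import logging

# Append to the existing progress log when a caller set the flag before importing this
# module; otherwise truncate it, which is the default behavior.
_mode = 'a' if getattr(builtins, 'APPEND_PROGRESSLOGGER', False) else 'w'
progresslogger = logging.getLogger('progress')
progresslogger.addHandler(logging.FileHandler('/tmp/progress.log', mode=_mode))
progresslogger.setLevel(logging.INFO)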
+""" + +import argparse +import json +import settings + +from gcs.validate_buckets.validate_bucket import check_all_instances + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--bucket', default='idc-dev-excluded') + parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') + parser.add_argument('--premerge', default=True, help='True when performing prior to merging premerge buckets') + parser.add_argument('--expected_blobs', default=f'{settings.LOG_DIR}/expected_blobs.txt', help='List of blobs names expected to be in above collections') + parser.add_argument('--found_blobs', default=f'{settings.LOG_DIR}/found_blobs.txt', help='List of blobs names found in bucket') + parser.add_argument('--batch', default=10000, help='Size of batch assigned to each process') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/validate_open_buckets') + + args = parser.parse_args() + print(f'args: {json.dumps(args.__dict__, indent=2)}') + + check_all_instances(args) diff --git a/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_open.py b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_open.py new file mode 100644 index 0000000..cbc12e7 --- /dev/null +++ b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_open.py @@ -0,0 +1,48 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Multiprocess script to validate that the idc-dev-open bucket +contains the expected set of blobs. 
+""" + +import argparse +import json +import settings +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from utilities.logging_config import progresslogger + +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--processes', default=64) + parser.add_argument('--bucket', default='idc-dev-open') + # parser.add_argument('--src_project', default=settings.DEV_PROJECT) + parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') + parser.add_argument('--premerge', default=True, help='True when performing prior to merging premerge buckets') + parser.add_argument('--expected_blobs', default=f'{settings.LOG_DIR}/expected_blobs.txt', help='List of blobs names expected to be in above collections') + parser.add_argument('--found_blobs', default=f'{settings.LOG_DIR}/found_blobs.txt', help='List of blobs names found in bucket') + parser.add_argument('--batch', default=10000, help='Size of batch assigned to each process') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/validate_open_buckets') + + args = parser.parse_args() + progresslogger.info(f'args: {json.dumps(args.__dict__, indent=2)}') + + check_all_instances_mp(args, premerge=args.premerge) \ No newline at end of file diff --git a/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_redacted.py b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_redacted.py new file mode 100644 index 0000000..a45014e --- /dev/null +++ b/gcs/validate_buckets/dev_buckets_premerge/validate_idc_dev_redacted.py @@ -0,0 +1,44 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Multiprocess script to validate that the idc-dev-redacted bucket +contains the expected set of blobs. 
+""" + +import argparse +import json +import settings + +from gcs.validate_buckets.validate_bucket import check_all_instances + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + # parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--version', default=settings.CURRENT_VERSION) + parser.add_argument('--bucket', default='idc-dev-redacted') + parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket') + parser.add_argument('--premerge', default=True, help='True when performing prior to merging premerge buckets') + parser.add_argument('--expected_blobs', default=f'{settings.LOG_DIR}/expected_blobs.txt', help='List of blobs names expected to be in above collections') + parser.add_argument('--found_blobs', default=f'{settings.LOG_DIR}/found_blobs.txt', help='List of blobs names found in bucket') + parser.add_argument('--batch', default=10000, help='Size of batch assigned to each process') + parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/validate_open_buckets') + + args = parser.parse_args() + print(f'args: {json.dumps(args.__dict__, indent=2)}') + + check_all_instances(args) diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_open_cr.py b/gcs/validate_buckets/public_buckets/validate_idc_open_cr.py similarity index 89% rename from gcs/release_gcs_data/validate_bucket/validate_idc_open_cr.py rename to gcs/validate_buckets/public_buckets/validate_idc_open_cr.py index 9f4c75d..b4f43b3 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_open_cr.py +++ b/gcs/validate_buckets/public_buckets/validate_idc_open_cr.py @@ -21,12 +21,15 @@ import json import settings -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--processes', default=32) parser.add_argument('--bucket', default='idc-open-cr') parser.add_argument('--dev_or_pub', default = 'pub', help='Validating a dev or pub bucket') # parser.add_argument('--collection_group_table', default='cr_collections', help='BQ table containing list of collections') @@ -38,4 +41,4 @@ args = parser.parse_args() print(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args) + check_all_instances_mp(args) diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_open_idc.py b/gcs/validate_buckets/public_buckets/validate_idc_open_idc.py similarity index 96% rename from gcs/release_gcs_data/validate_bucket/validate_idc_open_idc.py rename to gcs/validate_buckets/public_buckets/validate_idc_open_idc.py index 1c29e2d..39ce368 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_idc_open_idc.py +++ b/gcs/validate_buckets/public_buckets/validate_idc_open_idc.py @@ -24,7 +24,7 @@ import json import settings -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +from gcs.validate_buckets.validate_bucket import check_all_instances if __name__ == '__main__': diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_open_idc1.py b/gcs/validate_buckets/public_buckets/validate_idc_open_idc1.py similarity index 89% rename from gcs/release_gcs_data/validate_bucket/validate_idc_open_idc1.py rename to gcs/validate_buckets/public_buckets/validate_idc_open_idc1.py index 7170551..3dbda5f 100644 --- 
a/gcs/release_gcs_data/validate_bucket/validate_idc_open_idc1.py +++ b/gcs/validate_buckets/public_buckets/validate_idc_open_idc1.py @@ -22,14 +22,17 @@ import argparse import json import settings -from utilities.logging_config import successlogger, progresslogger, errlogger -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from utilities.logging_config import successlogger, progresslogger, errlogger +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--processes', default=32) parser.add_argument('--bucket', default='idc-open-idc1') parser.add_argument('--dev_or_pub', default = 'pub', help='Validating a dev or pub bucket') # parser.add_argument('--collection_group_table', default='defaced_collections', help='BQ table containing list of collections') @@ -41,4 +44,4 @@ args = parser.parse_args() progresslogger.info(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args) + check_all_instances_mp(args) diff --git a/gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc.py b/gcs/validate_buckets/public_buckets/validate_public_datasets_idc.py similarity index 85% rename from gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc.py rename to gcs/validate_buckets/public_buckets/validate_public_datasets_idc.py index 554f3ef..7aa1840 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc.py +++ b/gcs/validate_buckets/public_buckets/validate_public_datasets_idc.py @@ -22,13 +22,16 @@ import argparse import json import settings - -from gcs.validate_bucket.validate_bucket_mp import check_all_instances +import builtins +builtins.APPEND_PROGRESSLOGGER = True +from utilities.logging_config import progresslogger +from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}') + parser.add_argument('--processes', default=64) parser.add_argument('--bucket', default='public-datasets-idc', help='Bucket to be validated') parser.add_argument('--dev_or_pub', default = 'pub', help='Validating a dev or pub bucket') parser.add_argument('--premerge', default=False, help='True when performing prior to merging premerge buckets') @@ -39,4 +42,4 @@ args = parser.parse_args() print(f'args: {json.dumps(args.__dict__, indent=2)}') - check_all_instances(args, premerge=args.premerge) + check_all_instances_mp(args, premerge=args.premerge) diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_open_cr_staging.py b/gcs/validate_buckets/staging_buckets/validate_idc_open_cr_staging.py similarity index 100% rename from gcs/release_gcs_data/validate_bucket/validate_idc_open_cr_staging.py rename to gcs/validate_buckets/staging_buckets/validate_idc_open_cr_staging.py diff --git a/gcs/release_gcs_data/validate_bucket/validate_idc_open_idc1_staging.py b/gcs/validate_buckets/staging_buckets/validate_idc_open_idc1_staging.py similarity index 100% rename from gcs/release_gcs_data/validate_bucket/validate_idc_open_idc1_staging.py rename to gcs/validate_buckets/staging_buckets/validate_idc_open_idc1_staging.py diff --git a/gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc_staging.py 
b/gcs/validate_buckets/staging_buckets/validate_public_datasets_idc_staging.py similarity index 78% rename from gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc_staging.py rename to gcs/validate_buckets/staging_buckets/validate_public_datasets_idc_staging.py index ba976bd..f8555c9 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_public_datasets_idc_staging.py +++ b/gcs/validate_buckets/staging_buckets/validate_public_datasets_idc_staging.py @@ -48,16 +48,33 @@ def get_expected_blobs_in_bucket(args): f.write(''.join(rows)) +# def get_found_blobs_in_bucket(args): +# client = storage.Client() +# bucket = client.bucket(args.bucket) +# page_token = "" +# # iterator = client.list_blobs(bucket, page_token=page_token, max_results=args.batch) +# iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch) +# with open(args.found_blobs, 'w') as f: +# for page in iterator.pages: +# blobs = [f'{blob.name}\n' for blob in page] +# f.write(''.join(blobs)) + + def get_found_blobs_in_bucket(args): client = storage.Client() bucket = client.bucket(args.bucket) page_token = "" # iterator = client.list_blobs(bucket, page_token=page_token, max_results=args.batch) - iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch) with open(args.found_blobs, 'w') as f: - for page in iterator.pages: - blobs = [f'{blob.name}\n' for blob in page] - f.write(''.join(blobs)) + series_iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch, \ + prefix='', delimiter='/') + for page in series_iterator.pages: + for prefix in page.prefixes: + instance_iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch, \ + prefix=prefix) + for page in instance_iterator.pages: + blobs = [f'{blob.name}\n' for blob in page] + f.write(''.join(blobs)) def check_all_instances(args): @@ -74,6 +91,7 @@ def check_all_instances(args): get_found_blobs_in_bucket(args) found_blobs = set(open(args.found_blobs).read().splitlines()) # json.dump(psql_blobs, open(args.blob_names), 'w') + if found_blobs == expected_blobs: successlogger.info(f"Bucket {args.bucket} has the correct set of blobs") else: diff --git a/gcs/release_gcs_data/validate_bucket/validate_bucket_mp.py b/gcs/validate_buckets/validate_bucket.py similarity index 51% rename from gcs/release_gcs_data/validate_bucket/validate_bucket_mp.py rename to gcs/validate_buckets/validate_bucket.py index d5ef8d0..09b9a7d 100644 --- a/gcs/release_gcs_data/validate_bucket/validate_bucket_mp.py +++ b/gcs/validate_buckets/validate_bucket.py @@ -23,27 +23,37 @@ def get_expected_blobs_in_bucket(args, premerge=False): client = bigquery.Client() - query = f""" - SELECT distinct concat(i.uuid, '.dcm') as blob_name - FROM `idc-dev-etl.idc_v{args.version}_dev.version` v - JOIN `idc-dev-etl.idc_v{args.version}_dev.version_collection` vc ON v.version = vc.version - JOIN `idc-dev-etl.idc_v{args.version}_dev.collection` c ON vc.collection_uuid = c.uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.collection_patient` cp ON c.uuid = cp.collection_uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.patient` p ON cp.patient_uuid = p.uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.patient_study` ps ON p.uuid = ps.patient_uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.study` st ON ps.study_uuid = st.uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.study_series` ss ON st.uuid = ss.study_uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.series` se ON 
ss.series_uuid = se.uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.series_instance` si ON se.uuid = si.series_uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.instance` i ON si.instance_uuid = i.uuid - JOIN `idc-dev-etl.idc_v{args.version}_dev.all_collections` aic - ON c.collection_id = aic.tcia_api_collection_id - WHERE ((i.source='tcia' and aic.{args.dev_or_pub}_tcia_url="{args.bucket}") - OR (i.source='idc' and aic.{args.dev_or_pub}_idc_url="{args.bucket}")) - AND i.excluded = False - AND if({premerge}, i.rev_idc_version < {args.version}, i.rev_idc_version <= {args.version}) - """ + # query = f""" + # SELECT distinct concat(s.uuid,'/', i.uuid, '.dcm') as blob_name + # FROM `idc-dev-etl.idc_v{args.version}_dev.version` v + # JOIN `idc-dev-etl.idc_v{args.version}_dev.version_collection` vc ON v.version = vc.version + # JOIN `idc-dev-etl.idc_v{args.version}_dev.collection` c ON vc.collection_uuid = c.uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.collection_patient` cp ON c.uuid = cp.collection_uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.patient` p ON cp.patient_uuid = p.uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.patient_study` ps ON p.uuid = ps.patient_uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.study` st ON ps.study_uuid = st.uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.study_series` ss ON st.uuid = ss.study_uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.series` se ON ss.series_uuid = se.uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.series_instance` si ON se.uuid = si.series_uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.instance` i ON si.instance_uuid = i.uuid + # JOIN `idc-dev-etl.idc_v{args.version}_dev.all_collections` aic + # ON c.collection_id = aic.tcia_api_collection_id + # WHERE ((i.source='tcia' and aic.{args.dev_or_pub}_tcia_url="{args.bucket}") + # OR (i.source='idc' and aic.{args.dev_or_pub}_idc_url="{args.bucket}")) + # AND i.excluded = False + # AND if({premerge}, i.rev_idc_version < {args.version}, i.rev_idc_version <= {args.version}) + # """ + query = f""" + SELECT distinct concat(se_uuid,'/', i_uuid, '.dcm') as blob_name + FROM `idc-dev-etl.idc_v{args.version}_dev.all_joined` aj + JOIN `idc-dev-etl.idc_v{args.version}_dev.all_collections` aic + ON aj.idc_collection_id = aic.idc_collection_id + WHERE ((i_source='tcia' and aic.{args.dev_or_pub}_gcs_tcia_url="{args.bucket}") + OR (i_source='idc' and aic.{args.dev_or_pub}_gcs_idc_url="{args.bucket}")) + AND i_excluded = False + AND if({premerge}, i_rev_idc_version < {args.version}, i_rev_idc_version <= {args.version}) + """ query_job = client.query(query) # Make an API request. query_job.result() # Wait for the query to complete. 
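The premerge argument threaded through these queries only changes the version bound in the WHERE clause: a premerge run happens before the per-collection premerge buckets are merged into the dev buckets, so blobs new or revised in the current version are not yet expected there. Reduced to Python, the IF() amounts to:

def version_bound(premerge: bool, version: int) -> str:
    # Premerge: only blobs carried over from earlier IDC versions should be in the bucket.
    # Postmerge: blobs added or revised in this version have been merged in as well.
    return (f'i_rev_idc_version < {version}' if premerge
            else f'i_rev_idc_version <= {version}')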
@@ -64,16 +74,32 @@ def get_expected_blobs_in_bucket(args, premerge=False): rows = [f'{row["blob_name"]}\n' for row in page] f.write(''.join(rows)) +# def get_found_blobs_in_bucket(args): +# client = storage.Client() +# bucket = client.bucket(args.bucket) +# page_token = "" +# # iterator = client.list_blobs(bucket, page_token=page_token, max_results=args.batch) +# iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch) +# with open(args.found_blobs, 'w') as f: +# for page in iterator.pages: +# blobs = [f'{blob.name}\n' for blob in page] +# f.write(''.join(blobs)) + def get_found_blobs_in_bucket(args): client = storage.Client() bucket = client.bucket(args.bucket) page_token = "" # iterator = client.list_blobs(bucket, page_token=page_token, max_results=args.batch) - iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch) with open(args.found_blobs, 'w') as f: - for page in iterator.pages: - blobs = [f'{blob.name}\n' for blob in page] - f.write(''.join(blobs)) + series_iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch, \ + prefix='', delimiter='/') + for page in series_iterator.pages: + for prefix in page.prefixes: + instance_iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch, \ + prefix=prefix) + for page in instance_iterator.pages: + blobs = [f'{blob.name}\n' for blob in page] + f.write(''.join(blobs)) def check_all_instances(args, premerge=False): try: diff --git a/gcs/validate_buckets/validate_bucket_mp.py b/gcs/validate_buckets/validate_bucket_mp.py new file mode 100644 index 0000000..bbc5590 --- /dev/null +++ b/gcs/validate_buckets/validate_bucket_mp.py @@ -0,0 +1,169 @@ +# +# Copyright 2015-2021, Institute for Systems Biology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Validate that a bucket holds the correct set of instance blobs +""" +import settings +import builtins +# Noramlly the progresslogger file is trunacated. The following causes it to be appended. 
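The new validate_bucket_mp.py parallelizes the bucket listing by series: the parent process lists only the top-level series prefixes (delimiter='/'), queues them, and each worker lists the instance blobs under its prefixes and logs what it finds. A condensed, single-file sketch of that fan-out, with logging reduced to prints:

from multiprocessing import Process, Queue
from google.cloud import storage

def worker(task_queue: Queue, bucket_name: str, batch: int):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    for prefixes in iter(task_queue.get, 'STOP'):
        for prefix in prefixes:
            # Enumerate every instance blob under this series prefix.
            for blob in client.list_blobs(bucket, prefix=prefix, page_size=batch):
                print(blob.name)

def list_bucket_mp(bucket_name: str, processes: int = 8, batch: int = 10000):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    task_queue = Queue()
    workers = [Process(target=worker, args=(task_queue, bucket_name, batch))
               for _ in range(processes)]
    for p in workers:
        p.start()
    # With delimiter='/', pages carry the series-level prefixes rather than the blobs themselves.
    series_iterator = client.list_blobs(bucket, delimiter='/', page_size=batch)
    for page in series_iterator.pages:
        task_queue.put(list(page.prefixes))
    for _ in workers:
        task_queue.put('STOP')      # One sentinel per worker.
    for p in workers:
        p.join()

In the actual script the parent also passes the set of series prefixes already recorded in the progress log, so a restarted validation skips series that were fully listed on a previous run.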
+# builtins.APPEND_PROGRESSLOGGER = True +from utilities.logging_config import successlogger, progresslogger, errlogger +from google.cloud import storage, bigquery +from multiprocessing import Process, Queue + +def worker(input, args, dones): + # proglogger.info('p%s: Worker starting: args: %s', args.id, args ) + # print(f'p{args.id}: Worker starting: args: {args}') + + RETRIES=3 + + client = storage.Client() + bucket = client.bucket(args.bucket) + + for prefixes, n in iter(input.get, 'STOP'): + try: + for prefix in prefixes: + if prefix not in dones: + instance_iterator = client.list_blobs(bucket, versions=False, page_size=args.batch, \ + prefix=prefix) + for page in instance_iterator.pages: + + for blob in page: + successlogger.info(blob.name) + progresslogger.info(prefix) + # if blob_names_todo: + # copy_instances(args, client, src_bucket, dst_bucket, blob_names_todo, n) + # else: + # progresslogger.info(f'p{args.id}: Blobs {n}:{n+len(blob_names)-1} previously copied') + except Exception as exc: + errlogger.error(f'p{args.id}: Error {exc}') + +def get_expected_blobs_in_bucket(args, premerge=False): + client = bigquery.Client() + query = f""" + SELECT distinct concat(se_uuid,'/', i_uuid, '.dcm') as blob_name + FROM `idc-dev-etl.idc_v{args.version}_dev.all_joined` aj + JOIN `idc-dev-etl.idc_v{args.version}_dev.all_collections` aic + ON aj.idc_collection_id = aic.idc_collection_id + WHERE ((i_source='tcia' and aic.{"dev" if args.dev_or_pub=="dev" else "pub_gcs"}_tcia_url="{args.bucket}") + OR (i_source='idc' and aic.{"dev" if args.dev_or_pub=="dev" else "pub_gcs"}_idc_url="{args.bucket}")) + AND i_excluded = False + AND if({premerge}, i_rev_idc_version < {args.version}, i_rev_idc_version <= {args.version}) + ORDER BY blob_name + """ + + query_job = client.query(query) # Make an API request. + query_job.result() # Wait for the query to complete. + + # Get the destination table for the query results. + # + # All queries write to a destination table. If a destination table is not + # specified, the BigQuery populates it with a reference to a temporary + # anonymous table after the query completes. + destination = query_job.destination + + # Get the schema (and other properties) for the destination table. + # + # A schema is useful for converting from BigQuery types to Python types. 
+ destination = client.get_table(destination) + with open(args.expected_blobs, 'w') as f: + for page in client.list_rows(destination, page_size=args.batch).pages: + rows = [f'{row["blob_name"]}\n' for row in page] + f.write(''.join(rows)) + + +def get_found_blobs_in_bucket(args): + client = storage.Client() + bucket = client.bucket(args.bucket) + page_token = "" + # Get the completed series + done_series = open(f'{progresslogger.handlers[0].baseFilename}').read().splitlines() + + # Start worker processes + num_processes = args.processes + processes = [] + task_queue = Queue() + for process in range(num_processes): + args.id = process + 1 + processes.append( + Process(group=None, target=worker, args=(task_queue, args, done_series))) + processes[-1].start() + + # iterator = client.list_blobs(bucket, page_token=page_token, max_results=args.batch) + n = 0 + with open(args.found_blobs, 'w') as f: + series_iterator = client.list_blobs(bucket, versions=False, page_size=args.batch, \ + prefix='', delimiter='/') + for page in series_iterator.pages: + prefixes = [prefix for prefix in page.prefixes] + task_queue.put((prefixes, n)) + # for prefix in page.prefixes: + # instance_iterator = client.list_blobs(bucket, versions=False, page_token=page_token, page_size=args.batch, \ + # prefix=prefix) + # for page in instance_iterator.pages: + # blobs = [f'{blob.name}\n' for blob in page] + # f.write(''.join(blobs)) + + # Tell child processes to stop + for i in range(num_processes): + task_queue.put('STOP') + task_queue.put('STOP') + # Wait for process to terminate + for process in processes: + print(f'Joining process: {process.name}, {process.is_alive()}') + process.join() + + +def check_all_instances_mp(args, premerge=False): + try: + expected_blobs = set(open(args.expected_blobs).read().splitlines()) + progresslogger.info(f'Already have expected blobs') + except: + progresslogger.info(f'Getting expected blobs') + get_expected_blobs_in_bucket(args, premerge) + expected_blobs = set(open(args.expected_blobs).read().splitlines()) + # json.dump(psql_blobs, open(args.blob_names), 'w') + + # try: + # # found_blobs = set(open(args.found_blobs).read().splitlines()) + # found_blobs = open(f'{successlogger.handlers[0].baseFilename}').read().splitlines() + # progresslogger.info(f'Already have found blobs') + # except: + # progresslogger.info(f'Getting found blobs') + # get_found_blobs_in_bucket(args) + # found_blobs = open(f'{successlogger.handlers[0].baseFilename}').read().splitlines() + # # json.dump(psql_blobs, open(args.blob_names), 'w') + + + progresslogger.info(f'Getting found blobs') + get_found_blobs_in_bucket(args) + found_blobs = set(open(f'{successlogger.handlers[0].baseFilename}').read().splitlines()) + + if found_blobs == expected_blobs: + successlogger.info(f"Bucket {args.bucket} has the correct set of blobs") + else: + errlogger.error(f"Bucket {args.bucket} does not have the correct set of blobs") + errlogger.error(f"Unexpected blobs in bucket: {len(found_blobs - expected_blobs)}") + for blob in found_blobs - expected_blobs: + errlogger.error(blob) + errlogger.error(f"Expected blobs not found in bucket: {len(expected_blobs - found_blobs)}") + for blob in expected_blobs - found_blobs: + errlogger.error(blob) + + return + + diff --git a/idc/models.py b/idc/models.py index 6dc8b35..8d8533b 100644 --- a/idc/models.py +++ b/idc/models.py @@ -600,7 +600,7 @@ class IDC_Instance(Base): sop_instance_uid = Column(String, primary_key=True, nullable=False) series_instance_uid = 
Column(ForeignKey('idc_series.series_instance_uid'), comment="Containing object") hash = Column(String, comment='Instance hash') - gcs_url = Column(String, comment='GCS URL of instance') + gcs_url = Column(String, comment='GCS URL of instance source') size = Column(BigInteger, comment='Instance size in bytes') excluded = Column(Boolean, comment='True of this series should be excluded from ingestion') idc_version = Column(Integer, comment='IDC version when this instance was added/revised') diff --git a/ingestion/instance.py b/ingestion/instance.py index 7d07d1b..1618142 100644 --- a/ingestion/instance.py +++ b/ingestion/instance.py @@ -41,12 +41,7 @@ def clone_instance(instance, uuid): def build_instances_tcia(sess, args, collection, patient, study, series): - # Download a zip of the instances in a series - # It will be write the zip to a file dicom/.zip in the - # working directory, and expand the zip to directory dicom/ - try: - # When TCIA provided series timestamps, we'll us that for timestamp. - now = datetime.now(timezone.utc) + try: # Delete the series from disk in case it is there from a previous run try: @@ -56,11 +51,13 @@ def build_instances_tcia(sess, args, collection, patient, study, series): # It wasn't there pass - download_start = time.time_ns() - # hashes = get_TCIA_instances_per_series_with_hashes(args.dicom_dir, series.series_instance_uid) + # Download a zip of the instances in a series + # It will write the zip to a file dicom/.zip in the + # working directory, and expand the zip to directory dicom/ hashes = get_TCIA_instances_per_series_with_hashes(args.dicom_dir, series) - download_time = (time.time_ns() - download_start)/10**9 + # Validate that the files on disk have the expected hashes. if not validate_hashes(args, collection, patient, study, series, hashes): + # If validation fails, return. None of the instances will have the done bit set to True return # Get a list of the files from the download @@ -80,19 +77,12 @@ def build_instances_tcia(sess, args, collection, patient, study, series): # Replace the TCIA assigned file name # Also compute the md5 hash and length in bytes of each - pydicom_times=[] - psql_times=[] - rename_times=[] - metadata_times=[] - begin = time.time_ns() instances = {instance.sop_instance_uid:instance for instance in series.instances} for dcm in dcms: try: - pydicom_times.append(time.time_ns()) reader = pydicom.dcmread("{}/{}/{}".format(args.dicom_dir, series.uuid, dcm), stop_before_pixels=True) SOPInstanceUID = reader.SOPInstanceUID - pydicom_times.append(time.time_ns()) except InvalidDicomError: errlogger.error(" p%s: Invalid DICOM file for %s/%s/%s/%s", args.pid, collection.collection_id, patient.submitter_case_id, study.study_instance_uid, series.uuid) @@ -105,16 +95,14 @@ def build_instances_tcia(sess, args, collection, patient, study, series): # Return without marking all instances done. This will be prevent the series from being done. return - psql_times.append(time.time_ns()) instance = instances[SOPInstanceUID] - # If an instance is already done, don't need to do anything more - if instance.done: - # Delete file. We already have it. - os.remove("{}/{}/{}".format(args.dicom_dir, series.uuid, dcm)) - progresslogger.debug(" p%s: Instance %s previously done, ", args.pid, series.uuid) - - continue - psql_times.append(time.time_ns()) + # # If an instance is already done, don't need to do anything more + # if instance.done: + # # Delete file. We already have it. 
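With the timing instrumentation removed by this hunk, the per-file work left in build_instances_tcia() is: read the SOPInstanceUID without pixel data, look up the matching instance record, rename the download to <series_uuid>/<instance_uuid>.dcm, and record its hash and size. A minimal sketch of that step, with md5_hasher reimplemented via hashlib (that the stored hash is a hex md5 is an assumption):

import hashlib
import os
from pathlib import Path

import pydicom

def md5_hasher(path: str) -> str:
    # Hex md5 of the file contents (assumed to match the hash stored on the instance record).
    h = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            h.update(chunk)
    return h.hexdigest()

def finalize_file(dicom_dir: str, series_uuid: str, dcm: str, instances: dict):
    # instances maps SOPInstanceUID -> instance ORM object for the series being built.
    reader = pydicom.dcmread(f'{dicom_dir}/{series_uuid}/{dcm}', stop_before_pixels=True)
    instance = instances[reader.SOPInstanceUID]
    blob_name = f'{dicom_dir}/{series_uuid}/{instance.uuid}.dcm'
    os.rename(f'{dicom_dir}/{series_uuid}/{dcm}', blob_name)
    instance.hash = md5_hasher(blob_name)
    instance.size = Path(blob_name).stat().st_size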
+ # os.remove("{}/{}/{}".format(args.dicom_dir, series.uuid, dcm)) + # progresslogger.debug(" p%s: Instance %s previously done, ", args.pid, series.uuid) + # + # continue # Validate that DICOM IDs match what we are expecting try: @@ -132,7 +120,6 @@ def build_instances_tcia(sess, args, collection, patient, study, series): # Return without marking all instances done. This will be prevent the series from being done. return - rename_times.append(time.time_ns()) uuid = instance.uuid file_name = "{}/{}/{}".format(args.dicom_dir, series.uuid, dcm) blob_name = "{}/{}/{}.dcm".format(args.dicom_dir, series.uuid, uuid) @@ -145,17 +132,14 @@ def build_instances_tcia(sess, args, collection, patient, study, series): os.remove("{}/{}/{}".format(args.dicom_dir, series.uuid, dcm)) continue else: - # Return without marking all instances done. This will be prevent the series from being done. + # Return without marking all instances done. This will prevent the series from being done. return os.rename(file_name, blob_name) - rename_times.append(time.time_ns()) - metadata_times.append(time.time_ns()) instance.hash = md5_hasher(blob_name) instance.size = Path(blob_name).stat().st_size instance.timestamp = datetime.utcnow() - metadata_times.append(time.time_ns()) if collection.collection_id == 'NLST': breakpoint() @@ -165,9 +149,7 @@ def build_instances_tcia(sess, args, collection, patient, study, series): sess.execute(delete(Instance).where(Instance.uuid==instance.uuid)) series.instances.remove(instance) - instances_time = time.time_ns() - begin - - copy_start = time.time_ns() + # Copy the instance data to a staging bucket try: copy_disk_to_gcs(args, collection, patient, study, series) except: @@ -175,46 +157,32 @@ def build_instances_tcia(sess, args, collection, patient, study, series): errlogger.error(" p%s: Copy files to GCS failed for %s/%s/%s/%s", args.pid, collection.collection_id, patient.submitter_case_id, study.study_instance_uid, series.series_instance_uid) return - copy_time = (time.time_ns() - copy_start)/10**9 - mark_done_start = time.time () for instance in series.instances: instance.done = True - mark_done_time = time.time() - mark_done_start - # rootlogger.debug(" p%s: Series %s, completed build_instances; %s", args.pid, series.series_instance_uid, time.asctime()) - progresslogger.debug(" p%s: Series %s: download: %s, instances: %s, pydicom: %s, psql: %s, rename: %s, metadata: %s, copy: %s, mark_done: %s", - args.pid, series.uuid, - download_time, - instances_time/10**9, - (sum(pydicom_times[1::2]) - sum(pydicom_times[0::2]))/10**9, - (sum(psql_times[1::2]) - sum(psql_times[0::2]))/10**9, - (sum(rename_times[1::2]) - sum(rename_times[0::2]))/10 **9, - (sum(metadata_times[1::2]) - sum(metadata_times[0::2])) / 10 ** 9, - copy_time, - mark_done_time) - except Exception as exc: + except Exception as exc: errlogger.info(' p%s build_instances failed: %s', args.pid, exc) raise exc def build_instances_idc(sess, args, collection, patient, study, series): - # Download a zip of the instances in a series - # It will be write the zip to a file dicom/.zip in the - # working directory, and expand the zip to directory dicom/ - # When TCIA provided series timestamps, we'll us that for timestamp. - now = datetime.now(timezone.utc) client=storage.Client() + # When idc is the source of instance data, the instances are already in a bucket. 
+ # From idc_xxx DB hierarchy, we get a table of the SOPInstanceUID, hash and GCS URL of + # all the instances in the series stmt = select(IDC_Instance.sop_instance_uid, IDC_Instance.gcs_url, IDC_Instance.hash ). \ where(IDC_Instance.series_instance_uid == series.series_instance_uid) result = sess.execute(stmt) src_instance_metadata = {i.sop_instance_uid:{'gcs_url':i.gcs_url, 'hash':i.hash} \ for i in result.fetchall()} + # Now we copy each instance to the staging bucket start = time.time() total_size = 0 for instance in series.instances: if not instance.done: + # Copy the instance and validate the hash instance.hash = src_instance_metadata[instance.sop_instance_uid]['hash'] instance.size, hash = copy_gcs_to_gcs(args, client, args.prestaging_idc_bucket, series, instance, src_instance_metadata[instance.sop_instance_uid]['gcs_url']) if hash != instance.hash: diff --git a/preingestion/populate_idc_metadata_tables/populate_idc_metadata_tables_visible_human.py b/preingestion/populate_idc_metadata_tables/populate_idc_metadata_tables_visible_human.py index ef9a110..ee3b699 100644 --- a/preingestion/populate_idc_metadata_tables/populate_idc_metadata_tables_visible_human.py +++ b/preingestion/populate_idc_metadata_tables/populate_idc_metadata_tables_visible_human.py @@ -37,19 +37,19 @@ if __name__ == '__main__': parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--version', default=settings.CURRENT_VERSION) - parser.add_argument('--src_bucket', default='dac-vhm-dst', help='Bucket containing WSI instances') + parser.add_argument('--src_bucket', default='dac-vhm-dst', help='Source bucket containing instances') parser.add_argument('--mount_point', default='/mnt/disks/idc-etl/visible_human_project', help='Directory on which to mount the bucket.\ The script will create this directory if necessary.') parser.add_argument('--subdir', default='', help="Subdirectory of mount_point at which to start walking directory") parser.add_argument('--collection_id', default='NLM-Visible-Human-Project', help='idc_webapp_collection id of the collection or ID of analysis result to which instances belong.') - parser.add_argument('--wiki_doi', default='', help='Collection DOI') + parser.add_argument('--wiki_doi', default='', help='Collection DOI. 
Might be empty string.') parser.add_argument('--wiki_url', default='https://www.nlm.nih.gov/research/visible/visible_human.html',\ help='Info page URL') parser.add_argument('--license', default = {"license_url": 'https://www.nlm.nih.gov/databases/download/terms_and_conditions.html',\ "license_long_name": "National Library of Medicine Terms and Conditions; May 21, 2019", \ - "license_short_name": "National Library of Medicine Terms and Conditions; May 21, 2019"}) + "license_short_name": "National Library of Medicine Terms and Conditions; May 21, 2019"}, help="(Sub-)Collection license") parser.add_argument('--third_party', type=bool, default=False, help='True if from a third party analysis result') - parser.add_argument('--gen_hashes', default=True, help=' Generate hierarchical hashes of collection if True') + parser.add_argument('--gen_hashes', default=True, help=' Generate hierarchical hashes of collection if True.') args = parser.parse_args() print("{}".format(args), file=sys.stdout) diff --git a/utilities/logging_config.py b/utilities/logging_config.py index cbb49d7..5e590de 100644 --- a/utilities/logging_config.py +++ b/utilities/logging_config.py @@ -18,6 +18,7 @@ import logging from logging import INFO, ERROR import settings +import builtins # Suppress logging from request module logging.getLogger("requests").setLevel(logging.WARNING) @@ -49,8 +50,12 @@ progresslogger.setLevel(INFO) for hdlr in progresslogger.handlers[:]: progresslogger.removeHandler(hdlr) -#The progress log file is always truncated (the mode='w' does that.) -success_fh = logging.FileHandler('{}/progress.log'.format(settings.LOG_DIR), mode='w') +#The progress log file is usually truncated (the mode='w' does that.) +if not hasattr(builtins, "APPEND_PROGRESSLOGGER") or builtins.APPEND_PROGRESSLOGGER==False: + success_fh = logging.FileHandler('{}/progress.log'.format(settings.LOG_DIR), mode='w') +else: + success_fh = logging.FileHandler('{}/progress.log'.format(settings.LOG_DIR)) + progresslogger.addHandler(success_fh) successformatter = logging.Formatter('%(message)s') success_fh.setFormatter(successformatter)
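
The logging_config.py hunk above makes truncation of progress.log conditional on a builtins
attribute. A minimal sketch of how a caller might opt into append mode, assuming the attribute
is set before utilities.logging_config is first imported (the handler is created at import
time); the driver script shown here is hypothetical, not part of this patch:

    import builtins

    # Hypothetical driver script. Setting this flag before the first import of
    # utilities.logging_config makes progress.log open in append mode instead of
    # being truncated.
    builtins.APPEND_PROGRESSLOGGER = True

    from utilities.logging_config import progresslogger

    progresslogger.info('Resuming a previous run; earlier progress.log entries are preserved')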
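
The bucket validation added in gcs/validate_buckets/validate_bucket_mp.py ultimately reduces to
a set comparison between the blob names expected from the BigQuery manifest and the blob names
found in the bucket. A minimal single-process sketch of that comparison, assuming two text files
with one blob name per line (the function name and file paths here are illustrative; the script
itself reads args.expected_blobs and the successlogger log file):

    def compare_blob_sets(expected_path, found_path):
        # Each file holds one blob name per line.
        expected = set(open(expected_path).read().splitlines())
        found = set(open(found_path).read().splitlines())
        if found == expected:
            return True
        for blob in sorted(found - expected):
            print(f'Unexpected blob in bucket: {blob}')
        for blob in sorted(expected - found):
            print(f'Expected blob not found in bucket: {blob}')
        return False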