Merge pull request #76 from ImagingDataCommons/idc_v15

Idc v15

bcli4d authored Jul 12, 2023
2 parents 77a3c82 + 1caabb1 commit a6de929
Showing 33 changed files with 631 additions and 186 deletions.
66 changes: 37 additions & 29 deletions bq/utils/publish_dataset/publish_dataset.py
@@ -64,18 +64,19 @@ def bq_dataset_exists(client, project , target_dataset):

def copy_table(client, args, table_id):

try:
table = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{table_id}')
progresslogger.info(f'Table {table} already exists.')
except:
# try:
# table = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{table_id}')
# progresslogger.info(f'Table {table} already exists.')
# except:
if True:
src_table_id = f'{args.src_project}.{args.src_dataset}.{table_id}'
trg_table_id = f'{args.trg_project}.{args.trg_dataset}.{table_id}'

# Construct a BigQuery client object.
client = bigquery.Client()
job_config = bigquery.CopyJobConfig()
job_config.operation_type = 'COPY'
job_config.write_disposition = 'WRITE_EMPTY'
job_config.write_disposition = 'WRITE_TRUNCATE'

# Construct and run a copy job.
job = client.copy_table(
@@ -93,32 +94,39 @@ def copy_table(client, args, table_id):


def copy_view(client, args, view_id):

try:
view = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}')
progresslogger.info(f'View {view} already exists.')
except:
view = client.get_table(f'{args.src_project}.{args.src_dataset}.{view_id}')

new_view = bigquery.Table(f'{args.trg_project}.{args.trg_dataset}.{view_id}')
new_view.view_query = view.view_query.replace(args.src_project,args.pdp_project). \
replace(args.src_dataset,args.trg_dataset)

new_view.friendly_name = view.friendly_name
new_view.description = view.description
new_view.labels = view.labels
installed_view = client.create_table(new_view)

installed_view.schema = view.schema

try:
# # Update the schema after creating the view
# installed_view.schema = view.schema
client.update_table(installed_view, ['schema'])
progresslogger.info(f'Copy of view {view_id}: DONE')
except BadRequest as exc:
errlogger.error(f'{exc}')

view = client.get_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}')
progresslogger.info(f'View {view} already exists.')
client.delete_table(f'{args.trg_project}.{args.trg_dataset}.{view_id}', not_found_ok=True)
progresslogger.info(f'Deleted {view}.')
except:
progresslogger.info(f'View {view_id} does not exist.')

finally:
view = client.get_table(f'{args.src_project}.{args.src_dataset}.{view_id}')

new_view = bigquery.Table(f'{args.trg_project}.{args.trg_dataset}.{view_id}')
new_view.view_query = view.view_query.replace(args.src_project,args.trg_project). \
replace(args.src_dataset,args.trg_dataset)

new_view.friendly_name = view.friendly_name
new_view.description = view.description
new_view.labels = view.labels
installed_view = client.create_table(new_view)

installed_view.schema = view.schema

try:
# # Update the schema after creating the view
# installed_view.schema = view.schema
client.update_table(installed_view, ['schema'])
progresslogger.info(f'Copy of view {view_id}: DONE')
except BadRequest as exc:
errlogger.error(f'{exc}')
except Exception as exc:
errlogger.error((f'{exc}'))
progresslogger.info((f'Really done'))
return

def publish_dataset(args):
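Aside: a hedged sketch of the two publish patterns in this file. copy_table now runs a BigQuery copy job that overwrites the target (WRITE_TRUNCATE instead of WRITE_EMPTY), and copy_view deletes any existing target view, recreates it with its SQL rewritten to the target project and dataset, then patches the schema. The helper names and resource IDs below are placeholders, not code from this repository:

```python
from google.cloud import bigquery

def copy_table_sketch(client, src_table_id, trg_table_id):
    job_config = bigquery.CopyJobConfig()
    # WRITE_TRUNCATE replaces an existing target table; WRITE_EMPTY would fail
    # the job if the target already exists.
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    client.copy_table(src_table_id, trg_table_id, job_config=job_config).result()

def copy_view_sketch(client, src_view_id, trg_view_id,
                     src_project, trg_project, src_dataset, trg_dataset):
    # Drop any stale copy of the view, then rebuild it from the source view.
    client.delete_table(trg_view_id, not_found_ok=True)
    src_view = client.get_table(src_view_id)
    new_view = bigquery.Table(trg_view_id)
    # Point the view SQL at the target project and dataset.
    new_view.view_query = (src_view.view_query
                           .replace(src_project, trg_project)
                           .replace(src_dataset, trg_dataset))
    new_view.friendly_name = src_view.friendly_name
    new_view.description = src_view.description
    new_view.labels = src_view.labels
    installed = client.create_table(new_view)
    # Copy the schema in a second step so column descriptions carry over.
    installed.schema = src_view.schema
    client.update_table(installed, ['schema'])
```

Updating the schema after create_table mirrors the commented note in the source ("Update the schema after creating the view"); creating a view alone does not bring the source's column descriptions along.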
12 changes: 8 additions & 4 deletions bq/utils/publish_dataset/publish_idc_pdp_staging_datasets.py
@@ -32,8 +32,12 @@

progresslogger.info(f'args: {json.dumps(args.__dict__, indent=2)}')

for src_dataset in (
f'idc_v{settings.CURRENT_VERSION}',
f'idc_v1{settings.CURRENT_VERSION}_clinical',
):
for src_dataset in [
# f'idc_v{settings.CURRENT_VERSION}',
# f'idc_current',
f'idc_v{settings.CURRENT_VERSION}_clinical',
# 'idc_current_clinical'
]:
args.src_dataset = src_dataset
args.trg_dataset = src_dataset
publish_dataset(args)
4 changes: 2 additions & 2 deletions dcf/gen_instance_manifest/instance_manifest.py
@@ -58,8 +58,8 @@ def gen_instance_manifest(args):
ON
1=1
WHERE
# idc_version IN {args.versions}
# AND
i_rev_idc_version IN {args.versions}
AND
((i_source='tcia'
AND tcia_access='Public')
OR (i_source='idc'
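For clarity, the WHERE clause now keys on i_rev_idc_version (presumably the IDC version in which the instance revision was introduced) rather than the commented-out idc_version test. The interpolation works because a Python tuple formats as a parenthesized list; a tiny illustration, where only the column name comes from the diff and everything else is assumed:

```python
# Illustration only: interpolating a tuple of versions into the manifest query.
versions = (14, 15)                      # placeholder versions
clause = f"i_rev_idc_version IN {versions}"
print(clause)                            # i_rev_idc_version IN (14, 15)
```

Note that a one-element tuple would render as `(15,)`, which may need special handling before being handed to BigQuery.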
2 changes: 1 addition & 1 deletion dcf/gen_instance_manifest/vALL_instance_manifest.py
@@ -15,7 +15,7 @@
#


# Generate a manifest of new instance versions in the current (latest) IDC version
# Generate a manifest of new instances in some or all IDC versions

import argparse
import settings
2 changes: 1 addition & 1 deletion dcf/gen_instance_manifest/vX_instance_manifest.py
@@ -34,7 +34,7 @@
parser.add_argument('--manifest_uri', default=f'gs://indexd_manifests/dcf_input/pdp_hosting/idc_v{settings.CURRENT_VERSION}_instance_manifest_*.tsv',
help="GCS blob in which to save results")
parser.add_argument('--temp_table_bqdataset', default='whc_dev', \
help='Manifest of temporary table')
help='BQ dataset of temporary table')
parser.add_argument('--temp_table', default=f'idc_v{settings.CURRENT_VERSION}_instance_manifest', \
help='Temporary table in which to write query results')
args = parser.parse_args()
14 changes: 9 additions & 5 deletions gcs/copy_bucket_mp.py
@@ -80,11 +80,15 @@ def worker(input, args, dones):
src_bucket = storage.Bucket(client, args.src_bucket)
dst_bucket = storage.Bucket(client, args.dst_bucket)
for blob_names, n in iter(input.get, 'STOP'):
blob_names_todo = blob_names - dones
if blob_names_todo:
copy_instances(args, client, src_bucket, dst_bucket, blob_names_todo, n)
else:
progresslogger.info(f'p{args.id}: Blobs {n}:{n+len(blob_names)-1} previously copied')
try:
blob_names_todo = blob_names - dones
if blob_names_todo:
copy_instances(args, client, src_bucket, dst_bucket, blob_names_todo, n)
else:
progresslogger.info(f'p{args.id}: Blobs {n}:{n+len(blob_names)-1} previously copied')
except Exception as exc:
errlogger.error(f'p{args.id}: Error {exc}')



def copy_all_instances(args, dones):
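The worker change above wraps each batch in a try/except so a single bad batch no longer kills the whole process. A self-contained sketch of that queue/sentinel pattern; the copy step and names here are stand-ins, not the repo's copy_instances:

```python
import multiprocessing

def worker_sketch(task_queue, dones, copy_fn, worker_id=0):
    # Pull (blob_names, n) batches until the 'STOP' sentinel arrives.
    for blob_names, n in iter(task_queue.get, 'STOP'):
        try:
            todo = set(blob_names) - dones
            if todo:
                copy_fn(todo, n)
            else:
                print(f'p{worker_id}: blobs {n}:{n + len(blob_names) - 1} previously copied')
        except Exception as exc:
            # Log and move on; an unexpected error should not kill the worker.
            print(f'p{worker_id}: error {exc}')

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put((['a.dcm', 'b.dcm'], 0))
    q.put('STOP')
    worker_sketch(q, dones={'a.dcm'}, copy_fn=lambda todo, n: print('copy', sorted(todo), n))
```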
@@ -14,8 +14,8 @@
# limitations under the License.
#

# Copy pre-staging buckets populated by ingestion to staging buckets.
# Ingestion copies data into prestaging buckets named by version and
# Copy premerge buckets populated by ingestion to staging buckets.
# Ingestion copies data into premerge buckets named by version and
# collection, e.g. idc_v9_idc_tcga_brca. The data in these buckets must be
# copied to one of the idc-dev-etl staging buckets:
# idc-dev-open, idc-dev-cr, idc-dev-defaced, idc-dev-redacted, idc-dev-excluded.
@@ -26,7 +26,7 @@

import settings
from google.cloud import storage, bigquery
from gcs.copy_bucket_mp.copy_bucket_mp import copy_all_instances
from copy_bucket_mp import copy_all_instances

def get_collection_groups():
client = bigquery.Client()
@@ -59,7 +59,7 @@ def preview_copies(args, client, bucket_data):
return


def copy_dev_buckets(args):
def copy_premerge_buckets(args):
client = storage.Client()
bucket_data= get_collection_groups()
preview_copies(args, client, bucket_data)
@@ -87,9 +87,9 @@ def copy_dev_buckets(args):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--version', default=settings.CURRENT_VERSION, help='Version to work on')
parser.add_argument('--processes', default=1, help="Number of concurrent processes")
parser.add_argument('--processes', default=32, help="Number of concurrent processes")
parser.add_argument('--batch', default=100, help='Size of batch assigned to each process')
args = parser.parse_args()
args.id = 0 # Default process ID

copy_dev_buckets(args)
copy_premerge_buckets(args)
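The header comment in this file describes the premerge bucket naming scheme; the collection-to-staging-bucket mapping itself comes from BigQuery via get_collection_groups(). A tiny sketch of just the naming convention (the helper name is made up; the per-source pattern matches what delete_premerge_buckets.py below constructs):

```python
def premerge_bucket_names(version, collection_id):
    # One premerge bucket per source (tcia/idc), per version, per collection.
    return [f'idc_v{version}_tcia_{collection_id}',
            f'idc_v{version}_idc_{collection_id}']

print(premerge_bucket_names(15, 'tcga_brca'))
# ['idc_v15_tcia_tcga_brca', 'idc_v15_idc_tcga_brca']
```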
17 changes: 8 additions & 9 deletions gcs/delete_premerge_buckets/delete_premerge_buckets.py
@@ -22,15 +22,14 @@
from utilities.logging_config import successlogger, progresslogger
import settings
from google.cloud import storage, bigquery
from gcs.empty_bucket_mp.empty_bucket_mp import pre_delete
from gcs.empty_bucket_mp.empty_bucket_mp import del_all_instances

def get_collection_groups():
client = bigquery.Client()
collections = {}
breakpoint() # FROM all_collections instead of all_included_collections?
query = f"""
SELECT idc_webapp_collection_id, dev_tcia_url, dev_idc_url
FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_included_collections`
SELECT REPLACE(REPLACE(LOWER(tcia_api_collection_id),'-','_'),' ','_') idc_webapp_collection_id, dev_tcia_url, dev_idc_url
FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_collections`
"""

result = client.query(query).result()
@@ -39,7 +38,7 @@ def get_collection_groups():

return collections

def preview_copies(args, client, bucket_data):
def preview_deletes(args, client, bucket_data):
progresslogger.info('Deleting the following buckets')
for collection_id in bucket_data:
if client.bucket(f'idc_v{args.version}_tcia_{collection_id}').exists():
@@ -67,7 +66,7 @@ def delete_buckets(args):

client = storage.Client()
bucket_data= get_collection_groups()
preview_copies(args, client, bucket_data)
preview_deletes(args, client, bucket_data)

for collection_id in bucket_data:
if client.bucket(f'idc_v{args.version}_tcia_{collection_id}').exists():
@@ -76,7 +75,7 @@ def delete_buckets(args):
args.bucket = prestaging_bucket
progresslogger.info(f'Deleting bucket {prestaging_bucket}')
# Delete the contents of the bucket
pre_delete(args)
del_all_instances(args)
# Delete the bucket itself
client.bucket(prestaging_bucket).delete()
if client.bucket(f'idc_v{args.version}_idc_{collection_id}').exists():
@@ -85,7 +84,7 @@ def delete_buckets(args):
args.bucket = prestaging_bucket
progresslogger.info(f'Deleting bucket {prestaging_bucket}')
# Delete the contents of the bucket
pre_delete(args)
del_all_instances(args)
# Delete the bucket itself
client.bucket(prestaging_bucket).delete()

@@ -96,7 +95,7 @@ def delete_buckets(args):
parser = argparse.ArgumentParser()
parser.add_argument('--version', default=settings.CURRENT_VERSION)
# parser.add_argument('--prestaging_bucket_prefix', default=[f'idc_v{settings.CURRENT_VERSION}_tcia_', f'idc_v{settings.CURRENT_VERSION}_idc_'], help='Prefix of premerge buckets')
parser.add_argument('--processes', default=32, help="Number of concurrent processes")
parser.add_argument('--processes', default=8, help="Number of concurrent processes")
parser.add_argument('--batch', default=100, help='Size of batch assigned to each process')
args = parser.parse_args()

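The query change above derives idc_webapp_collection_id on the fly from tcia_api_collection_id instead of reading it from all_included_collections. The transformation is just lowercasing and replacing '-' and ' ' with '_'; an equivalent check in Python:

```python
def idc_webapp_collection_id(tcia_api_collection_id):
    # Mirrors REPLACE(REPLACE(LOWER(tcia_api_collection_id), '-', '_'), ' ', '_')
    return tcia_api_collection_id.lower().replace('-', '_').replace(' ', '_')

assert idc_webapp_collection_id('TCGA-BRCA') == 'tcga_brca'
```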
29 changes: 15 additions & 14 deletions gcs/empty_bucket_mp/empty_bucket_mp.py
@@ -43,20 +43,21 @@


def delete_instances(args, client, bucket, blobs, n):
try:
with client.batch():
for blob in blobs:
bucket.blob(blob[0], generation=blob[1]).delete()
# bucket.blob(blob[0], generation=blob[1]).delete()

successlogger.info('p%s Delete %s blobs %s:%s ', args.id, args.bucket, n, n+len(blobs)-1)
except ServiceUnavailable:
errlogger.error('p%s Delete %s blobs %s:%s failed', args.id, args.bucket, n, n+len(blobs)-1)
except NotFound:
errlogger.error('p%s Delete %s blobs %s:%s failed, not found', args.id, args.bucket, n, n+len(blobs)-1)
except Exception as exc:
errlogger.error('p%s Exception %s %s:%s', args.id, exc, n, n+len(blobs)-1)

TRIES = 3
for i in range(TRIES):
try:
with client.batch():
for blob in blobs:
bucket.blob(blob[0], generation=blob[1]).delete()
successlogger.info('p%s Delete %s blobs %s:%s ', args.id, args.bucket, n, n+len(blobs)-1)
break
except ServiceUnavailable:
errlogger.error('p%s Delete %s blobs %s:%s failed', args.id, args.bucket, n, n+len(blobs)-1)
except Exception as exc:
errlogger.error('p%s Exception %s %s:%s', args.id, exc, n, n+len(blobs)-1)
except NotFound:
errlogger.error('p%s Delete %s blobs %s:%s failed, not found', args.id, args.bucket, n, n+len(blobs)-1)
break


def worker(input, args):
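The delete path above now retries a failed batch up to TRIES times instead of giving up after one attempt. A hedged sketch of that pattern; note that in the committed code the bare `except Exception` precedes `except NotFound`, which makes the NotFound branch unreachable, so the ordering below is flipped. Bucket and blob values are placeholders:

```python
from google.cloud import storage
from google.api_core.exceptions import NotFound, ServiceUnavailable

TRIES = 3

def delete_blobs_with_retry(client, bucket, blobs, n=0):
    """blobs: list of (blob_name, generation) pairs."""
    for attempt in range(TRIES):
        try:
            with client.batch():  # issue the deletes as one batched request
                for name, generation in blobs:
                    bucket.blob(name, generation=generation).delete()
            return True           # whole batch succeeded
        except ServiceUnavailable:
            continue              # transient; retry the batch
        except NotFound:
            return False          # something is already gone; retrying will not help
        except Exception as exc:  # NotFound must be caught before this catch-all
            print(f'unexpected error on blobs {n}:{n + len(blobs) - 1}: {exc}')
    return False

# Usage (placeholders):
# client = storage.Client(); bucket = client.bucket('some-premerge-bucket')
# delete_blobs_with_retry(client, bucket, [('1.2.3.dcm', 1690000000000000)])
```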
16 changes: 8 additions & 8 deletions gcs/release_gcs_data/copy_staging_buckets_to_public_buckets.py
@@ -21,7 +21,7 @@

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--processes', default=1, help="Number of concurrent processes")
parser.add_argument('--processes', default=32, help="Number of concurrent processes")
parser.add_argument('--batch', default=100, help='Size of batch assigned to each process')
parser.add_argument('--log_dir', default=f'/mnt/disks/idc-etl/logs/copy_bucket_mp')

@@ -34,13 +34,13 @@
dones = set([])


args.src_bucket = 'idc-open-idc1-staging'
args.dst_bucket = 'idc-open-idc1'
copy_all_instances(args, dones)

args.src_bucket = 'idc-open-cr-staging'
args.dst_bucket = 'idc-open-cr'
copy_all_instances(args, dones)
# args.src_bucket = 'idc-open-idc1-staging'
# args.dst_bucket = 'idc-open-idc1'
# copy_all_instances(args, dones)
#
# args.src_bucket = 'idc-open-cr-staging'
# args.dst_bucket = 'idc-open-cr'
# copy_all_instances(args, dones)

args.src_bucket = 'public-datasets-idc-staging'
args.dst_bucket = 'public-datasets-idc'
@@ -23,13 +23,15 @@
import json
import settings

from gcs.validate_bucket.validate_bucket_mp import check_all_instances

import builtins
builtins.APPEND_PROGRESSLOGGER = True
from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp

if __name__ == '__main__':
parser = argparse.ArgumentParser()
# parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}')
parser.add_argument('--version', default=settings.CURRENT_VERSION)
parser.add_argument('--processes', default=32)
parser.add_argument('--bucket', default='idc-dev-cr')
parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket')
parser.add_argument('--premerge', default=False, help='True when performing prior to merging premerge buckets')
@@ -40,4 +42,4 @@

args = parser.parse_args()
print(f'args: {json.dumps(args.__dict__, indent=2)}')
check_all_instances(args)
check_all_instances_mp(args)
@@ -22,14 +22,16 @@
import argparse
import json
import settings

from gcs.validate_bucket.validate_bucket_mp import check_all_instances
import builtins
builtins.APPEND_PROGRESSLOGGER = True
from gcs.validate_buckets.validate_bucket_mp import check_all_instances_mp


if __name__ == '__main__':
parser = argparse.ArgumentParser()
# parser.add_argument('--version', default=f'{settings.CURRENT_VERSION}')
parser.add_argument('--version', default=settings.CURRENT_VERSION)
parser.add_argument('--processes', default=32)
parser.add_argument('--bucket', default='idc-dev-defaced')
parser.add_argument('--dev_or_pub', default = 'dev', help='Validating a dev or pub bucket')
parser.add_argument('--premerge', default=False, help='True when performing prior to merging premerge buckets')
@@ -41,4 +43,4 @@
args = parser.parse_args()
print(f'args: {json.dumps(args.__dict__, indent=2)}')

check_all_instances(args)
check_all_instances_mp(args)
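Both validation drivers now set builtins.APPEND_PROGRESSLOGGER before importing the validator, which suggests the shared logging setup checks that flag at import time (an assumption; utilities/logging_config.py is not shown here). A self-contained sketch of the handshake, with the consumer simulated inline:

```python
import builtins

# Set the flag BEFORE importing/initializing anything that reads it.
builtins.APPEND_PROGRESSLOGGER = True

def configure_progresslogger():
    # Stand-in for import-time code in a logging module: append if the flag
    # was set by the caller, otherwise start a fresh log.
    append = getattr(builtins, 'APPEND_PROGRESSLOGGER', False)
    return 'a' if append else 'w'

print(configure_progresslogger())  # -> 'a'
```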
@@ -23,7 +23,7 @@
import json
import settings

from gcs.validate_bucket.validate_bucket_mp import check_all_instances
from gcs.validate_buckets.validate_bucket import check_all_instances


if __name__ == '__main__':