
Commit

Merge pull request #72 from ImagingDataCommons/idc_v13
Use all_collections table rather than all_included_collections; other cleanup
bcli4d authored Mar 4, 2023
2 parents f60c1d6 + f09d343 commit 0119c66
Showing 4 changed files with 102 additions and 172 deletions.
24 changes: 15 additions & 9 deletions gch/populate_dicom_store/step4_export_metadata.py
@@ -24,12 +24,12 @@
import requests
import subprocess
import time
import settings
from subprocess import PIPE
from google.cloud import bigquery
from googleapiclient.errors import HttpError
from google.api_core.exceptions import NotFound
from utilities.bq_helpers import create_BQ_dataset, copy_BQ_table
from python_settings import settings

import logging
from utilities.logging_config import successlogger, progresslogger, errlogger
@@ -41,7 +41,7 @@ def export_dicom_metadata(args):
bearer = str(results.stdout,encoding='utf-8').strip()

# BQ table to which to export metadata
destination = f'bq://{settings.DEV_PROJECT}.{settings.BQ_DEV_EXT_DATASET}.dicom_metadata'
destination = f'bq://{args.project}.{args.bq_dataset}.dicom_metadata'
data = {
'bigqueryDestination': {
'tableUri': destination,
@@ -53,7 +53,7 @@
'Authorization': f'Bearer {bearer}',
'Content-Type': 'application/json; charset=utf-8'
}
url = f'https://healthcare.googleapis.com/v1/projects/{settings.PUB_PROJECT}/locations/{settings.GCH_REGION}/datasets/{settings.GCH_DATASET}/dicomStores/{settings.GCH_DICOMSTORE}:export'
url = f'https://healthcare.googleapis.com/v1/projects/{args.pub_project}/locations/{args.gch_region}/datasets/{args.gch_dataset}/dicomStores/{args.gch_dicomstore}:export'
results = requests.post(url, headers=headers, json=data)

# Get the operation ID so we can track progress
@@ -68,7 +68,7 @@
headers = {
'Authorization': f'Bearer {bearer}'
}
url = f'https://healthcare.googleapis.com/v1/projects/{settings.PUB_PROJECT}/locations/{settings.GCH_REGION}/datasets/{settings.GCH_DATASET}/operations/{operation_id}'
url = f'https://healthcare.googleapis.com/v1/projects/{args.pub_project}/locations/{args.gch_region}/datasets/{args.gch_dataset}/operations/{operation_id}'
results = requests.get(url, headers=headers)

details = results.json()
@@ -91,7 +91,7 @@ def get_job(args):
headers = {
'Authorization': f'Bearer {bearer}'
}
url = f'https://healthcare.googleapis.com/v1/projects/{settings.GCH_DICOMSTORE}/locations/{settings.GCH_REGION}/datasets/{settings.GCH_DATASET}/operations'
url = f'https://healthcare.googleapis.com/v1/projects/{args.gch_dicomstore}/locations/{args.gch_region}/datasets/{args.gch_dataset}/operations'
results = requests.get(url, headers=headers)
# Get the operation ID so we can track progress
operation_id = results.json()['operations'][0]['name'].split('/')[-1]
@@ -105,7 +105,7 @@ def get_job(args):
headers = {
'Authorization': f'Bearer {bearer}'
}
url = f'https://healthcare.googleapis.com/v1/projects/{settings.GCH_DICOMSTORE}/locations/{settings.GCH_REGION}/datasets/{settings.GCH_DATASET}/operations/{operation_id}'
url = f'https://healthcare.googleapis.com/v1/projects/{args.gch_dicomstore}/locations/{args.gch_region}/datasets/{args.gch_dataset}/operations/{operation_id}'
results = requests.get(url, headers=headers)

details = results.json()
@@ -122,12 +122,12 @@ def get_job(args):
time.sleep(5*60)

def export_metadata(args):
client = bigquery.Client(project=settings.DEV_PROJECT)
client = bigquery.Client(project=args.dev_project)
# Create the BQ dataset if it does not already exist
try:
dst_dataset = client.get_dataset(settings.BQ_DEV_EXT_DATASET)
dst_dataset = client.get_dataset(args.bq_dataset)
except NotFound:
dst_dataset = create_BQ_dataset(client, settings.BQ_DEV_EXT_DATASET, settings.dataset_description)
dst_dataset = create_BQ_dataset(client, args.bq_dataset, args.dataset_description)

try:
start = time.time()
@@ -144,6 +144,12 @@ def export_metadata(args):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--dataset_description', default = f'IDC V{settings.CURRENT_VERSION} BQ tables and views')
parser.add_argument('--dev_project', default=settings.DEV_PROJECT)
parser.add_argument('--bq_dataset', default=settings.BQ_DEV_EXT_DATASET)
parser.add_argument('--pub_project', default=settings.PUB_PROJECT)
parser.add_argument('--gch_region', default=settings.GCH_REGION)
parser.add_argument('--gch_dataset', default=settings.GCH_DATASET)
parser.add_argument('--gch_dicomstore', default=settings.GCH_DICOMSTORE)
args = parser.parse_args()
print("{}".format(args), file=sys.stdout)
export_metadata(args)
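For context on what this file does: step4_export_metadata.py asks the Google Healthcare API to export the DICOM store's metadata into a BigQuery table and then polls the resulting long-running operation. The sketch below is a simplified, hedged reconstruction of that flow from the hunks above; get_bearer_token() is a placeholder for the gcloud access-token call the real script makes via subprocess, and the write disposition, logging, and error handling are omitted.

import time
import requests

def get_bearer_token() -> str:
    # Placeholder: the real script shells out to gcloud to obtain an access token.
    raise NotImplementedError

def export_dicom_metadata_sketch(pub_project, gch_region, gch_dataset, gch_dicomstore,
                                 dev_project, bq_dataset):
    bearer = get_bearer_token()
    base = (f'https://healthcare.googleapis.com/v1/projects/{pub_project}'
            f'/locations/{gch_region}/datasets/{gch_dataset}')
    # BQ table to which the DICOM metadata is exported.
    destination = f'bq://{dev_project}.{bq_dataset}.dicom_metadata'
    headers = {'Authorization': f'Bearer {bearer}',
               'Content-Type': 'application/json; charset=utf-8'}
    data = {'bigqueryDestination': {'tableUri': destination}}
    results = requests.post(f'{base}/dicomStores/{gch_dicomstore}:export',
                            headers=headers, json=data)
    # The export call returns a long-running operation; poll it until it reports done.
    operation_id = results.json()['name'].split('/')[-1]
    while True:
        details = requests.get(f'{base}/operations/{operation_id}',
                               headers={'Authorization': f'Bearer {bearer}'}).json()
        if details.get('done'):
            return details
        time.sleep(60)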
18 changes: 0 additions & 18 deletions gcs/copy_bucket_mp/copy_bucket_mp.py
@@ -91,27 +91,10 @@ def copy_all_instances(args, dones):
client = storage.Client()
src_bucket = storage.Bucket(client, args.src_bucket)

# try:
# # Create a set of previously copied blobs
# # dones = set(open(f'{args.log_dir}/{args.src_bucket}_success.log').read().splitlines())
# dones = set(open(successlogger.handlers[0].baseFilename).read().splitlines())
# except:
# dones = set([])
# if args.src_bucket in dones:
# progresslogger.info(f'Bucket {args.src_bucket} previously copied')
# return

n=len(dones)
# dones = []
# iterator = client.list_blobs(dst_bucket, page_size=args.batch)
# for page in iterator.pages:
# blobs = [blob.name for blob in page]
# dones.extend(blobs)
# # if len(blobs) == 0:
# # break

progresslogger.info(f"{len(dones)} blobs previously copied")
# dones = set(dones)

progresslogger.info(f'Copying bucket {args.src_bucket} to {args.dst_bucket}, ')

@@ -160,7 +143,6 @@ def copy_all_instances(args, dones):
if src_bucket in error_buckets:
print(f'Bucket {args.src_bucket} had errors')
else:
successlogger.info(f'{args.src_bucket}')
progresslogger.info(f'Completed bucket {args.src_bucket}, {rate} instances/sec, {num_processes} processes')


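The hunks above mostly delete commented-out bookkeeping: copy_bucket_mp.py resumes an interrupted bucket copy by skipping blobs already recorded in a success log (the dones set), which is now built by the caller and passed in. Below is a minimal, single-process sketch of that resume pattern; the names copy_new_blobs and success_log_path are illustrative, and the real script fans the work out over multiprocessing workers in batches and logs through successlogger/progresslogger.

from google.cloud import storage

def copy_new_blobs(src_bucket_name: str, dst_bucket_name: str, success_log_path: str):
    # Treat every blob name already present in the success log as previously copied.
    try:
        with open(success_log_path) as f:
            dones = set(f.read().splitlines())
    except FileNotFoundError:
        dones = set()

    client = storage.Client()
    src_bucket = client.bucket(src_bucket_name)
    dst_bucket = client.bucket(dst_bucket_name)

    with open(success_log_path, 'a') as success_log:
        for blob in client.list_blobs(src_bucket_name):
            if blob.name in dones:
                continue  # already copied on a previous run
            # Server-side copy into the destination bucket, keeping the blob name.
            src_bucket.copy_blob(blob, dst_bucket, blob.name)
            success_log.write(blob.name + '\n')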
9 changes: 4 additions & 5 deletions gcs/copy_premerge_to_dev_buckets/copy_premerge_to_staging.py
@@ -31,15 +31,14 @@
def get_collection_groups():
client = bigquery.Client()
collections = {}
breakpoint() # FROM all_collections instead of all_included_collections?
query = f"""
SELECT idc_webapp_collection_id, dev_tcia_url, dev_idc_url
FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_included_collections`
SELECT tcia_api_collection_id, dev_tcia_url, dev_idc_url
FROM `idc-dev-etl.{settings.BQ_DEV_INT_DATASET}.all_collections`
"""

result = client.query(query).result()
for row in result:
collections[row['idc_webapp_collection_id']] = {"dev_tcia_url": row["dev_tcia_url"], "dev_idc_url": row["dev_idc_url"]}
collections[row['tcia_api_collection_id'].lower().replace('-','_').replace(' ','_')] = {"dev_tcia_url": row["dev_tcia_url"], "dev_idc_url": row["dev_idc_url"]}

return collections

@@ -88,7 +87,7 @@ def copy_dev_buckets(args):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--version', default=settings.CURRENT_VERSION, help='Version to work on')
parser.add_argument('--processes', default=32, help="Number of concurrent processes")
parser.add_argument('--processes', default=1, help="Number of concurrent processes")
parser.add_argument('--batch', default=100, help='Size of batch assigned to each process')
args = parser.parse_args()
args.id = 0 # Default process ID
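This last file carries the change named in the commit message: get_collection_groups() now reads tcia_api_collection_id from the all_collections table instead of idc_webapp_collection_id from all_included_collections, and derives the webapp-style key itself by lowercasing and replacing '-' and ' ' with '_'. A hedged sketch of the new behavior, with the dataset name passed as a parameter rather than taken from settings and the normalization pulled into a hypothetical helper:

from google.cloud import bigquery

def webapp_collection_id(tcia_api_collection_id: str) -> str:
    # Normalize e.g. "CPTAC-LUAD" to "cptac_luad", mirroring the inline
    # .lower().replace('-', '_').replace(' ', '_') chain in the diff above.
    return tcia_api_collection_id.lower().replace('-', '_').replace(' ', '_')

def get_collection_groups(bq_dev_int_dataset: str) -> dict:
    # Map each collection to its dev bucket URLs, keyed by the normalized ID.
    client = bigquery.Client()
    query = f"""
    SELECT tcia_api_collection_id, dev_tcia_url, dev_idc_url
    FROM `idc-dev-etl.{bq_dev_int_dataset}.all_collections`
    """
    collections = {}
    for row in client.query(query).result():
        collections[webapp_collection_id(row['tcia_api_collection_id'])] = {
            "dev_tcia_url": row["dev_tcia_url"],
            "dev_idc_url": row["dev_idc_url"],
        }
    return collections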
(The diff for the fourth changed file did not load in this view.)
