ENH: enabled support for multithreaded upload.
tcpan committed Oct 30, 2024
1 parent 903df9a commit 688ea73
Showing 3 changed files with 133 additions and 315 deletions.
81 changes: 6 additions & 75 deletions chorus_upload/__main__.py
@@ -586,80 +586,6 @@ def _mark_as_deleted(args, config, journal_fn):
local_ops.mark_files_as_deleted([file], databasename = journal_fn, version = version, verbose = args.verbose)
else:
print("ERROR: must provide either --file or --file-list")

# def _checkout_journal(args, config, journal_fn):
# local_fn = args.local_journal if args.local_journal else "journal.db"
# journal_upload_method = config.get('journal_upload_method', 'builtin')

# # if local_fn exists, make a backup.
# if Path(local_fn).exists():
# backup_fn = local_fn + ".bak_" + time.strftime("%Y%m%d%H%M%S")
# print("INFO: Using local journal ", local_fn, " backup at ", backup_fn)
# shutil.copy(local_fn, backup_fn)

# if journal_fn.startswith("az://") or journal_fn.startswith("s3://") or journal_fn.startswith("gs://"):
# # central journal
# # now download the journal file
# journal_path, locked_path, local_path, journal_md5 = upload_ops.checkout_journal(config, local_fn)
# if not Path(local_fn).exists():
# if journal_md5 is not None:
# raise ValueError("ERROR: journal file not properly checked out: ", local_fn)
# # else - original journal does not exist.
# else:
# shutil.copy(local_fn, local_fn + ".bak") # used for md5.
# print("INFO: checked out ", journal_path, " to ", local_path, " with md5 ", journal_md5, " locked at ", locked_path)

# else:
# # if local_fn and journal_fn are same, do nothing.
# if local_fn != journal_fn:
# if Path(journal_fn).exists():
# # make a copy
# shutil.copy(journal_fn, local_fn)
# print("INFO: copied ", journal_fn, " to ", local_fn)
# else:
# # else if local_fn and journal_fn are same, do nothing.
# print("INFO: journal file is already local.")



# def _checkin_journal(args, config, journal_fn):
# local_fn = args.local_journal if args.local_journal else "journal.db"
# journal_upload_method = config.get('journal_upload_method', 'builtin')


# if journal_fn.startswith("az://") or journal_fn.startswith("s3://") or journal_fn.startswith("gs://"):
# # if cloud journal
# client = storage_helper._make_client(config_helper.get_journal_config(config))

# # compute the lock path and md5
# orig_md5 = FileSystemHelper.get_metadata(path=Path(local_fn + ".bak"), with_md5=True)['md5']
# # generate the lock file name
# journal_path = FileSystemHelper(journal_fn, client = client)

# local_file = FileSystemHelper(local_fn)
# # check in to central.
# upload_ops.checkin_journal(journal_path, local_file, orig_md5)

# print("INFO: checked in ", local_fn, " to ", journal_fn, " with original md5 ", orig_md5)

# else:
# # local journal
# if journal_fn != local_fn:
# # back up journal_fn
# backup_fn = journal_fn + ".bak_" + time.strftime("%Y%m%d%H%M%S")
# if Path(journal_fn).exists():
# shutil.copy(journal_fn, backup_fn)

# # and copy local_fn back/
# if Path(local_fn).exist():
# shutil.copy(local_fn, journal_fn)
# else:
# print("ERROR: no local file ", local_file, "to copy to ", journal_fn)
# else:
# # else if local_fn and journal_fn are same, do nothing.
# print("INFO: journal file is already local.")





@@ -681,7 +607,7 @@ def _upload_files(args, config, journal_fn):
remaining = max_upload_count
for mod, mod_config in mod_configs.items():
sitefs = FileSystemHelper(mod_config["path"], client = _make_client(mod_config))
_, remaining = upload_ops.upload_files_new(sitefs, centralfs, modalities = [mod], databasename = journal_fn, max_num_files = remaining,
_, remaining = upload_ops.upload_files_parallel(sitefs, centralfs, modalities = [mod], databasename = journal_fn, max_num_files = remaining,
verbose = args.verbose, num_threads = nthreads, page_size = page_size)
if (remaining is not None) and (remaining <= 0):
break
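
The switch from upload_files_new to upload_files_parallel is where the multithreading comes in: each modality's files are uploaded through a pool of num_threads workers, page_size entries at a time, and the returned remaining count enforces max_upload_count across modalities. The implementation of upload_files_parallel itself is not shown in the diffs on this page; the snippet below is only a minimal sketch of that pattern, assuming a hypothetical per-file upload_one callable rather than the actual upload_ops code.

    # Minimal sketch of a paged, thread-pooled upload loop.
    # upload_one is a hypothetical per-file callable; this is not the actual
    # upload_ops.upload_files_parallel implementation from this commit.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def upload_files_parallel_sketch(files, upload_one, num_threads=4,
                                     page_size=1000, max_num_files=None):
        if max_num_files is not None:
            files = files[:max_num_files]
        results = []
        # work through the file list one page at a time so bookkeeping
        # (e.g. journal commits) can happen between batches
        for start in range(0, len(files), page_size):
            page = files[start:start + page_size]
            with ThreadPoolExecutor(max_workers=num_threads) as pool:
                futures = {pool.submit(upload_one, f): f for f in page}
                for fut in as_completed(futures):
                    results.append((futures[fut], fut.result()))
        remaining = None if max_num_files is None else max_num_files - len(results)
        return results, remaining
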
@@ -863,6 +789,11 @@ def _list_versions(args, config, journal_fn):
print("INFO: Using config file: ", config_fn)
config = config_helper.load_config(config_fn)

# set a default client for central storage
central_config = config_helper.get_central_config(config)
central_client = storage_helper._make_client(central_config)
central_client.set_as_default_client()

local_journal_fn_override = args.local_journal if ('local_journal' in vars(args)) and (args.local_journal is not None) else None
journal_path, lock_path, local_path = upload_ops.get_journal_paths(config,
local_fn_override = local_journal_fn_override)
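
The other addition in __main__.py builds the central storage client once at startup and registers it via set_as_default_client(), so later code can reuse it instead of constructing a client per call. A rough sketch of that registry pattern follows; the class and function names are illustrative assumptions, and the real storage_helper API may differ.

    # Sketch of a process-wide default-client registry, as implied by
    # set_as_default_client(); names here are illustrative only.
    _default_client = None

    class StorageClient:
        def __init__(self, config):
            self.config = config

        def set_as_default_client(self):
            global _default_client
            _default_client = self

    def get_default_client():
        # callers fall back to this when no explicit client is passed
        return _default_client
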
13 changes: 8 additions & 5 deletions chorus_upload/local_ops.py
@@ -88,9 +88,10 @@ def _gen_journal(root : FileSystemHelper, modalities: list[str],
all_args = []

page_size = kwargs.get("page_size", 1000)
n_cores = kwargs.get("num_threads", 1)
nthreads = min(n_cores, min(4, os.cpu_count()))

n_cores = kwargs.get("n_cores", 32)
nthreads = min(n_cores, min(32, (os.cpu_count() or 1) + 4))
print("INFO: Using ", nthreads, " threads")

for modality in modalities:
start = time.time()
# pattern = f"{modality}/**/*" if modality == "OMOP" else f"*/{modality}/**/*"
@@ -263,8 +264,10 @@ def _update_journal(root: FileSystemHelper, modalities: list[str],
curtimestamp = int(math.floor(time.time() * 1e6))

page_size = kwargs.get("page_size", 1000)
n_cores = kwargs.get("num_threads", 1)
nthreads = min(n_cores, min(4, os.cpu_count()))

n_cores = kwargs.get("n_cores", 32)
nthreads = min(n_cores, min(32, (os.cpu_count() or 1) + 4))
print("INFO: Using ", nthreads, " threads")

paths = []
all_insert_args = []
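
Both _gen_journal and _update_journal now take an n_cores argument (default 32) and cap the worker count at min(32, os.cpu_count() + 4), with the (os.cpu_count() or 1) guard covering platforms where cpu_count() returns None; this is the same heuristic Python's ThreadPoolExecutor uses for its default max_workers. A small sketch of how such a count would typically feed a pool (the helper name is illustrative, not from this commit):

    import os
    from concurrent.futures import ThreadPoolExecutor

    def make_journal_pool(**kwargs):
        # same capping logic as the new _gen_journal/_update_journal defaults
        n_cores = kwargs.get("n_cores", 32)
        nthreads = min(n_cores, min(32, (os.cpu_count() or 1) + 4))
        print("INFO: Using ", nthreads, " threads")
        return ThreadPoolExecutor(max_workers=nthreads)
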