Skip to content

Commit

Permalink
Merge pull request #394 from broadinstitute/file_exists
Browse files Browse the repository at this point in the history
New file exists
  • Loading branch information
ch-kr authored Jul 27, 2021
2 parents 1c3a668 + 0fe9f93 commit 2147e58
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Added function `rekey_new_reference` to re-key a Table or MatrixTable with a new reference genome [(#381)](https://github.com/broadinstitute/gnomad_methods/pull/381)
* Modified `SEXES` in utils/vcf to be 'XX' and 'XY' instead of 'female' and 'male' [(#381)](https://github.com/broadinstitute/gnomad_methods/pull/381)
* Fix `annotation_type_is_numeric` and `annotation_type_in_vcf_info` [(#379)](https://github.com/broadinstitute/gnomad_methods/pull/379)
* Added function `parallel_file_exists` to check whether a large number of files exist [(#394)](https://github.com/broadinstitute/gnomad_methods/pull/394)
* Changed module `sanity_checks` to `validity_checks`, modified functions `generic_field_check`, `make_filters_expr_dict` (previously `make_filters_sanity_check_expr`), and `make_group_sum_expr_dict` (previously `sample_sum_check`), and added functions `summarize_variant_filters`, `generic_field_check_loop`, `compare_subset_freqs`, `sum_group_callstats`, `summarize_variants`, `check_raw_and_adj_callstats`, `check_sex_chr_metrics`, `compute_missingness`, `vcf_field_check`, and `validate_release_t` [(#395)](https://github.com/broadinstitute/gnomad_methods/pull/389)

## Version 0.5.0 - April 22nd, 2021
Expand Down
72 changes: 71 additions & 1 deletion gnomad/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,85 @@
import os
import subprocess
import uuid
from typing import List, Optional, Tuple, Union
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Dict, Optional, Tuple, Union

import hail as hl
from hailtop.aiotools import LocalAsyncFS, RouterAsyncFS, AsyncFS
from hailtop.aiogoogle import GoogleStorageAsyncFS
from hailtop.utils import bounded_gather, tqdm

# Module-level logger, named after this module and emitting INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Message layout: level, logger name, source line number, then the message.
logging.basicConfig(
    format="%(levelname)s (%(name)s %(lineno)s): %(message)s",
)


async def parallel_file_exists(
    fnames: List[str], parallelism: int = 750
) -> Dict[str, bool]:
    """
    Check the existence of many files concurrently.

    Created for use with hail Batch jobs, where calling `file_exists` once per
    path is very slow when a large number of files must be checked.

    Paths whose extension is `.ht` or `.mt` are checked by looking for the
    `_SUCCESS` file inside the path rather than the path itself. The keys of
    the returned dictionary are always the names exactly as given in `fnames`.

    :param fnames: List of file names to check.
    :param parallelism: Value passed to `bounded_gather` as its `parallelism`
        argument (presumably the cap on checks in flight at once). Default is 750.
    :return: Dictionary of file names (str) and whether the file exists (boolean).
    """

    async def async_file_exists(fs: AsyncFS, fname: str) -> bool:
        """
        Stat a single path to determine whether it exists.

        :param fs: AsyncFS object used to stat the path.
        :param fname: Path to file to check.
        :return: Whether file exists.
        """
        # For `.ht`/`.mt` paths, stat the `_SUCCESS` file inside the path instead.
        target = fname
        if os.path.splitext(fname)[1] in (".ht", ".mt"):
            target = fname + "/_SUCCESS"

        try:
            await fs.statfile(target)
        except FileNotFoundError:
            return False
        return True

    def make_existence_check(fs: AsyncFS, progress, path: str) -> Callable:
        """
        Create function to check if file exists and update progress bar in stdout.

        Returning a function (rather than a coroutine) delays coroutine creation
        to avoid creating too many live coroutines.

        :param fs: AsyncFS object used to stat the path.
        :param progress: Progress bar to advance by one after each check.
        :param path: Path to file to check.
        :return: Function that checks for file existence and updates progress bar.
        """

        async def check_and_tick() -> bool:
            found = await async_file_exists(fs, path)
            progress.update(1)
            return found

        return check_and_tick

    with tqdm(
        total=len(fnames), desc="check files for existence", disable=False
    ) as progress, ThreadPoolExecutor() as pool:
        async with RouterAsyncFS(
            "file", [LocalAsyncFS(pool), GoogleStorageAsyncFS()]
        ) as router_fs:
            pending_checks = [
                make_existence_check(router_fs, progress, path) for path in fnames
            ]
            # Results come back in the same order as `fnames`, which the
            # final zip relies on.
            results = await bounded_gather(*pending_checks, parallelism=parallelism)

    return dict(zip(fnames, results))


def file_exists(fname: str) -> bool:
"""
Check whether a file exists.
Expand Down

0 comments on commit 2147e58

Please sign in to comment.