Skip to content

Commit

Permalink
Merge pull request #497 from broadinstitute/constraint_apply_models
Browse files Browse the repository at this point in the history
Add generic constraint function `annotate_constraint_groupings()`
  • Loading branch information
jkgoodrich authored Dec 5, 2022
2 parents 608aed2 + 0dbd6b8 commit 5953a6c
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 1 deletion.
39 changes: 39 additions & 0 deletions gnomad/resources/resource_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,45 @@ def import_resource(self, overwrite: bool = True, **kwargs) -> None:
)


class ExpressionResource(BaseResource):
"""
A Hail Expression resource.
:param path: The Expression path (typically ending in .he).
:param import_args: Any sources that are required for the import and need to be
kept track of and/or passed to the import_func (e.g. .vcf path for an imported
VCF).
:param import_func: A function used to import the Expression. `import_func` will be
passed the `import_args` dictionary as kwargs.
"""

expected_file_extensions: List[str] = [".he"]

def he(self, force_import: bool = False) -> hl.expr.Expression:
"""
Read and return the Hail Expression resource.
:return: Hail Expression resource.
"""
if self.path is None or force_import:
return self.import_func(**self.import_args)
else:
return hl.experimental.read_expression(self.path)

def import_resource(self, overwrite: bool = True, **kwargs) -> None:
"""
Import the Expression resource using its import_func and writes it in its path.
:param overwrite: If set, existing file(s) will be overwritten.
:param kwargs: Any other parameters to be passed to hl.experimental.
write_expression.
:return: Nothing.
"""
self.import_func(**self.import_args).write(
self.path, overwrite=overwrite, **kwargs
)


class BaseVersionedResource:
"""
Class for a versioned resource.
Expand Down
179 changes: 178 additions & 1 deletion gnomad/utils/constraint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import hail as hl

from gnomad.utils.vep import explode_by_vep_annotation, process_consequences

logging.basicConfig(
format="%(asctime)s (%(name)s %(lineno)s): %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
Expand Down Expand Up @@ -311,7 +313,9 @@ def annotate_mutation_type(
:return: Table with mutation type annotations added.
"""
# Determine the context length by collecting all the context lengths.
context_lengths = list(filter(None, set(hl.len(t.context).collect())))
context_lengths = list(
filter(None, t.aggregate(hl.agg.collect_as_set(hl.len(t.context))))
)
if len(context_lengths) > 1:
raise ValueError(
"More than one length was found among the first 100 'context' values."
Expand Down Expand Up @@ -700,3 +704,176 @@ def get_all_pop_lengths(
)

return pop_lengths


def get_constraint_grouping_expr(
vep_annotation_expr: hl.StructExpression,
coverage_expr: Optional[hl.Int32Expression] = None,
include_transcript_group: bool = True,
include_canonical_group: bool = True,
) -> Dict[str, Union[hl.StringExpression, hl.Int32Expression, hl.BooleanExpression]]:
"""
Collect annotations used for constraint groupings.
Function collects the following annotations:
- annotation - 'most_severe_consequence' annotation in `vep_annotation_expr`
- modifier - classic lof annotation from 'lof' annotation in
`vep_annotation_expr`, LOFTEE annotation from 'lof' annotation in
`vep_annotation_expr`, PolyPhen annotation from 'polyphen_prediction' in
`vep_annotation_expr`, or "None" if neither is defined
- gene - 'gene_symbol' annotation inside `vep_annotation_expr`
- coverage - exome coverage if `coverage_expr` is specified
- transcript - id from 'transcript_id' in `vep_annotation_expr` (added when
`include_transcript_group` is True)
- canonical from `vep_annotation_expr` (added when `include_canonical_group` is
True)
.. note::
This function expects that the following fields are present in
`vep_annotation_expr`:
- lof
- polyphen_prediction
- most_severe_consequence
- gene_symbol
- transcript_id (if `include_transcript_group` is True)
- canonical (if `include_canonical_group` is True)
:param vep_annotation_expr: StructExpression of VEP annotation.
:param coverage_expr: Optional Int32Expression of exome coverage. Default is None.
:param include_transcript_group: Whether to include the transcript annotation in the
groupings. Default is True.
:param include_canonical_group: Whether to include canonical annotation in the
groupings. Default is True.
:return: A dictionary with keys as annotation names and values as actual
annotations.
"""
lof_expr = vep_annotation_expr.lof
polyphen_prediction_expr = vep_annotation_expr.polyphen_prediction

# Create constraint annotations to be used for groupings.
groupings = {
"annotation": vep_annotation_expr.most_severe_consequence,
"modifier": hl.coalesce(lof_expr, polyphen_prediction_expr, "None"),
"gene": vep_annotation_expr.gene_symbol,
}
if coverage_expr is not None:
groupings["coverage"] = coverage_expr

# Add 'transcript' and 'canonical' annotation if requested.
if include_transcript_group:
groupings["transcript"] = vep_annotation_expr.transcript_id
if include_canonical_group:
groupings["canonical"] = hl.or_else(vep_annotation_expr.canonical == 1, False)

return groupings


def annotate_exploded_vep_for_constraint_groupings(
ht: hl.Table, vep_annotation: str = "transcript_consequences"
) -> Tuple[Union[hl.Table, hl.MatrixTable], Tuple[str]]:
"""
Annotate Table with annotations used for constraint groupings.
Function explodes the specified VEP annotation (`vep_annotation`) and adds the following annotations:
- annotation -'most_severe_consequence' annotation in `vep_annotation`
- modifier - classic lof annotation from 'lof' annotation in
`vep_annotation`, LOFTEE annotation from 'lof' annotation in
`vep_annotation`, PolyPhen annotation from 'polyphen_prediction' in
`vep_annotation`, or "None" if neither is defined
- gene - 'gene_symbol' annotation inside `vep_annotation`
- coverage - exome coverage in `ht`
- transcript - id from 'transcript_id' in `vep_annotation` (added when
`include_transcript_group` is True)
- canonical from `vep_annotation` (added when `include_canonical_group` is
True)
.. note::
This function expects that the following annotations are present in `ht`:
- vep
- exome_coverage
:param t: Input Table or MatrixTable.
:param vep_annotation: Name of annotation in 'vep' annotation (one of
"transcript_consequences" and "worst_csq_by_gene") that will be used for
obtaining constraint annotations. Default is "transcript_consequences".
:return: A tuple of input Table or MatrixTable with grouping annotations added and
the names of added annotations.
"""
if vep_annotation == "transcript_consequences":
include_transcript_group = include_canonical_group = True
else:
include_transcript_group = include_canonical_group = False

# Annotate 'worst_csq_by_gene' to `ht` if it's specified for `vep_annotation`.
if vep_annotation == "worst_csq_by_gene":
ht = process_consequences(ht)

# Explode the specified VEP annotation.
ht = explode_by_vep_annotation(ht, vep_annotation)

# Collect the annotations used for groupings.
groupings = get_constraint_grouping_expr(
ht[vep_annotation],
coverage_expr=ht.exome_coverage,
include_transcript_group=include_transcript_group,
include_canonical_group=include_canonical_group,
)

return ht.annotate(**groupings), tuple(groupings.keys())


def compute_expected_variants(
ht: hl.Table,
plateau_models_expr: hl.StructExpression,
mu_expr: hl.Float64Expression,
cov_corr_expr: hl.Float64Expression,
cpg_expr: hl.BooleanExpression,
pop: Optional[str] = None,
) -> Dict[str, Union[hl.Float64Expression, hl.Int64Expression]]:
"""
Apply plateau models for all sites and for a population (if specified) to compute predicted proportion observed ratio and expected variant counts.
:param ht: Input Table.
:param plateau_models_expr: Linear models (output of `build_models()`, with the values
of the dictionary formatted as a StructExpression of intercept and slope, that
calibrates mutation rate to proportion observed for high coverage exomes. It
includes models for CpG, non-CpG sites, and each population in `POPS`.
:param mu_expr: Float64Expression of mutation rate.
:param cov_corr_expr: Float64Expression of corrected coverage expression.
:param cpg_expr: BooleanExpression noting whether a site is a CPG site.
:param pop: Optional population to use when applying plateau model. Default is
None.
:return: A dictionary with predicted proportion observed ratio and expected variant
counts.
"""
if pop is None:
pop = ""
plateau_model = hl.literal(plateau_models_expr.total)[cpg_expr]
slope = plateau_model[1]
intercept = plateau_model[0]
agg_func = hl.agg.sum
ann_to_sum = ["observed_variants", "possible_variants"]
else:
plateau_model = hl.literal(plateau_models_expr[pop])
slope = hl.map(lambda f: f[cpg_expr][1], plateau_model)
intercept = hl.map(lambda f: f[cpg_expr][0], plateau_model)
agg_func = hl.agg.array_sum
pop = f"_{pop}"
ann_to_sum = [f"downsampling_counts{pop}"]

# Apply plateau models for specified population.
ppo_expr = mu_expr * slope + intercept

# Generate sum aggregators for 'predicted_proportion_observed' and
# 'expected_variants', for specified population.
agg_expr = {
f"predicted_proportion_observed{pop}": agg_func(ppo_expr),
f"expected_variants{pop}": agg_func(ppo_expr * cov_corr_expr),
}

# Generate sum aggregators for 'observed_variants' and 'possible_variants' on
# the entire dataset if pop is None, and for 'downsampling_counts' for
# specified population if pop is not None.
agg_expr.update({ann: agg_func(ht[ann]) for ann in ann_to_sum})

return agg_expr
29 changes: 29 additions & 0 deletions gnomad/utils/vep.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,3 +640,32 @@ def add_most_severe_csq_to_tc_within_vep_root(
if isinstance(t, hl.MatrixTable)
else t.annotate(**{vep_root: annotation})
)


def explode_by_vep_annotation(
t: Union[hl.Table, hl.MatrixTable],
vep_annotation: str = "transcript_consequences",
vep_root: str = "vep",
) -> Union[hl.Table, hl.MatrixTable]:
"""
Explode the specified VEP annotation on the input Table/MatrixTable.
:param t: Input Table or MatrixTable.
:param vep_annotation: Name of annotation in `vep_root` to explode.
:param vep_root: Name used for root VEP annotation. Default is 'vep'.
:return: Table or MatrixTable with exploded VEP annotation.
"""
if vep_annotation not in t[vep_root].keys():
raise ValueError(
f"{vep_annotation} is not a row field of the {vep_root} annotation in"
" Table/MatrixTable!"
)
# Create top-level annotation for `vep_annotation` and explode it.
if isinstance(t, hl.Table):
t = t.transmute(**{vep_annotation: t[vep_root][vep_annotation]})
t = t.explode(t[vep_annotation])
else:
t = t.transmute_rows(**{vep_annotation: t[vep_root][vep_annotation]})
t = t.explode_rows(t[vep_annotation])

return t

0 comments on commit 5953a6c

Please sign in to comment.