feat: extract credible sets and studies from all eQTL Catalogue finemapping results (#518)

* feat: dataflow decompress prototype (#501)

* chore: commit susie results gist

* feat(study_index): add `tissueFromSourceId` to schema and make `traitFromSource` nullable

* fix: bug and linting fixes in new eqtl ingestion step

* perf: config bugfixes and performance improvements

* perf: remove data persistence to avoid executor failure

* perf: load susie results for studies of interest only

* perf: collect locus for leads only and optimise partitioning cols

* feat: parametrise methods to include

* feat: run full dag

* test: add tests

* fix: reorder test inputs

* docs: update eqtl catalogue docs

* fix: correct typos in tests docstrings

* refactor: change mqtl_quantification_methods to mqtl_quantification_methods_blacklist

* feat: studyId is based on measured trait and not on gene

* feat: credible set lead is the variant with the highest PIP

* feat(studies): change logic in _identify_study_type to extract qtl type based on quantification method

* refactor: externalise reading logic to source classes

* chore: add mqtl_quantification_methods_blacklist to yaml config

* docs: update docs

* fix(dag): pass bucket name to GCSDeleteBucketOperator

* refactor(coloc): move get_logsum function to common utils

* feat(studylocus): add calculate_credible_set_log10bf and use it for eqtlcat credible sets

* fix: credible sets dataset is too large and can't be broadcast

* fix(dag): use GCSDeleteObjectsOperator instead of GCSDeleteBucketOperator

* fix: correct typo

* fix: correct typo
ireneisdoomed authored and DSuveges committed Mar 8, 2024
1 parent 97bf3f0 commit 02dc66f
Showing 11 changed files with 187 additions and 89 deletions.
1 change: 1 addition & 0 deletions config/step/ot_eqtl_catalogue.yaml
@@ -4,6 +4,7 @@ defaults:
eqtl_catalogue_paths_imported: ???
eqtl_catalogue_study_index_out: ???
eqtl_catalogue_credible_sets_out: ???
+mqtl_quantification_methods_blacklist: []
session:
  extended_spark_conf:
    "spark.sql.shuffle.partitions": "3200"
@@ -4,4 +4,4 @@ title: eQTL Catalogue

The [eQTL Catalogue](https://www.ebi.ac.uk/eqtl/) aims to provide unified gene, protein expression and splicing Quantitative Trait Loci (QTLs) from all available human public studies.

-It serves as the ultimate resource of mQTLs that we use for colocalization and target prioritization.
+It serves as the ultimate resource of molecular QTLs that we use for colocalization and target prioritization.
14 changes: 6 additions & 8 deletions src/airflow/dags/eqtl_preprocess.py
@@ -9,10 +9,10 @@
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator

CLUSTER_NAME = "otg-preprocess-eqtl"
-AUTOSCALING = "do-ld-explosion"
+AUTOSCALING = "eqtl-preprocess"
PROJECT_ID = "open-targets-genetics-dev"

EQTL_CATALOG_SUSIE_LOCATION = "gs://eqtl_catalog_data/ebi_ftp/susie"
@@ -52,11 +52,10 @@
    ],
)

-delete_decompressed_job = GCSDeleteBucketOperator(
+delete_decompressed_job = GCSDeleteObjectsOperator(
    task_id="delete_decompressed_files",
-    bucket_name=TEMP_DECOMPRESS_LOCATION,
-    force=True,
-    user_project=PROJECT_ID,
+    bucket_name=TEMP_DECOMPRESS_LOCATION.split("/")[2],
+    prefix=f"{TEMP_DECOMPRESS_LOCATION.split('/')[-1]}/",
)

    (
@@ -69,6 +68,5 @@
        )
        >> common.install_dependencies(CLUSTER_NAME)
        >> ingestion_job
-        >> delete_decompressed_job
-        >> common.delete_cluster(CLUSTER_NAME)
+        >> [delete_decompressed_job, common.delete_cluster(CLUSTER_NAME)]
    )
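Note on the last change: moving the two tasks into a list makes the cleanup task and the cluster deletion both direct downstreams of ingestion_job, so they run in parallel instead of in sequence. A minimal sketch of this fan-out pattern (Airflow 2.4+ assumed; DAG and task names are hypothetical):

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="fan_out_example", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
    ingest = EmptyOperator(task_id="ingest")
    cleanup = EmptyOperator(task_id="cleanup")
    teardown = EmptyOperator(task_id="teardown")

    # A list on the right-hand side sets both tasks downstream of `ingest`;
    # cleanup and teardown then run concurrently once ingest succeeds.
    ingest >> [cleanup, teardown]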
23 changes: 23 additions & 0 deletions src/gentropy/common/utils.py
@@ -6,13 +6,15 @@
from typing import TYPE_CHECKING, Tuple

import hail as hl
+import numpy as np
from pyspark.sql import functions as f
from pyspark.sql import types as t

from gentropy.common.spark_helpers import pvalue_to_zscore

if TYPE_CHECKING:
    from hail.table import Table
+    from numpy.typing import NDArray
    from pyspark.sql import Column


@@ -325,3 +327,24 @@ def parse_efos(efo_uri: Column) -> Column:
    """
    colname = efo_uri._jc.toString()
    return f.array_sort(f.expr(f"regexp_extract_all(`{colname}`, '([A-Z]+_[0-9]+)')"))
+
+
+def get_logsum(arr: NDArray[np.float64]) -> float:
+    """Calculates logarithm of the sum of exponentials of a vector. The max is extracted to ensure that the sum is not Inf.
+
+    This function emulates scipy's logsumexp expression.
+
+    Args:
+        arr (NDArray[np.float64]): input array
+
+    Returns:
+        float: logsumexp of the input array
+
+    Example:
+        >>> l = [0.2, 0.1, 0.05, 0]
+        >>> round(get_logsum(l), 6)
+        1.476557
+    """
+    themax = np.max(arr)
+    result = themax + np.log(np.sum(np.exp(arr - themax)))
+    return float(result)
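A quick way to sanity-check the new helper is against the scipy implementation it emulates; a minimal sketch, assuming scipy is installed and gentropy is importable:

import numpy as np
from scipy.special import logsumexp

from gentropy.common.utils import get_logsum

arr = np.array([0.2, 0.1, 0.05, 0.0])
# Both compute log(sum(exp(arr))), factoring out the max for numerical stability.
assert np.isclose(get_logsum(arr), logsumexp(arr))  # both ≈ 1.476557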
6 changes: 1 addition & 5 deletions src/gentropy/config.py
@@ -113,11 +113,7 @@ class EqtlCatalogueConfig(StepConfig):
    eqtl_catalogue_paths_imported: str = MISSING
    eqtl_catalogue_study_index_out: str = MISSING
    eqtl_catalogue_credible_sets_out: str = MISSING
-    mqtl_quantification_methods: list[str] = field(
-        default_factory=lambda: [
-            "ge",
-        ]
-    )
+    mqtl_quantification_methods_blacklist: list[str] = field(default_factory=lambda: [])
    _target_: str = "gentropy.eqtl_catalogue.EqtlCatalogueStep"


24 changes: 24 additions & 0 deletions src/gentropy/dataset/study_locus.py
@@ -6,12 +6,14 @@
from typing import TYPE_CHECKING

import pyspark.sql.functions as f
+from pyspark.sql.types import FloatType

from gentropy.common.schemas import parse_spark_schema
from gentropy.common.spark_helpers import (
    calculate_neglog_pvalue,
    order_array_of_structs_by_field,
)
+from gentropy.common.utils import get_logsum
from gentropy.dataset.dataset import Dataset
from gentropy.dataset.study_locus_overlap import StudyLocusOverlap
from gentropy.method.clump import LDclumping
@@ -220,6 +222,28 @@ def assign_study_locus_id(study_id_col: Column, variant_id_col: Column) -> Column:
        variant_id_col = f.coalesce(variant_id_col, f.rand().cast("string"))
        return f.xxhash64(study_id_col, variant_id_col).alias("studyLocusId")

+    @classmethod
+    def calculate_credible_set_log10bf(cls: type[StudyLocus], logbfs: Column) -> Column:
+        """Calculate Bayes factor for the entire credible set. The Bayes factor is calculated as the logsumexp of the logBF values of the variants in the locus.
+
+        Args:
+            logbfs (Column): Array column with the logBF values of the variants in the locus.
+
+        Returns:
+            Column: log10 Bayes factor for the entire credible set.
+
+        Examples:
+            >>> spark.createDataFrame([([0.2, 0.1, 0.05, 0.0],)]).toDF("logBF").select(f.round(StudyLocus.calculate_credible_set_log10bf(f.col("logBF")), 7).alias("credibleSetlog10BF")).show()
+            +------------------+
+            |credibleSetlog10BF|
+            +------------------+
+            |         1.4765565|
+            +------------------+
+            <BLANKLINE>
+        """
+        logsumexp_udf = f.udf(lambda x: get_logsum(x), FloatType())
+        return logsumexp_udf(logbfs).cast("double").alias("credibleSetlog10BF")

    @classmethod
    def get_schema(cls: type[StudyLocus]) -> StructType:
        """Provides the schema for the StudyLocus dataset.
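To see how the new classmethod is meant to be used outside a doctest: aggregate per-variant logBF values into an array per credible set, then reduce. The DataFrame and intermediate column names below are illustrative, not taken from the codebase:

import pyspark.sql.functions as f

from gentropy.dataset.study_locus import StudyLocus

# `variants_df` is a hypothetical DataFrame with one row per variant,
# carrying a studyLocusId and that variant's logBF.
credible_set_bf = (
    variants_df.groupBy("studyLocusId")
    .agg(f.collect_list("logBF").alias("logBFs"))
    # calculate_credible_set_log10bf applies get_logsum over the array and
    # returns a column already aliased "credibleSetlog10BF".
    .select("studyLocusId", StudyLocus.calculate_credible_set_log10bf(f.col("logBFs")))
)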
80 changes: 62 additions & 18 deletions src/gentropy/datasource/eqtl_catalogue/finemapping.py
@@ -2,6 +2,7 @@
from __future__ import annotations

from dataclasses import dataclass
+from typing import TYPE_CHECKING

import pyspark.sql.functions as f
from pyspark.sql import Column, DataFrame, Window
@@ -13,17 +14,21 @@
    StructType,
)

+from gentropy.common.session import Session
from gentropy.common.utils import parse_pvalue
from gentropy.dataset.study_locus import StudyLocus
from gentropy.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex
+
+if TYPE_CHECKING:
+    from pyspark.sql import DataFrame


@dataclass
class EqtlCatalogueFinemapping:
    """SuSIE finemapping dataset for eQTL Catalogue.

    Credible sets from SuSIE are extracted and transformed into StudyLocus objects:

-    - A study ID is defined as a triad between: the publication, the tissue, and the measured gene (e.g. Braineac2_substantia_nigra_ENSG00000248275)
+    - A study ID is defined as a triad between: the publication, the tissue, and the measured trait (e.g. Braineac2_substantia_nigra_ENSG00000248275)
    - Each row in the `credible_sets.tsv.gz` files is represented by a molecular_trait_id/variant/rsid trio relevant for a given tissue. Each has its own finemapping statistics
    - log Bayes Factors are available for all variants in the `lbf_variable.txt` files
    """
"""
@@ -135,7 +140,7 @@ def parse_susie_results(
                cls._extract_dataset_id_from_file_path(f.input_file_name()),
            )
            .join(
-                f.broadcast(
+                (
                    credible_sets.withColumn(
                        "dataset_id",
                        cls._extract_dataset_id_from_file_path(f.input_file_name()),
@@ -176,18 +181,18 @@ def parse_susie_results(
                f.col("logBF"),
                f.lit("SuSie").alias("finemappingMethod"),
                # Study metadata
-                f.col("molecular_trait_id").alias("traitFromSource"),
                f.col("gene_id").alias("geneId"),
+                f.array(f.col("molecular_trait_id")).alias("traitFromSourceMappedIds"),
                f.col("dataset_id"),
                f.concat_ws(
                    "_",
                    f.col("study_label"),
                    f.col("sample_group"),
-                    f.col("gene_id"),
+                    f.col("molecular_trait_id"),
                ).alias("studyId"),
-                f.col("tissue_id").alias("c"),
+                f.col("tissue_id").alias("tissueFromSourceId"),
                EqtlCatalogueStudyIndex._identify_study_type(
-                    f.col("study_label")
+                    f.col("quant_method")
                ).alias("studyType"),
                f.col("study_label").alias("projectId"),
                f.concat_ws(
@@ -220,19 +225,12 @@ def from_susie_results(
            field.name
            for field in StudyLocus.get_schema().fields
            if field.name in processed_finemapping_df.columns
-        ] + ["studyLocusId", "locus"]
+        ] + ["locus"]
        return StudyLocus(
            _df=(
                processed_finemapping_df.withColumn(
                    "isLead",
-                    f.row_number().over(
-                        lead_w.orderBy(
-                            *[
-                                f.col("pValueExponent").asc(),
-                                f.col("pValueMantissa").asc(),
-                            ]
-                        )
-                    )
+                    f.row_number().over(lead_w.orderBy(f.desc("posteriorProbability")))
                    == f.lit(1),
                )
                .withColumn(
@@ -255,13 +253,59 @@
                )
                .filter(f.col("isLead"))
                .drop("isLead")
-                .withColumn(
-                    "studyLocusId",
+                .select(
+                    *study_locus_cols,
                    StudyLocus.assign_study_locus_id(
                        f.col("studyId"), f.col("variantId")
                    ),
+                    StudyLocus.calculate_credible_set_log10bf(
+                        f.col("locus.logBF")
+                    ).alias("credibleSetlog10BF"),
                )
-                .select(study_locus_cols)
            ),
            _schema=StudyLocus.get_schema(),
        ).annotate_credible_sets()

+    @classmethod
+    def read_credible_set_from_source(
+        cls: type[EqtlCatalogueFinemapping],
+        session: Session,
+        credible_set_path: str | list[str],
+    ) -> DataFrame:
+        """Load raw credible sets from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            credible_set_path (str | list[str]): Path to raw table(s) containing finemapping results for any variant belonging to a credible set.
+
+        Returns:
+            DataFrame: Credible sets DataFrame.
+        """
+        return session.spark.read.csv(
+            credible_set_path,
+            sep="\t",
+            header=True,
+            schema=cls.raw_credible_set_schema,
+        )
+
+    @classmethod
+    def read_lbf_from_source(
+        cls: type[EqtlCatalogueFinemapping],
+        session: Session,
+        lbf_path: str | list[str],
+    ) -> DataFrame:
+        """Load raw log Bayes Factors from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            lbf_path (str | list[str]): Path to raw table(s) containing Log Bayes Factors for each variant.
+
+        Returns:
+            DataFrame: Log Bayes Factors DataFrame.
+        """
+        return session.spark.read.csv(
+            lbf_path,
+            sep="\t",
+            header=True,
+            schema=cls.raw_lbf_schema,
+        )
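Putting the two new readers together, ingestion might be wired up roughly as follows. The Session construction arguments and the path globs are assumptions for illustration, and the downstream parse_susie_results call is elided since its full signature is not shown in this diff:

from gentropy.common.session import Session
from gentropy.datasource.eqtl_catalogue.finemapping import EqtlCatalogueFinemapping

session = Session(app_name="eqtl_catalogue_ingestion")  # constructor args assumed

# Hypothetical globs over the decompressed SuSiE outputs; each dataset ships
# credible_sets and lbf_variable tables matching the schemas defined above.
credible_sets = EqtlCatalogueFinemapping.read_credible_set_from_source(
    session, "gs://some-bucket/susie/*/*.credible_sets.tsv"
)
lbfs = EqtlCatalogueFinemapping.read_lbf_from_source(
    session, "gs://some-bucket/susie/*/*.lbf_variable.txt"
)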
63 changes: 54 additions & 9 deletions src/gentropy/datasource/eqtl_catalogue/study_index.py
@@ -2,11 +2,14 @@

from __future__ import annotations

+from itertools import chain
from typing import TYPE_CHECKING

+import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

+from gentropy.common.session import Session
from gentropy.dataset.study_index import StudyIndex

if TYPE_CHECKING:
@@ -23,7 +26,7 @@ class EqtlCatalogueStudyIndex:
    - in the same publication (e.g. Alasoo_2018)
    - in the same cell type or tissue (e.g. monocytes)
-    - and for the same gene associated with the measured molecular trait (e.g. ENSG00000141510)
+    - and for the same measured molecular trait (e.g. ENSG00000141510)
    """

@@ -40,24 +43,45 @@
            StructField("quant_method", StringType(), True),
        ]
    )
-    raw_studies_metadata_path = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/data_tables/dataset_metadata.tsv"
+    raw_studies_metadata_path = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/19929ff6a99bf402194292a14f96f9615b35f65f/data_tables/dataset_metadata.tsv"

    @classmethod
    def _identify_study_type(
-        cls: type[EqtlCatalogueStudyIndex], study_label_col: Column
+        cls: type[EqtlCatalogueStudyIndex], quantification_method_col: Column
    ) -> Column:
-        """Identify the study type based on the study label.
+        """Identify the study type based on the method to quantify the trait.

        Args:
-            study_label_col (Column): column with the study label that identifies the supporting publication.
+            quantification_method_col (Column): column with the label of the method to quantify the trait. Available methods are [here](https://www.ebi.ac.uk/eqtl/Methods/)

        Returns:
            Column: The study type.

+        Examples:
+            >>> df = spark.createDataFrame([("ge",), ("exon",), ("tx",)], ["quant_method"])
+            >>> df.withColumn("study_type", EqtlCatalogueStudyIndex._identify_study_type(f.col("quant_method"))).show()
+            +------------+----------+
+            |quant_method|study_type|
+            +------------+----------+
+            |          ge|      eqtl|
+            |        exon|      eqtl|
+            |          tx|      eqtl|
+            +------------+----------+
+            <BLANKLINE>
        """
-        return f.when(
-            study_label_col == "Sun_2018",
-            f.lit("pqtl"),
-        ).otherwise(f.lit("eqtl"))
+        method_to_study_type_mapping = {
+            "ge": "eqtl",
+            "exon": "eqtl",
+            "tx": "eqtl",
+            "microarray": "eqtl",
+            "leafcutter": "sqtl",
+            "aptamer": "pqtl",
+            "txrev": "tuqtl",
+        }
+        map_expr = f.create_map(
+            *[f.lit(x) for x in chain(*method_to_study_type_mapping.items())]
+        )
+        return map_expr.getItem(quantification_method_col)

    @classmethod
    def get_studies_of_interest(
@@ -100,3 +124,24 @@ def from_susie_results(
            _df=processed_finemapping_df.select(study_index_cols).distinct(),
            _schema=StudyIndex.get_schema(),
        )

+    @classmethod
+    def read_studies_from_source(
+        cls: type[EqtlCatalogueStudyIndex],
+        session: Session,
+        mqtl_quantification_methods_blacklist: list[str],
+    ) -> DataFrame:
+        """Read raw studies metadata from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            mqtl_quantification_methods_blacklist (list[str]): Molecular trait quantification methods that we don't want to ingest. Available options in https://github.com/eQTL-Catalogue/eQTL-Catalogue-resources/blob/master/data_tables/dataset_metadata.tsv
+
+        Returns:
+            DataFrame: raw studies metadata.
+        """
+        pd.DataFrame.iteritems = pd.DataFrame.items
+        return session.spark.createDataFrame(
+            pd.read_csv(cls.raw_studies_metadata_path, sep="\t"),
+            schema=cls.raw_studies_metadata_schema,
+        ).filter(~(f.col("quant_method").isin(mqtl_quantification_methods_blacklist)))
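And a sketch of the study-index side, combining the metadata reader with the quantification-method mapping above; the Session arguments and the blacklist choice are illustrative:

import pyspark.sql.functions as f

from gentropy.common.session import Session
from gentropy.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex

session = Session(app_name="eqtl_catalogue_studies")  # constructor args assumed

# Drop leafcutter-quantified (sqtl) datasets, for example.
studies = EqtlCatalogueStudyIndex.read_studies_from_source(session, ["leafcutter"])
studies.select(
    "study_label",
    "quant_method",
    EqtlCatalogueStudyIndex._identify_study_type(f.col("quant_method")).alias("study_type"),
).show()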