diff --git a/config/step/ot_eqtl_catalogue.yaml b/config/step/ot_eqtl_catalogue.yaml
index 06076d228..7d4441864 100644
--- a/config/step/ot_eqtl_catalogue.yaml
+++ b/config/step/ot_eqtl_catalogue.yaml
@@ -4,6 +4,7 @@ defaults:
 eqtl_catalogue_paths_imported: ???
 eqtl_catalogue_study_index_out: ???
 eqtl_catalogue_credible_sets_out: ???
+mqtl_quantification_methods_blacklist: []
 session:
   extended_spark_conf:
     "spark.sql.shuffle.partitions": "3200"
diff --git a/docs/python_api/datasources/eqtl_catalogue/_eqtl_catalogue.md b/docs/python_api/datasources/eqtl_catalogue/_eqtl_catalogue.md
index c2e7c79ce..2e95f561c 100644
--- a/docs/python_api/datasources/eqtl_catalogue/_eqtl_catalogue.md
+++ b/docs/python_api/datasources/eqtl_catalogue/_eqtl_catalogue.md
@@ -4,4 +4,4 @@ title: eQTL Catalogue
 
 The [eQTL Catalogue](https://www.ebi.ac.uk/eqtl/) aims to provide unified gene expression, protein expression and splicing Quantitative Trait Loci (QTLs) from all available human public studies.
 
-It serves as the ultimate resource of mQTLs that we use for colocalization and target prioritization.
+It serves as the ultimate resource of molecular QTLs that we use for colocalization and target prioritization.
diff --git a/src/airflow/dags/eqtl_preprocess.py b/src/airflow/dags/eqtl_preprocess.py
index 40a3c1d6a..ec346d137 100644
--- a/src/airflow/dags/eqtl_preprocess.py
+++ b/src/airflow/dags/eqtl_preprocess.py
@@ -9,10 +9,10 @@
 from airflow.providers.google.cloud.operators.dataflow import (
     DataflowTemplatedJobStartOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
 
 CLUSTER_NAME = "otg-preprocess-eqtl"
-AUTOSCALING = "do-ld-explosion"
+AUTOSCALING = "eqtl-preprocess"
 
 PROJECT_ID = "open-targets-genetics-dev"
 EQTL_CATALOG_SUSIE_LOCATION = "gs://eqtl_catalog_data/ebi_ftp/susie"
@@ -52,11 +52,10 @@
         ],
     )
 
-    delete_decompressed_job = GCSDeleteBucketOperator(
+    delete_decompressed_job = GCSDeleteObjectsOperator(
         task_id="delete_decompressed_files",
-        bucket_name=TEMP_DECOMPRESS_LOCATION,
-        force=True,
-        user_project=PROJECT_ID,
+        bucket_name=TEMP_DECOMPRESS_LOCATION.split("/")[2],
+        prefix=f"{TEMP_DECOMPRESS_LOCATION.split('/')[-1]}/",
     )
 
     (
@@ -69,6 +68,5 @@
         )
         >> common.install_dependencies(CLUSTER_NAME)
         >> ingestion_job
-        >> delete_decompressed_job
-        >> common.delete_cluster(CLUSTER_NAME)
+        >> [delete_decompressed_job, common.delete_cluster(CLUSTER_NAME)]
     )
diff --git a/src/gentropy/common/utils.py b/src/gentropy/common/utils.py
index 05704a299..4cdda9ad2 100644
--- a/src/gentropy/common/utils.py
+++ b/src/gentropy/common/utils.py
@@ -6,6 +6,7 @@
 from typing import TYPE_CHECKING, Tuple
 
 import hail as hl
+import numpy as np
 from pyspark.sql import functions as f
 from pyspark.sql import types as t
 
@@ -13,6 +14,7 @@
 if TYPE_CHECKING:
     from hail.table import Table
+    from numpy.typing import NDArray
     from pyspark.sql import Column
 
@@ -325,3 +327,24 @@ def parse_efos(efo_uri: Column) -> Column:
     """
     colname = efo_uri._jc.toString()
     return f.array_sort(f.expr(f"regexp_extract_all(`{colname}`, '([A-Z]+_[0-9]+)')"))
+
+
+def get_logsum(arr: NDArray[np.float64]) -> float:
+    """Calculates the logarithm of the sum of exponentials of a vector. The maximum is factored out first so that the sum does not overflow to Inf.
+
+    This function emulates scipy's logsumexp.
+
+    Args:
+        arr (NDArray[np.float64]): input array
+
+    Returns:
+        float: logsumexp of the input array
+
+    Example:
+        >>> l = [0.2, 0.1, 0.05, 0]
+        >>> round(get_logsum(l), 6)
+        1.476557
+    """
+    themax = np.max(arr)
+    result = themax + np.log(np.sum(np.exp(arr - themax)))
+    return float(result)
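Note on the numerics: get_logsum factors out the maximum because a direct np.log(np.sum(np.exp(arr))) overflows to inf in float64 once any element exceeds roughly 709. A minimal sketch (not part of the diff) showing the difference:

    import numpy as np

    arr = np.array([1000.0, 1000.1])
    naive = np.log(np.sum(np.exp(arr)))  # inf: np.exp(1000.0) overflows float64
    m = np.max(arr)
    stable = m + np.log(np.sum(np.exp(arr - m)))  # ~1000.7444, same value get_logsum(arr) returns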
diff --git a/src/gentropy/config.py b/src/gentropy/config.py
index 5e82055cb..7202ee578 100644
--- a/src/gentropy/config.py
+++ b/src/gentropy/config.py
@@ -113,11 +113,7 @@ class EqtlCatalogueConfig(StepConfig):
     eqtl_catalogue_paths_imported: str = MISSING
     eqtl_catalogue_study_index_out: str = MISSING
     eqtl_catalogue_credible_sets_out: str = MISSING
-    mqtl_quantification_methods: list[str] = field(
-        default_factory=lambda: [
-            "ge",
-        ]
-    )
+    mqtl_quantification_methods_blacklist: list[str] = field(default_factory=lambda: [])
     _target_: str = "gentropy.eqtl_catalogue.EqtlCatalogueStep"
diff --git a/src/gentropy/dataset/study_locus.py b/src/gentropy/dataset/study_locus.py
index 1867dfa94..4ff9f0172 100644
--- a/src/gentropy/dataset/study_locus.py
+++ b/src/gentropy/dataset/study_locus.py
@@ -6,12 +6,14 @@
 from typing import TYPE_CHECKING
 
 import pyspark.sql.functions as f
+from pyspark.sql.types import FloatType
 
 from gentropy.common.schemas import parse_spark_schema
 from gentropy.common.spark_helpers import (
     calculate_neglog_pvalue,
     order_array_of_structs_by_field,
 )
+from gentropy.common.utils import get_logsum
 from gentropy.dataset.dataset import Dataset
 from gentropy.dataset.study_locus_overlap import StudyLocusOverlap
 from gentropy.method.clump import LDclumping
@@ -220,6 +222,28 @@ def assign_study_locus_id(study_id_col: Column, variant_id_col: Column) -> Colum
         variant_id_col = f.coalesce(variant_id_col, f.rand().cast("string"))
         return f.xxhash64(study_id_col, variant_id_col).alias("studyLocusId")
 
+    @classmethod
+    def calculate_credible_set_log10bf(cls: type[StudyLocus], logbfs: Column) -> Column:
+        """Calculate Bayes factor for the entire credible set. The Bayes factor is calculated as the logsumexp of the logBF values of the variants in the locus.
+
+        Args:
+            logbfs (Column): Array column with the logBF values of the variants in the locus.
+
+        Returns:
+            Column: log10 Bayes factor for the entire credible set.
+
+        Examples:
+            >>> spark.createDataFrame([([0.2, 0.1, 0.05, 0.0],)]).toDF("logBF").select(f.round(StudyLocus.calculate_credible_set_log10bf(f.col("logBF")), 7).alias("credibleSetlog10BF")).show()
+            +------------------+
+            |credibleSetlog10BF|
+            +------------------+
+            |         1.4765565|
+            +------------------+
+
+        """
+        logsumexp_udf = f.udf(lambda x: get_logsum(x), FloatType())
+        return logsumexp_udf(logbfs).cast("double").alias("credibleSetlog10BF")
+
     @classmethod
     def get_schema(cls: type[StudyLocus]) -> StructType:
         """Provides the schema for the StudyLocus dataset.
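calculate_credible_set_log10bf ships the array through a Python UDF. As a design note, the same logsumexp can stay entirely in the JVM with Spark's higher-order functions; a sketch assuming Spark >= 3.1 (credible_set_logsumexp is a hypothetical helper, not part of this change):

    import pyspark.sql.functions as f
    from pyspark.sql import Column

    def credible_set_logsumexp(logbfs: Column) -> Column:
        # Same max-shift trick as get_logsum, expressed with array_max/transform/aggregate.
        themax = f.array_max(logbfs)
        total = f.aggregate(
            f.transform(logbfs, lambda x: f.exp(x - themax)),
            f.lit(0.0),
            lambda acc, x: acc + x,
        )
        return themax + f.log(total)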
diff --git a/src/gentropy/datasource/eqtl_catalogue/finemapping.py b/src/gentropy/datasource/eqtl_catalogue/finemapping.py
index af814d94f..ff55ead72 100644
--- a/src/gentropy/datasource/eqtl_catalogue/finemapping.py
+++ b/src/gentropy/datasource/eqtl_catalogue/finemapping.py
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
+from typing import TYPE_CHECKING
 
 import pyspark.sql.functions as f
 from pyspark.sql import Column, DataFrame, Window
@@ -13,17 +14,21 @@
     StructType,
 )
 
+from gentropy.common.session import Session
 from gentropy.common.utils import parse_pvalue
 from gentropy.dataset.study_locus import StudyLocus
 from gentropy.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex
 
+if TYPE_CHECKING:
+    from pyspark.sql import DataFrame
+
 
 @dataclass
 class EqtlCatalogueFinemapping:
     """SuSIE finemapping dataset for eQTL Catalogue.
 
     Credible sets from SuSIE are extracted and transformed into StudyLocus objects:
 
-    - A study ID is defined as a triad between: the publication, the tissue, and the measured gene (e.g. Braineac2_substantia_nigra_ENSG00000248275)
+    - A study ID is defined as the combination of the publication, the tissue, and the measured trait (e.g. Braineac2_substantia_nigra_ENSG00000248275)
     - Each row in the `credible_sets.tsv.gz` files is represented by molecular_trait_id/variant/rsid trios relevant for a given tissue. Each has its own finemapping statistics
     - log Bayes Factors are available for all variants in the `lbf_variable.txt` files
     """
@@ -135,7 +140,7 @@ def parse_susie_results(
                 cls._extract_dataset_id_from_file_path(f.input_file_name()),
             )
             .join(
-                f.broadcast(
+                (
                     credible_sets.withColumn(
                         "dataset_id",
                         cls._extract_dataset_id_from_file_path(f.input_file_name()),
@@ -176,18 +181,18 @@
                 f.col("logBF"),
                 f.lit("SuSie").alias("finemappingMethod"),
                 # Study metadata
+                f.col("molecular_trait_id").alias("traitFromSource"),
                 f.col("gene_id").alias("geneId"),
-                f.array(f.col("molecular_trait_id")).alias("traitFromSourceMappedIds"),
                 f.col("dataset_id"),
                 f.concat_ws(
                     "_",
                     f.col("study_label"),
                     f.col("sample_group"),
-                    f.col("gene_id"),
+                    f.col("molecular_trait_id"),
                 ).alias("studyId"),
-                f.col("tissue_id").alias("c"),
+                f.col("tissue_id").alias("tissueFromSourceId"),
                 EqtlCatalogueStudyIndex._identify_study_type(
-                    f.col("study_label")
+                    f.col("quant_method")
                 ).alias("studyType"),
                 f.col("study_label").alias("projectId"),
                 f.concat_ws(
@@ -220,19 +225,12 @@ def from_susie_results(
             field.name
             for field in StudyLocus.get_schema().fields
             if field.name in processed_finemapping_df.columns
-        ] + ["studyLocusId", "locus"]
+        ] + ["locus"]
         return StudyLocus(
             _df=(
                 processed_finemapping_df.withColumn(
                     "isLead",
-                    f.row_number().over(
-                        lead_w.orderBy(
-                            *[
-                                f.col("pValueExponent").asc(),
-                                f.col("pValueMantissa").asc(),
-                            ]
-                        )
-                    )
+                    f.row_number().over(lead_w.orderBy(f.desc("posteriorProbability")))
                     == f.lit(1),
                 )
                 .withColumn(
@@ -255,13 +253,59 @@
                 )
                 .filter(f.col("isLead"))
                 .drop("isLead")
-                .withColumn(
-                    "studyLocusId",
+                .select(
+                    *study_locus_cols,
                     StudyLocus.assign_study_locus_id(
                         f.col("studyId"), f.col("variantId")
                     ),
+                    StudyLocus.calculate_credible_set_log10bf(
+                        f.col("locus.logBF")
+                    ).alias("credibleSetlog10BF"),
                 )
-                .select(study_locus_cols)
             ),
             _schema=StudyLocus.get_schema(),
         ).annotate_credible_sets()
+
+    @classmethod
+    def read_credible_set_from_source(
+        cls: type[EqtlCatalogueFinemapping],
+        session: Session,
+        credible_set_path: str | list[str],
+    ) -> DataFrame:
+        """Load raw credible sets from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            credible_set_path (str | list[str]): Path to raw table(s) containing finemapping results for any variant belonging to a credible set.
+
+        Returns:
+            DataFrame: Credible sets DataFrame.
+        """
+        return session.spark.read.csv(
+            credible_set_path,
+            sep="\t",
+            header=True,
+            schema=cls.raw_credible_set_schema,
+        )
+
+    @classmethod
+    def read_lbf_from_source(
+        cls: type[EqtlCatalogueFinemapping],
+        session: Session,
+        lbf_path: str | list[str],
+    ) -> DataFrame:
+        """Load raw log Bayes Factors from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            lbf_path (str | list[str]): Path to raw table(s) containing log Bayes Factors for each variant.
+
+        Returns:
+            DataFrame: Log Bayes Factors DataFrame.
+        """
+        return session.spark.read.csv(
+            lbf_path,
+            sep="\t",
+            header=True,
+            schema=cls.raw_lbf_schema,
+        )
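The two new readers move the csv options that previously lived inline in the step next to the schemas they depend on. Given a gentropy Session named session, usage looks roughly like this (the paths are hypothetical placeholders, not real dataset locations):

    credible_sets_df = EqtlCatalogueFinemapping.read_credible_set_from_source(
        session,
        credible_set_path=["/tmp/eqtl/QTD000021.credible_sets.tsv"],  # hypothetical local extract
    )
    lbf_df = EqtlCatalogueFinemapping.read_lbf_from_source(
        session,
        lbf_path=["/tmp/eqtl/QTD000021.lbf_variable.txt"],  # hypothetical local extract
    )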
diff --git a/src/gentropy/datasource/eqtl_catalogue/study_index.py b/src/gentropy/datasource/eqtl_catalogue/study_index.py
index 611bcdff7..71cec1ec0 100644
--- a/src/gentropy/datasource/eqtl_catalogue/study_index.py
+++ b/src/gentropy/datasource/eqtl_catalogue/study_index.py
@@ -2,11 +2,14 @@
 from __future__ import annotations
 
+from itertools import chain
 from typing import TYPE_CHECKING
 
+import pandas as pd
 import pyspark.sql.functions as f
 from pyspark.sql.types import IntegerType, StringType, StructField, StructType
 
+from gentropy.common.session import Session
 from gentropy.dataset.study_index import StudyIndex
 
 if TYPE_CHECKING:
@@ -23,7 +26,7 @@ class EqtlCatalogueStudyIndex:
 
     - in the same publication (e.g. Alasoo_2018)
     - in the same cell type or tissue (e.g. monocytes)
-    - and for the same gene associated with the measured molecular trait (e.g. ENSG00000141510)
+    - and for the same measured molecular trait (e.g. ENSG00000141510)
 
     """
@@ -40,24 +43,45 @@
             StructField("quant_method", StringType(), True),
         ]
     )
-    raw_studies_metadata_path = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/data_tables/dataset_metadata.tsv"
+    raw_studies_metadata_path = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/19929ff6a99bf402194292a14f96f9615b35f65f/data_tables/dataset_metadata.tsv"
 
     @classmethod
     def _identify_study_type(
-        cls: type[EqtlCatalogueStudyIndex], study_label_col: Column
+        cls: type[EqtlCatalogueStudyIndex], quantification_method_col: Column
     ) -> Column:
-        """Identify the study type based on the study label.
+        """Identify the study type based on the method used to quantify the trait.
 
         Args:
-            study_label_col (Column): column with the study label that identifies the supporting publication.
+            quantification_method_col (Column): column with the label of the method used to quantify the trait. Available methods are [here](https://www.ebi.ac.uk/eqtl/Methods/)
 
         Returns:
             Column: The study type.
+
+        Examples:
+            >>> df = spark.createDataFrame([("ge",), ("exon",), ("tx",)], ["quant_method"])
+            >>> df.withColumn("study_type", EqtlCatalogueStudyIndex._identify_study_type(f.col("quant_method"))).show()
+            +------------+----------+
+            |quant_method|study_type|
+            +------------+----------+
+            |          ge|      eqtl|
+            |        exon|      eqtl|
+            |          tx|      eqtl|
+            +------------+----------+
+
         """
-        return f.when(
-            study_label_col == "Sun_2018",
-            f.lit("pqtl"),
-        ).otherwise(f.lit("eqtl"))
+        method_to_study_type_mapping = {
+            "ge": "eqtl",
+            "exon": "eqtl",
+            "tx": "eqtl",
+            "microarray": "eqtl",
+            "leafcutter": "sqtl",
+            "aptamer": "pqtl",
+            "txrev": "tuqtl",
+        }
+        map_expr = f.create_map(
+            *[f.lit(x) for x in chain(*method_to_study_type_mapping.items())]
+        )
+        return map_expr.getItem(quantification_method_col)
 
     @classmethod
     def get_studies_of_interest(
@@ -100,3 +124,24 @@ def from_susie_results(
             _df=processed_finemapping_df.select(study_index_cols).distinct(),
             _schema=StudyIndex.get_schema(),
         )
+
+    @classmethod
+    def read_studies_from_source(
+        cls: type[EqtlCatalogueStudyIndex],
+        session: Session,
+        mqtl_quantification_methods_blacklist: list[str],
+    ) -> DataFrame:
+        """Read raw studies metadata from eQTL Catalogue.
+
+        Args:
+            session (Session): Spark session.
+            mqtl_quantification_methods_blacklist (list[str]): Molecular trait quantification methods that we don't want to ingest. Available options in https://github.com/eQTL-Catalogue/eQTL-Catalogue-resources/blob/master/data_tables/dataset_metadata.tsv
+
+        Returns:
+            DataFrame: raw studies metadata.
+        """
+        # pandas 2.0 removed DataFrame.iteritems, which older Spark versions still call
+        # inside createDataFrame; alias it back to DataFrame.items as a shim.
+        pd.DataFrame.iteritems = pd.DataFrame.items
+        return session.spark.createDataFrame(
+            pd.read_csv(cls.raw_studies_metadata_path, sep="\t"),
+            schema=cls.raw_studies_metadata_schema,
+        ).filter(~(f.col("quant_method").isin(mqtl_quantification_methods_blacklist)))
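One behavioural detail of the new mapping: create_map(...).getItem(col) returns null for any quantification method that is not a key of method_to_study_type_mapping, rather than raising. A sketch with a made-up method label ("scATAC" is hypothetical, not an eQTL Catalogue method; assumes an active session):

    df = session.spark.createDataFrame([("ge",), ("aptamer",), ("scATAC",)], ["quant_method"])
    df.withColumn(
        "study_type",
        EqtlCatalogueStudyIndex._identify_study_type(f.col("quant_method")),
    ).show()
    # +------------+----------+
    # |quant_method|study_type|
    # +------------+----------+
    # |          ge|      eqtl|
    # |     aptamer|      pqtl|
    # |      scATAC|      null|
    # +------------+----------+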
diff --git a/src/gentropy/datasource/finngen/finemapping.py b/src/gentropy/datasource/finngen/finemapping.py
index 8fd12b0ba..a31ddf511 100644
--- a/src/gentropy/datasource/finngen/finemapping.py
+++ b/src/gentropy/datasource/finngen/finemapping.py
@@ -253,10 +253,7 @@ def from_finngen_susie_finemapping(
         toploci_df = get_top_ranked_in_window(
             processed_finngen_finemapping_df,
             Window.partitionBy("studyId", "region", "credibleSetIndex").orderBy(
-                *[
-                    f.col("pValueExponent").asc(),
-                    f.col("pValueMantissa").asc(),
-                ]
+                f.desc("posteriorProbability")
             ),
         ).select(
             "variantId",
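This is the same re-ranking applied to the eQTL Catalogue credible sets above: the lead variant is now the one with the highest posterior probability instead of the smallest p-value. A toy demonstration of how the two orderings can disagree (assumes an active spark session; the data is made up):

    import pyspark.sql.functions as f
    from pyspark.sql import Window

    rows = [
        ("S1", "chr1:1-100", 1, "v1", 0.60, 5.0, -8),  # highest posteriorProbability
        ("S1", "chr1:1-100", 1, "v2", 0.35, 1.0, -9),  # smallest p-value
    ]
    df = spark.createDataFrame(
        rows,
        "studyId string, region string, credibleSetIndex int, variantId string, "
        "posteriorProbability double, pValueMantissa double, pValueExponent int",
    )
    w = Window.partitionBy("studyId", "region", "credibleSetIndex")
    df.withColumn(
        "rank", f.row_number().over(w.orderBy(f.desc("posteriorProbability")))
    ).filter("rank = 1").select("variantId").show()
    # picks v1; the old (pValueExponent, pValueMantissa) ordering would pick v2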
diff --git a/src/gentropy/eqtl_catalogue.py b/src/gentropy/eqtl_catalogue.py
index bfbfd79c2..7adc5d8a2 100644
--- a/src/gentropy/eqtl_catalogue.py
+++ b/src/gentropy/eqtl_catalogue.py
@@ -2,9 +2,6 @@
 from __future__ import annotations
 
-import pandas as pd
-import pyspark.sql.functions as f
-
 from gentropy.common.session import Session
 from gentropy.datasource.eqtl_catalogue.finemapping import EqtlCatalogueFinemapping
 from gentropy.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex
@@ -19,7 +16,7 @@ class EqtlCatalogueStep:
     def __init__(
         self,
         session: Session,
-        mqtl_quantification_methods: list[str],
+        mqtl_quantification_methods_blacklist: list[str],
         eqtl_catalogue_paths_imported: str,
         eqtl_catalogue_study_index_out: str,
         eqtl_catalogue_credible_sets_out: str,
@@ -28,39 +25,33 @@
         Args:
             session (Session): Session object.
-            mqtl_quantification_methods (list[str]): Molecular trait quantification methods to ingest. Available options in https://github.com/eQTL-Catalogue/eQTL-Catalogue-resources/blob/master/data_tables/dataset_metadata.tsv
+            mqtl_quantification_methods_blacklist (list[str]): Molecular trait quantification methods that we don't want to ingest. Available options in https://github.com/eQTL-Catalogue/eQTL-Catalogue-resources/blob/master/data_tables/dataset_metadata.tsv
             eqtl_catalogue_paths_imported (str): Input eQTL Catalogue fine mapping results path.
             eqtl_catalogue_study_index_out (str): Output eQTL Catalogue study index path.
             eqtl_catalogue_credible_sets_out (str): Output eQTL Catalogue credible sets path.
         """
         # Extract
-        pd.DataFrame.iteritems = pd.DataFrame.items
-        studies_metadata = session.spark.createDataFrame(
-            pd.read_csv(EqtlCatalogueStudyIndex.raw_studies_metadata_path, sep="\t"),
-            schema=EqtlCatalogueStudyIndex.raw_studies_metadata_schema,
-        ).filter(f.col("quant_method").isin(list(mqtl_quantification_methods)))
+        studies_metadata = EqtlCatalogueStudyIndex.read_studies_from_source(
+            session, list(mqtl_quantification_methods_blacklist)
+        )
 
         # Load raw data only for the studies we are interested in ingesting. This makes the process much lighter.
         studies_to_ingest = EqtlCatalogueStudyIndex.get_studies_of_interest(
             studies_metadata
         )
-        credible_sets_df = session.spark.read.csv(
-            [
+        credible_sets_df = EqtlCatalogueFinemapping.read_credible_set_from_source(
+            session,
+            credible_set_path=[
                 f"{eqtl_catalogue_paths_imported}/{qtd_id}.credible_sets.tsv"
                 for qtd_id in studies_to_ingest
             ],
-            sep="\t",
-            header=True,
-            schema=EqtlCatalogueFinemapping.raw_credible_set_schema,
         )
-        lbf_df = session.spark.read.csv(
-            [
+        lbf_df = EqtlCatalogueFinemapping.read_lbf_from_source(
+            session,
+            lbf_path=[
                 f"{eqtl_catalogue_paths_imported}/{qtd_id}.lbf_variable.txt"
                 for qtd_id in studies_to_ingest
            ],
-            sep="\t",
-            header=True,
-            schema=EqtlCatalogueFinemapping.raw_lbf_schema,
         )
 
         # Transform
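Because all of the work happens in __init__, constructing the step runs the whole extract-transform-write pipeline. A sketch of invoking it by hand, outside Hydra (every path below is a hypothetical placeholder, and the blacklist entry is only an example):

    EqtlCatalogueStep(
        session=session,
        mqtl_quantification_methods_blacklist=["txrev"],  # e.g. skip transcript-usage QTLs
        eqtl_catalogue_paths_imported="gs://my-bucket/eqtl/susie",
        eqtl_catalogue_study_index_out="gs://my-bucket/out/eqtl_study_index",
        eqtl_catalogue_credible_sets_out="gs://my-bucket/out/eqtl_credible_sets",
    )

With the new default of an empty blacklist in ot_eqtl_catalogue.yaml, every quantification method in the metadata table is ingested unless explicitly excluded; the previous behaviour (gene expression only) came from whitelisting "ge" in the old default.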
diff --git a/src/gentropy/method/colocalisation.py b/src/gentropy/method/colocalisation.py
index fd56398f0..899343998 100644
--- a/src/gentropy/method/colocalisation.py
+++ b/src/gentropy/method/colocalisation.py
@@ -10,6 +10,7 @@
 from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
 from pyspark.sql.types import DoubleType
 
+from gentropy.common.utils import get_logsum
 from gentropy.dataset.colocalisation import Colocalisation
 
 if TYPE_CHECKING:
@@ -105,28 +106,6 @@ class Coloc:
 
     """
 
-    @staticmethod
-    def _get_logsum(log_bf: NDArray[np.float64]) -> float:
-        """Calculates logsum of vector.
-
-        This function calculates the log of the sum of the exponentiated
-        logs taking out the max, i.e. insuring that the sum is not Inf
-
-        Args:
-            log_bf (NDArray[np.float64]): log bayes factor
-
-        Returns:
-            float: logsum
-
-        Example:
-            >>> l = [0.2, 0.1, 0.05, 0]
-            >>> round(Coloc._get_logsum(l), 6)
-            1.476557
-        """
-        themax = np.max(log_bf)
-        result = themax + np.log(np.sum(np.exp(log_bf - themax)))
-        return float(result)
-
     @staticmethod
     def _get_posteriors(all_bfs: NDArray[np.float64]) -> DenseVector:
         """Calculate posterior probabilities for each hypothesis.
@@ -142,7 +121,7 @@ def _get_posteriors(all_bfs: NDArray[np.float64]) -> DenseVector:
             >>> Coloc._get_posteriors(l)
             DenseVector([0.279, 0.2524, 0.2401, 0.2284])
         """
-        diff = all_bfs - Coloc._get_logsum(all_bfs)
+        diff = all_bfs - get_logsum(all_bfs)
         bfs_posteriors = np.exp(diff)
         return Vectors.dense(bfs_posteriors)
 
@@ -166,7 +145,7 @@ def colocalise(
             Colocalisation: Colocalisation results
         """
         # register udfs
-        logsum = f.udf(Coloc._get_logsum, DoubleType())
+        logsum = f.udf(get_logsum, DoubleType())
         posteriors = f.udf(Coloc._get_posteriors, VectorUDT())
         return Colocalisation(
             _df=(
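Since _get_posteriors now delegates to the shared get_logsum, its surviving doctest should be unaffected; a quick numpy cross-check of the numbers (not part of the diff):

    import numpy as np

    bfs = np.array([0.2, 0.1, 0.05, 0.0])
    lse = bfs.max() + np.log(np.exp(bfs - bfs.max()).sum())  # equals get_logsum(bfs) ~= 1.476557
    posteriors = np.exp(bfs - lse)
    print(posteriors.round(4))  # [0.279  0.2524 0.2401 0.2284]
    print(round(posteriors.sum(), 12))  # 1.0, posteriors are normalised by construction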