Commit

feat: extract credible sets and studies from eQTL Catalogue finemapping results (#514)

* feat: dataflow decompress prototype (#501)

* chore: commit susie results gist

* feat(study_index): add `tissueFromSourceId` to schema and make `traitFromSource` nullable

* fix: bug and linting fixes in new eqtl ingestion step

* perf: config bugfixes and performance improvements

* perf: remove data persistence to avoid executor failure

* perf: load susie results for studies of interest only

* perf: collect locus for leads only and optimise partitioning cols

* feat: parametrise methods to include

* feat: run full dag

* test: add tests

* fix: reorder test inputs

* docs: update eqtl catalogue docs

* fix: correct typos in tests docstrings

* fix: correct typos in tests docstrings

* test: fix

* revert: revert unwanted change in studyId definition

* test: final fix

---------

Co-authored-by: David Ochoa <ochoa@ebi.ac.uk>
ireneisdoomed and d0choa authored Mar 4, 2024
1 parent ffa3d34 commit ec9d2c7
Showing 20 changed files with 961 additions and 246 deletions.
1 change: 0 additions & 1 deletion config/datasets/ot_gcp.yaml

@@ -39,7 +39,6 @@ thurman: ${datasets.static_assets}/thurman2012/genomewideCorrs_above0.7_promoter
 target_index: ${datasets.release_folder}/targets # OTP 23.12 data

 gene_interactions: ${datasets.release_folder}/interaction # OTP 23.12 data
-eqtl_catalogue_paths_imported: ${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv
 finngen_finemapping_results_path: ${datasets.inputs}/Finngen_susie_finemapping_r10/full
 finngen_finemapping_summaries_path: ${datasets.inputs}/Finngen_susie_finemapping_r10/Finngen_susie_credset_summary_r10.tsv
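The `${datasets.inputs}` values in this YAML are OmegaConf-style interpolations: each `${a.b}` reference is resolved against the nested config at load time. A minimal illustrative sketch of that resolution semantic (not the actual OmegaConf implementation; the bucket path is a hypothetical placeholder):

```python
import re

def resolve(value: str, config: dict) -> str:
    """Resolve ${a.b}-style references against a nested config dict (illustrative sketch)."""
    def lookup(match: re.Match) -> str:
        node = config
        # Walk the dotted path, e.g. "datasets.inputs" -> config["datasets"]["inputs"].
        for key in match.group(1).split("."):
            node = node[key]
        return node
    return re.sub(r"\$\{([^}]+)\}", lookup, value)

cfg = {"datasets": {"inputs": "gs://example-inputs"}}  # hypothetical bucket path
print(resolve("${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv", cfg))
# → gs://example-inputs/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv
```

This is why deleting the `eqtl_catalogue_paths_imported` entry here is safe only because the step config below stops interpolating it.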
9 changes: 6 additions & 3 deletions config/step/ot_eqtl_catalogue.yaml

@@ -1,6 +1,9 @@
 defaults:
   - eqtl_catalogue

-eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported}
-eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out}
-eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out}
+eqtl_catalogue_paths_imported: ???
+eqtl_catalogue_study_index_out: ???
+eqtl_catalogue_credible_sets_out: ???
+session:
+  extended_spark_conf:
+    "spark.sql.shuffle.partitions": "3200"
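In Hydra/OmegaConf, `???` marks a mandatory value: accessing a key still set to it raises an error, so these paths must be supplied at runtime (the DAG below passes them as `step.…=` overrides). A toy sketch of that semantic, not OmegaConf itself; the bucket path is a hypothetical placeholder:

```python
MISSING = "???"  # Hydra/OmegaConf mandatory-value marker

def unresolved_mandatory(config: dict) -> list[str]:
    """Return keys still set to the mandatory marker, i.e. values the caller forgot to override."""
    return [key for key, value in config.items() if value == MISSING]

step_config = {
    "eqtl_catalogue_paths_imported": MISSING,
    "eqtl_catalogue_study_index_out": "gs://example-bucket/study_index",  # supplied via a step.… override
    "eqtl_catalogue_credible_sets_out": MISSING,
}
print(unresolved_mandatory(step_config))
# → ['eqtl_catalogue_paths_imported', 'eqtl_catalogue_credible_sets_out']
```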
8 changes: 2 additions & 6 deletions docs/python_api/datasources/eqtl_catalogue/_eqtl_catalogue.md

@@ -2,10 +2,6 @@
 title: eQTL Catalogue
 ---

-The [eQTL Catalogue](https://www.ebi.ac.uk/eqtl/) aims to provide uniformly processed gene expression and splicing Quantitative Trait Loci (QTLs) from all available public studies on humans.
+The [eQTL Catalogue](https://www.ebi.ac.uk/eqtl/) aims to provide unified gene, protein expression and splicing Quantitative Trait Loci (QTLs) from all available human public studies.

-It serves as the ultimate resource of eQTLs that we use for colocalization and target prioritization.
-
-We utilize data from the following study within the eQTL Catalogue:
-
-1. **GTEx v8**, 49 tissues
+It serves as the ultimate resource of mQTLs that we use for colocalization and target prioritization.
5 changes: 5 additions & 0 deletions docs/python_api/datasources/eqtl_catalogue/finemapping.md

@@ -0,0 +1,5 @@
+---
+title: Fine mapping results
+---
+
+::: gentropy.datasource.eqtl_catalogue.finemapping.EqtlCatalogueFinemapping
2 changes: 1 addition & 1 deletion docs/python_api/steps/eqtl_catalogue.md

@@ -1,5 +1,5 @@
 ---
-title: eqtl_catalogue
+title: eQTL Catalogue
 ---

 ::: gentropy.eqtl_catalogue.EqtlCatalogueStep
74 changes: 74 additions & 0 deletions src/airflow/dags/eqtl_preprocess.py

@@ -0,0 +1,74 @@
"""Airflow DAG to extract credible sets and a study index from eQTL Catalogue's finemapping results."""

from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator

CLUSTER_NAME = "otg-preprocess-eqtl"
AUTOSCALING = "do-ld-explosion"
PROJECT_ID = "open-targets-genetics-dev"

EQTL_CATALOG_SUSIE_LOCATION = "gs://eqtl_catalog_data/ebi_ftp/susie"
TEMP_DECOMPRESS_LOCATION = "gs://eqtl_catalog_data/susie_decompressed_tmp"
DECOMPRESS_FAILED_LOG = f"{TEMP_DECOMPRESS_LOCATION}.log"
STUDY_INDEX_PATH = "gs://eqtl_catalog_data/study_index"
CREDIBLE_SET_PATH = "gs://eqtl_catalog_data/credible_set_datasets/susie"

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — eQTL preprocess",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    # SuSiE fine-mapping results are stored as gzipped files in a GCS bucket.
    # To speed up processing, we first decompress them to a temporary GCS location.
    decompression_job = DataflowTemplatedJobStartOperator(
        task_id="decompress_susie_outputs",
        template="gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files",
        location="europe-west1",
        project_id=PROJECT_ID,
        parameters={
            "inputFilePattern": f"{EQTL_CATALOG_SUSIE_LOCATION}/**/*.gz",
            "outputDirectory": TEMP_DECOMPRESS_LOCATION,
            "outputFailureFile": DECOMPRESS_FAILED_LOG,
        },
    )

    ingestion_job = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="ot_eqtl_catalogue",
        task_id="ot_eqtl_ingestion",
        other_args=[
            f"step.eqtl_catalogue_paths_imported={TEMP_DECOMPRESS_LOCATION}",
            f"step.eqtl_catalogue_study_index_out={STUDY_INDEX_PATH}",
            f"step.eqtl_catalogue_credible_sets_out={CREDIBLE_SET_PATH}",
        ],
    )

    delete_decompressed_job = GCSDeleteBucketOperator(
        task_id="delete_decompressed_files",
        bucket_name=TEMP_DECOMPRESS_LOCATION,
        force=True,
        user_project=PROJECT_ID,
    )

    (
        decompression_job
        >> common.create_cluster(
            CLUSTER_NAME,
            autoscaling_policy=AUTOSCALING,
            num_workers=4,
            worker_machine_type="n1-highmem-8",
        )
        >> common.install_dependencies(CLUSTER_NAME)
        >> ingestion_job
        >> delete_decompressed_job
        >> common.delete_cluster(CLUSTER_NAME)
    )
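The chained `>>` expression at the end of the DAG is what orders the tasks: decompress, create cluster, install dependencies, ingest, clean up, delete cluster. Airflow implements this by overloading `__rshift__` on operators. A toy sketch of that chaining mechanism (not Airflow itself), showing why `a >> b >> c` reads left to right:

```python
class Task:
    """Toy stand-in for an Airflow operator, illustrating `a >> b >> c` dependency wiring."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list[str] = []

    def __rshift__(self, other: "Task") -> "Task":
        # Record `other` as downstream and return it, so chains compose left to right.
        self.downstream.append(other.task_id)
        return other

decompress = Task("decompress_susie_outputs")
ingest = Task("ot_eqtl_ingestion")
cleanup = Task("delete_decompressed_files")
decompress >> ingest >> cleanup
print(decompress.downstream, ingest.downstream)
# → ['ot_eqtl_ingestion'] ['delete_decompressed_files']
```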
1 change: 1 addition & 0 deletions src/airflow/requirements.txt

@@ -1,2 +1,3 @@
 apache-airflow-providers-google==10.10.1
+apache-airflow-providers-apache-beam==5.6.1
 psycopg2-binary==2.9.9
8 changes: 7 additions & 1 deletion src/gentropy/assets/schemas/study_index.json

@@ -22,7 +22,7 @@
 {
   "name": "traitFromSource",
   "type": "string",
-  "nullable": false,
+  "nullable": true,
   "metadata": {}
 },
 {
@@ -41,6 +41,12 @@
   "nullable": true,
   "metadata": {}
 },
+{
+  "name": "tissueFromSourceId",
+  "type": "string",
+  "nullable": true,
+  "metadata": {}
+},
 {
   "name": "pubmedId",
   "type": "string",
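This JSON is a serialized Spark schema, so making `traitFromSource` nullable and adding a nullable `tissueFromSourceId` lets eQTL Catalogue studies (which carry a tissue rather than a trait) pass validation. A quick stdlib check of just the two fields touched by this diff, independent of Spark:

```python
import json

# Fragment of study_index.json after this change: only the two affected fields.
schema_fragment = json.loads("""
[
  {"name": "traitFromSource", "type": "string", "nullable": true, "metadata": {}},
  {"name": "tissueFromSourceId", "type": "string", "nullable": true, "metadata": {}}
]
""")

# Collect the fields that are now allowed to be null.
nullable_fields = {field["name"] for field in schema_fragment if field["nullable"]}
print(sorted(nullable_fields))
# → ['tissueFromSourceId', 'traitFromSource']
```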
7 changes: 6 additions & 1 deletion src/gentropy/config.py

@@ -112,7 +112,12 @@ class EqtlCatalogueConfig(StepConfig):

     eqtl_catalogue_paths_imported: str = MISSING
     eqtl_catalogue_study_index_out: str = MISSING
-    eqtl_catalogue_summary_stats_out: str = MISSING
+    eqtl_catalogue_credible_sets_out: str = MISSING
+    mqtl_quantification_methods: list[str] = field(
+        default_factory=lambda: [
+            "ge",
+        ]
+    )
     _target_: str = "gentropy.eqtl_catalogue.EqtlCatalogueStep"
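The new `mqtl_quantification_methods` option (defaulting to `"ge"`, gene expression) uses `field(default_factory=...)` because dataclasses reject bare mutable defaults: a shared list default would leak state between config instances. A minimal stand-in dataclass (hypothetical name, not the gentropy class) demonstrating why the factory is needed:

```python
from dataclasses import dataclass, field

@dataclass
class EqtlStepConfigSketch:
    """Illustrative stand-in for EqtlCatalogueConfig: list defaults require default_factory."""

    # A bare `= ["ge"]` default would raise ValueError at class creation time,
    # and a shared list would be mutated across instances; the factory gives
    # each instance its own fresh list.
    mqtl_quantification_methods: list[str] = field(default_factory=lambda: ["ge"])

a = EqtlStepConfigSketch()
b = EqtlStepConfigSketch()
a.mqtl_quantification_methods.append("exon")  # hypothetical extra method
print(a.mqtl_quantification_methods, b.mqtl_quantification_methods)
# → ['ge', 'exon'] ['ge']
```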