Merge pull request #528 from opentargets/ds_3238_update_ETL_DAG
feat(dag): add data transfer task group for release process
DSuveges authored Mar 14, 2024
2 parents 1095c67 + d4f2792 commit 4dcecc3
Showing 9 changed files with 219 additions and 172 deletions.
8 changes: 4 additions & 4 deletions config/datasets/ot_gcp.yaml
@@ -1,10 +1,10 @@
 # Release specific configuration:
-release_version: "24.01"
+release_version: "24.03"
 dev_version: XX.XX
 release_folder: gs://genetics_etl_python_playground/releases/${datasets.release_version}
 
 inputs: gs://genetics_etl_python_playground/input
-static_assets: gs://genetics_etl_python_playground/static_assetss
+static_assets: gs://genetics_etl_python_playground/static_assets
 outputs: gs://genetics_etl_python_playground/output/python_etl/parquet/${datasets.dev_version}
 
 ## Datasets:
@@ -36,9 +36,9 @@ anderson: ${datasets.static_assets}/andersson2014/enhancer_tss_associations.bed
 javierre: ${datasets.static_assets}/javierre_2016_preprocessed
 jung: ${datasets.static_assets}/jung2019_pchic_tableS3.csv
 thurman: ${datasets.static_assets}/thurman2012/genomewideCorrs_above0.7_promoterPlusMinus500kb_withGeneNames_32celltypeCategories.bed8.gz
-target_index: ${datasets.release_folder}/targets # OTP 23.12 data
+target_index: ${datasets.static_assets}/targets # OTP 23.12 data
+gene_interactions: ${datasets.static_assets}/interaction # OTP 23.12 data
 
-gene_interactions: ${datasets.release_folder}/interaction # OTP 23.12 data
 finngen_finemapping_results_path: ${datasets.inputs}/Finngen_susie_finemapping_r10/full
 finngen_finemapping_summaries_path: ${datasets.inputs}/Finngen_susie_finemapping_r10/Finngen_susie_credset_summary_r10.tsv

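These values are consumed through Hydra/OmegaConf-style `${...}` interpolation, so bumping `release_version` automatically rewrites every path built on `release_folder`. A minimal sketch of that resolution (values copied from the diff above; standalone OmegaConf usage is assumed here, not the exact way gentropy wires its configs):

```python
from omegaconf import OmegaConf

# Mirror the two config keys touched above:
cfg = OmegaConf.create(
    {
        "datasets": {
            "release_version": "24.03",
            "release_folder": "gs://genetics_etl_python_playground/releases/${datasets.release_version}",
        }
    }
)

# Interpolations resolve on access:
print(cfg.datasets.release_folder)
# gs://genetics_etl_python_playground/releases/24.03
```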
124 changes: 1 addition & 123 deletions poetry.lock

Large diffs are not rendered by default.

18 changes: 17 additions & 1 deletion src/airflow/dags/common_airflow.py
@@ -13,7 +13,7 @@
     DataprocSubmitJobOperator,
 )
 from airflow.utils.trigger_rule import TriggerRule
-from google.cloud import dataproc_v1
+from google.cloud import dataproc_v1, storage

 if TYPE_CHECKING:
     from pathlib import Path
@@ -59,6 +59,22 @@
 }
 
 
+def check_gcp_folder_exists(bucket_name: str, folder_path: str) -> bool:
+    """Check if a folder exists in a Google Cloud bucket.
+
+    Args:
+        bucket_name (str): The name of the Google Cloud bucket.
+        folder_path (str): The path of the folder to check.
+
+    Returns:
+        bool: True if the folder exists, False otherwise.
+    """
+    client = storage.Client()
+    bucket = client.get_bucket(bucket_name)
+    blobs = bucket.list_blobs(prefix=folder_path)
+    return any(blobs)
+
+
 def create_cluster(
     cluster_name: str,
     master_machine_type: str = "n1-highmem-16",
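GCS has no real folders, so `check_gcp_folder_exists` simply looks for any object under the prefix; `any()` pulls at most one blob from the iterator, keeping the call cheap even for large folders. A minimal usage sketch (bucket and path copied from the new DAG below; valid Google Cloud credentials are assumed):

```python
from common_airflow import check_gcp_folder_exists

# Guard a release the same way the new genetics_etl DAG does:
if check_gcp_folder_exists("genetics_etl_python_playground", "releases/24.03"):
    raise SystemExit("Release folder already exists; refusing to overwrite it.")
```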
35 changes: 0 additions & 35 deletions src/airflow/dags/dag_genetics_etl.py

This file was deleted.

8 changes: 4 additions & 4 deletions src/airflow/dags/eqtl_preprocess.py
@@ -15,11 +15,11 @@
 AUTOSCALING = "eqtl-preprocess"
 PROJECT_ID = "open-targets-genetics-dev"
 
-EQTL_CATALOG_SUSIE_LOCATION = "gs://eqtl_catalog_data/ebi_ftp/susie"
-TEMP_DECOMPRESS_LOCATION = "gs://eqtl_catalog_data/susie_decompressed_tmp"
+EQTL_CATALOG_SUSIE_LOCATION = "gs://eqtl_catalogue_data/ebi_ftp/susie"
+TEMP_DECOMPRESS_LOCATION = "gs://eqtl_catalogue_data/susie_decompressed_tmp"
 DECOMPRESS_FAILED_LOG = f"{TEMP_DECOMPRESS_LOCATION}.log"
-STUDY_INDEX_PATH = "gs://eqtl_catalog_data/study_index"
-CREDIBLE_SET_PATH = "gs://eqtl_catalog_data/credible_set_datasets/susie"
+STUDY_INDEX_PATH = "gs://eqtl_catalogue_data/study_index"
+CREDIBLE_SET_PATH = "gs://eqtl_catalogue_data/credible_set_datasets/susie"

 with DAG(
     dag_id=Path(__file__).stem,
2 changes: 1 addition & 1 deletion src/airflow/dags/finngen_preprocess.py
@@ -26,7 +26,7 @@
 )
 FINNGEN_FM_SUMMARIES = "gs://genetics_etl_python_playground/input/Finngen_susie_finemapping_r10/Finngen_susie_credset_summary_r10.tsv"
 FINNGEN_PREFIX = "FINNGEN_R10_"
-FINNGEN_FM_OUT = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/finngen_susie_processed"
+FINNGEN_FM_OUT = f"{FINNGEN_BUCKET}/credible_set_datasets/finngen_susie"

 with DAG(
     dag_id=Path(__file__).stem,
160 changes: 160 additions & 0 deletions src/airflow/dags/genetics_etl.py
@@ -0,0 +1,160 @@
"""Test DAG to prototype data transfer."""

from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.task_group import TaskGroup

CLUSTER_NAME = "otg-etl"
SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "configs" / "dag.yaml"

# Release specific variables:
RELEASE_VERSION = "24.03"
RELEASE_BUCKET_NAME = "genetics_etl_python_playground"

# Datasource paths:
GWAS_CATALOG_BUCKET_NAME = "gwas_catalog_data"
EQTL_BUCKET_NAME = "eqtl_catalogue_data"
FINNGEN_BUCKET_NAME = "finngen_data"
FINNGEN_RELEASE = "r10"

# Files to move:
DATA_TO_MOVE = {
    # GWAS Catalog study index:
    "gwas_catalog_study_index": {
        "source_bucket": GWAS_CATALOG_BUCKET_NAME,
        "source_object": "study_index",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/study_index/gwas_catalog",
    },
    # PICS credible sets from GWAS Catalog curated associations:
    "gwas_catalog_curated_credible_set": {
        "source_bucket": GWAS_CATALOG_BUCKET_NAME,
        "source_object": "credible_set_datasets/gwas_catalog_curated",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/credible_set/gwas_catalog_pics_from_curation",
    },
    # PICS credible sets from GWAS Catalog summary statistics:
    "gwas_catalog_sumstats_credible_set": {
        "source_bucket": GWAS_CATALOG_BUCKET_NAME,
        "source_object": "credible_set_datasets/gwas_catalog_summary_stats",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/credible_set/gwas_catalog_pics_from_summary_statistics",
    },
    # GWAS Catalog manifest files:
    "gwas_catalog_manifests": {
        "source_bucket": GWAS_CATALOG_BUCKET_NAME,
        "source_object": "manifests",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/manifests",
    },
    # eQTL Catalogue study index:
    "eqtl_catalogue_study_index": {
        "source_bucket": EQTL_BUCKET_NAME,
        "source_object": "study_index",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/study_index/eqtl_catalogue",
    },
    # eQTL Catalogue SuSiE credible sets:
    "eqtl_catalogue_susie_credible_set": {
        "source_bucket": EQTL_BUCKET_NAME,
        "source_object": "credible_set_datasets/susie",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/credible_set/eqtl_catalogue_susie",
    },
    # FinnGen study index:
    "finngen_study_index": {
        "source_bucket": FINNGEN_BUCKET_NAME,
        "source_object": f"{FINNGEN_RELEASE}/study_index",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/study_index/finngen",
    },
    # PICS credible sets from FinnGen summary statistics:
    "finngen_PICS_credible_set": {
        "source_bucket": FINNGEN_BUCKET_NAME,
        "source_object": f"{FINNGEN_RELEASE}/credible_set_datasets/finngen_pics",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/credible_set/finngen_pics",
    },
    # FinnGen SuSiE credible sets:
    "finngen_susie_credible_set": {
        "source_bucket": FINNGEN_BUCKET_NAME,
        "source_object": f"{FINNGEN_RELEASE}/credible_set_datasets/finngen_susie_processed",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/credible_set/finngen_susie",
    },
    # L2G gold standard:
    "gold_standard": {
        "source_bucket": "genetics_etl_python_playground",
        "source_object": "input/l2g/gold_standard/curation.json",
        "destination_bucket": RELEASE_BUCKET_NAME,
        "destination_object": f"releases/{RELEASE_VERSION}/locus_to_gene_gold_standard.json",
    },
}
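# Note: GCSToGCSOperator (used below) treats a wildcard-free source_object as
# a prefix by default, so each entry above copies a whole folder of objects;
# only "gold_standard" points at a single file.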


# This operator short-circuits the DAG if the release folder already exists:
ensure_release_folder_not_exists = ShortCircuitOperator(
    task_id="test_release_folder_exists",
    python_callable=lambda bucket, path: not common.check_gcp_folder_exists(
        bucket, path
    ),
    op_kwargs={
        "bucket": RELEASE_BUCKET_NAME,
        "path": f"releases/{RELEASE_VERSION}",
    },
)
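# Note: when the callable returns False, ShortCircuitOperator marks every
# downstream task as skipped rather than failed, so an existing release
# folder stops both task groups below without raising an error.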

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics ETL workflow",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    # Compiling tasks for moving data to the right place:
    with TaskGroup(group_id="data_transfer") as data_transfer:
        # Defining the tasks to execute in the task group:
        [
            GCSToGCSOperator(
                task_id=f"move_{data_name}",
                source_bucket=data["source_bucket"],
                source_object=data["source_object"],
                destination_bucket=data["destination_bucket"],
                destination_object=data["destination_object"],
            )
            for data_name, data in DATA_TO_MOVE.items()
        ]

    with TaskGroup(group_id="genetics_etl") as genetics_etl:
        # Parse and define all steps and their prerequisites.
        tasks = {}
        steps = common.read_yaml_config(SOURCE_CONFIG_FILE_PATH)
        for step in steps:
            # Define task for the current step.
            step_id = step["id"]
            this_task = common.submit_step(
                cluster_name=CLUSTER_NAME,
                step_id=step_id,
                task_id=step_id,
            )
            # Chain prerequisites.
            tasks[step_id] = this_task
            for prerequisite in step.get("prerequisites", []):
                this_task.set_upstream(tasks[prerequisite])

        common.generate_dag(cluster_name=CLUSTER_NAME, tasks=list(tasks.values()))

    # DAG dependencies:
    (
        # Test that the release folder doesn't exist:
        ensure_release_folder_not_exists
        # Run data transfer:
        >> data_transfer
        # Once datasets are transferred, run the rest of the steps:
        >> genetics_etl
    )
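The `genetics_etl` group derives its task graph from `configs/dag.yaml`. A self-contained sketch of the prerequisite-chaining logic above, using a hypothetical two-step config (the step ids are illustrative, and `read_yaml_config` is assumed to be a thin `yaml.safe_load` wrapper):

```python
import yaml

steps = yaml.safe_load(
    """
- id: study_index
- id: credible_set
  prerequisites:
    - study_index
"""
)

tasks = {}
for step in steps:
    tasks[step["id"]] = step["id"]  # stand-in for the Dataproc task object
    for prerequisite in step.get("prerequisites", []):
        # In the DAG this is this_task.set_upstream(tasks[prerequisite]):
        print(f"{tasks[prerequisite]} >> {step['id']}")
# Prints: study_index >> credible_set
```

Note that the loop only works if steps are listed in topological order: a prerequisite must appear earlier in the file than any step that depends on it, otherwise the `tasks[prerequisite]` lookup raises a `KeyError`.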
24 changes: 24 additions & 0 deletions src/gentropy/assets/schemas/l2g_feature_matrix.json
@@ -125,6 +125,30 @@
"name": "sqtlColocLlrMaximumNeighborhood",
"nullable": true,
"type": "float"
},
{
"metadata": {},
"name": "tuqtlColocClppMaximum",
"nullable": true,
"type": "float"
},
{
"metadata": {},
"name": "tuqtlColocClppMaximumNeighborhood",
"nullable": true,
"type": "float"
},
{
"metadata": {},
"name": "tuqtlColocLlrMaximum",
"nullable": true,
"type": "float"
},
{
"metadata": {},
"name": "tuqtlColocLlrMaximumNeighborhood",
"nullable": true,
"type": "float"
}
],
"type": "struct"
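These schema assets follow Spark's `StructType` JSON serialisation, so the four new tuQTL feature columns can be verified with a short sketch (path taken from the diff header; a PySpark environment is assumed):

```python
import json

from pyspark.sql.types import FloatType, StructType

with open("src/gentropy/assets/schemas/l2g_feature_matrix.json") as handle:
    schema = StructType.fromJson(json.load(handle))

# The new columns are nullable floats, matching the other coloc features:
field = schema["tuqtlColocClppMaximum"]
assert field.dataType == FloatType() and field.nullable
```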
12 changes: 8 additions & 4 deletions src/gentropy/config.py
@@ -213,13 +213,17 @@ class LocusToGeneConfig(StepConfig):
             # max clpp for each (study, locus) aggregating over all eQTLs
             "eqtlColocClppMaximumNeighborhood",
             # max clpp for each (study, locus, gene) aggregating over all pQTLs
-            # "pqtlColocClppMaximum",
+            "pqtlColocClppMaximum",
             # max clpp for each (study, locus) aggregating over all pQTLs
-            # "pqtlColocClppMaximumNeighborhood",
+            "pqtlColocClppMaximumNeighborhood",
             # max clpp for each (study, locus, gene) aggregating over all sQTLs
-            # "sqtlColocClppMaximum",
+            "sqtlColocClppMaximum",
             # max clpp for each (study, locus) aggregating over all sQTLs
-            # "sqtlColocClppMaximumNeighborhood",
+            "sqtlColocClppMaximumNeighborhood",
+            # max clpp for each (study, locus, gene) aggregating over all tuQTLs
+            "tuqtlColocClppMaximum",
+            # max clpp for each (study, locus) aggregating over all tuQTLs
+            "tuqtlColocClppMaximumNeighborhood",
             # # max log-likelihood ratio value for each (study, locus, gene) aggregating over all eQTLs
             # "eqtlColocLlrLocalMaximum",
             # # max log-likelihood ratio value for each (study, locus) aggregating over all eQTLs
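Uncommenting these entries promotes the pQTL/sQTL CLPP features into the default locus-to-gene feature set, alongside the newly added tuQTL pair. A quick sanity check (the `features_list` attribute name is an assumption inferred from context, not shown in this hunk):

```python
from gentropy.config import LocusToGeneConfig

config = LocusToGeneConfig()
for feature in ("pqtlColocClppMaximum", "sqtlColocClppMaximum", "tuqtlColocClppMaximum"):
    # Assumed attribute name; adjust if the dataclass field differs.
    assert feature in config.features_list, feature
```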
