Skip to content

Commit

Permalink
revert(finngen): restore the studyId prefix in finngen cs and si (#856)
Browse files Browse the repository at this point in the history
  • Loading branch information
project-defiant authored Oct 17, 2024
1 parent 9c52397 commit c68a144
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 73 deletions.
5 changes: 3 additions & 2 deletions src/gentropy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,14 @@ class FinngenFinemappingConfig(StepConfig):
"gs://finngen-public-data-r11/finemap/full/susie/*.snp.bgz"
)
finngen_susie_finemapping_cs_summary_files: str = (
"gs://finngen-public-data-r11/finemap/summary/*.cred.summary.tsv"
"gs://finngen-public-data-r11/finemap/summary/*SUSIE.cred.summary.tsv"
)
finngen_finemapping_out: str = MISSING
finngen_finemapping_lead_pvalue_threshold: float = 1e-5
finngen_release_prefix: str = "FINNGEN_R11"
_target_: str = (
"gentropy.finngen_finemapping_ingestion.FinnGenFinemappingIngestionStep"
)
finngen_finemapping_lead_pvalue_threshold: float = 1e-5


@dataclass
Expand Down
11 changes: 9 additions & 2 deletions src/gentropy/datasource/finngen/finemapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def from_finngen_susie_finemapping(
spark: SparkSession,
finngen_susie_finemapping_snp_files: (str | list[str]),
finngen_susie_finemapping_cs_summary_files: (str | list[str]),
finngen_release_prefix: str,
credset_lbf_threshold: float = 0.8685889638065036,
) -> StudyLocus:
"""Process the SuSIE finemapping output for FinnGen studies.
Expand Down Expand Up @@ -261,6 +262,7 @@ def from_finngen_susie_finemapping(
spark (SparkSession): SparkSession object.
finngen_susie_finemapping_snp_files (str | list[str]): SuSIE finemapping output filename(s).
finngen_susie_finemapping_cs_summary_files (str | list[str]): filename of SuSIE finemapping credible set summaries.
finngen_release_prefix (str): Finngen project release prefix. Should look like FINNGEN_R*.
credset_lbf_threshold (float, optional): Filter out credible sets below, Default 0.8685889638065036 == np.log10(np.exp(2)), this is threshold from publication.
Returns:
Expand Down Expand Up @@ -295,7 +297,9 @@ def from_finngen_susie_finemapping(
.filter(f.col("cs").cast(t.IntegerType()) > 0)
.select(
# Add study idenfitier.
f.col("trait").cast(t.StringType()).alias("studyId"),
f.concat_ws("_", f.lit(finngen_release_prefix), f.col("trait"))
.cast(t.StringType())
.alias("studyId"),
f.col("region"),
# Add variant information.
f.regexp_replace(f.col("v"), ":", "_").alias("variantId"),
Expand Down Expand Up @@ -408,7 +412,10 @@ def from_finngen_susie_finemapping(
(f.col("credibleSetlog10BF") > credset_lbf_threshold)
| (f.col("credibleSetIndex") == 1)
)
.withColumn("studyId", f.col("trait"))
.withColumn(
"studyId",
f.concat_ws("_", f.lit(finngen_release_prefix), f.col("trait")),
)
)

processed_finngen_finemapping_df = processed_finngen_finemapping_df.join(
Expand Down
76 changes: 60 additions & 16 deletions src/gentropy/datasource/finngen/study_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import re
from typing import TypedDict
from urllib.request import urlopen

import pyspark.sql.functions as f
Expand All @@ -11,6 +12,13 @@
from gentropy.dataset.study_index import StudyIndex


class FinngenPrefixMatch(TypedDict):
"""Class to store the output of the validate_release_prefix."""

prefix: str
release: str


class FinnGenStudyIndex:
"""Study index dataset from FinnGen.
Expand All @@ -25,11 +33,57 @@ class FinnGenStudyIndex:
Some fields are also populated as constants, such as study type and the initial sample size.
"""

@staticmethod
def validate_release_prefix(release_prefix: str) -> FinngenPrefixMatch:
"""Validate release prefix passed to finngen StudyIndex.
Args:
release_prefix (str): Finngen release prefix, should be a string like FINNGEN_R*.
Returns:
FinngenPrefixMatch: Object containing valid prefix and release strings.
Raises:
ValueError: when incorrect release prefix is provided.
This method ensures that the trailing underscore is removed from prefix.
"""
pattern = re.compile(r"FINNGEN_(?P<release>R\d+){1}_?")
pattern_match = pattern.match(release_prefix)
if not pattern_match:
raise ValueError(
f"Invalid FinnGen release prefix: {release_prefix}, use the format FINNGEN_R*"
)
release = pattern_match.group("release").upper()
if release_prefix.endswith("_"):
release_prefix = release_prefix[:-1]
return FinngenPrefixMatch(prefix=release_prefix, release=release)

@staticmethod
def read_efo_curation(session: SparkSession, url: str) -> DataFrame:
"""Read efo curation from provided url.
Args:
session (SparkSession): Session to use when reading the mapping file.
url (str): Url to the mapping file. The file provided should be a tsv file.
Returns:
DataFrame: DataFrame with EFO mappings.
Example of the file can be found in https://raw.githubusercontent.com/opentargets/curation/refs/heads/master/mappings/disease/manual_string.tsv.
"""
csv_data = urlopen(url).readlines()
csv_rows = [row.decode("utf8") for row in csv_data]
rdd = session.sparkContext.parallelize(csv_rows)
# NOTE: type annotations for spark.read.csv miss the fact that the first param can be [RDD[str]]
efo_curation_mapping_df = session.read.csv(rdd, header=True, sep="\t")
return efo_curation_mapping_df

@staticmethod
def join_efo_mapping(
study_index: StudyIndex,
efo_curation_mapping: DataFrame,
finngen_release_prefix: str,
finngen_release: str,
) -> StudyIndex:
"""Add EFO mapping to the Finngen study index table.
Expand All @@ -44,24 +98,11 @@ def join_efo_mapping(
Args:
study_index (StudyIndex): Study index table.
efo_curation_mapping (DataFrame): Dataframe with EFO mappings.
finngen_release_prefix (str): FinnGen release prefix.
finngen_release (str): FinnGen release.
Returns:
StudyIndex: Study index table with added EFO mappings.
Raises:
ValueError: when incorrect release prefix is provided.
"""
finngen_release_prefix_regex = re.compile(r"FINNGEN_(?P<release>R\d+){1}_?")
finngen_release_prefix_match = finngen_release_prefix_regex.match(
finngen_release_prefix
)
if not finngen_release_prefix_match:
raise ValueError(
f"Invalid FinnGen release prefix: {finngen_release_prefix}, use the format FINNGEN_R*"
)
finngen_release = finngen_release_prefix_match.group("release").upper()

efo_mappings = (
efo_curation_mapping.withColumn("STUDY", f.upper(f.col("STUDY")))
.filter(f.col("STUDY").contains("FINNGEN"))
Expand Down Expand Up @@ -109,9 +150,12 @@ def from_source(
json_data = urlopen(finngen_phenotype_table_url).read().decode("utf-8")
rdd = spark.sparkContext.parallelize([json_data])
raw_df = spark.read.json(rdd)

return StudyIndex(
_df=raw_df.select(
f.concat(f.col("phenocode")).alias("studyId"),
f.concat(
f.concat_ws("_", f.lit(finngen_release_prefix), f.col("phenocode"))
).alias("studyId"),
f.col("phenostring").alias("traitFromSource"),
f.col("num_cases").cast("integer").alias("nCases"),
f.col("num_controls").cast("integer").alias("nControls"),
Expand Down
62 changes: 35 additions & 27 deletions src/gentropy/datasource/finngen_ukb_meta/study_index.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Study Index for Finngen data source."""

from __future__ import annotations

from urllib.request import urlopen
Expand Down Expand Up @@ -32,35 +33,42 @@ def from_source(
StudyIndex: Parsed and annotated FinnGen UKB meta-analysis study table.
"""
# Read the raw study index and process.
study_index_df = (
spark.read.csv(raw_study_index_path_from_tsv, sep="\t", header=True)
.select(
f.lit("gwas").alias("studyType"),
f.lit("FINNGEN_R11_UKB_META").alias("projectId"),
f.col("_gentropy_study_id").alias("studyId"),
f.col("name").alias("traitFromSource"),
f.lit(True).alias("hasSumstats"),
f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
(f.col("fg_n_cases") + f.col("ukbb_n_cases") + f.col("fg_n_controls") + f.col("ukbb_n_controls")).cast("integer").alias("nSamples"),
f.array(
f.struct(
(f.col("fg_n_cases") + f.col("fg_n_controls")).cast("integer").alias("sampleSize"),
f.lit("Finnish").alias("ancestry"),
),
f.struct(
(f.col("ukbb_n_cases") + f.col("ukbb_n_controls")).cast("integer").alias("sampleSize"),
f.lit("European").alias("ancestry"),
),
).alias("discoverySamples"),
study_index_df = spark.read.csv(
raw_study_index_path_from_tsv, sep="\t", header=True
).select(
f.lit("gwas").alias("studyType"),
f.lit("FINNGEN_R11_UKB_META").alias("projectId"),
f.col("_gentropy_study_id").alias("studyId"),
f.col("name").alias("traitFromSource"),
f.lit(True).alias("hasSumstats"),
f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
(
f.col("fg_n_cases")
+ f.col("ukbb_n_cases")
+ f.col("fg_n_controls")
+ f.col("ukbb_n_controls")
)
.cast("integer")
.alias("nSamples"),
f.array(
f.struct(
(f.col("fg_n_cases") + f.col("fg_n_controls"))
.cast("integer")
.alias("sampleSize"),
f.lit("Finnish").alias("ancestry"),
),
f.struct(
(f.col("ukbb_n_cases") + f.col("ukbb_n_controls"))
.cast("integer")
.alias("sampleSize"),
f.lit("European").alias("ancestry"),
),
).alias("discoverySamples"),
)
# Add population structure.
study_index_df = (
study_index_df
.withColumn(
"ldPopulationStructure",
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
)
study_index_df = study_index_df.withColumn(
"ldPopulationStructure",
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
)
# Create study index.
study_index = StudyIndex(
Expand All @@ -75,6 +83,6 @@ def from_source(
study_index = FinnGenStudyIndex.join_efo_mapping(
study_index,
efo_curation_mapping,
finngen_release_prefix="FINNGEN_R11",
finngen_release="R11",
)
return study_index
8 changes: 7 additions & 1 deletion src/gentropy/finngen_finemapping_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from gentropy.common.session import Session
from gentropy.config import FinngenFinemappingConfig
from gentropy.datasource.finngen.finemapping import FinnGenFinemapping
from gentropy.datasource.finngen.study_index import FinnGenStudyIndex


@dataclass
Expand All @@ -21,6 +22,7 @@ def __init__(
finngen_susie_finemapping_snp_files: str = FinngenFinemappingConfig().finngen_susie_finemapping_snp_files,
finngen_susie_finemapping_cs_summary_files: str = FinngenFinemappingConfig().finngen_susie_finemapping_cs_summary_files,
finngen_finemapping_lead_pvalue_threshold: float = FinngenFinemappingConfig().finngen_finemapping_lead_pvalue_threshold,
finngen_release_prefix: str = FinngenFinemappingConfig().finngen_release_prefix,
) -> None:
"""Run FinnGen finemapping ingestion step.
Expand All @@ -30,14 +32,18 @@ def __init__(
finngen_susie_finemapping_snp_files(str): Path to the FinnGen SuSIE finemapping results.
finngen_susie_finemapping_cs_summary_files (str): FinnGen SuSIE summaries for CS filters(LBF>2).
finngen_finemapping_lead_pvalue_threshold (float): Lead p-value threshold.
finngen_release_prefix (str): Finngen project release prefix. Should look like FINNGEN_R*.
"""
# Read finemapping outputs from the input paths.

finngen_release_prefix = FinnGenStudyIndex.validate_release_prefix(
finngen_release_prefix
)["prefix"]
(
FinnGenFinemapping.from_finngen_susie_finemapping(
spark=session.spark,
finngen_susie_finemapping_snp_files=finngen_susie_finemapping_snp_files,
finngen_susie_finemapping_cs_summary_files=finngen_susie_finemapping_cs_summary_files,
finngen_release_prefix=finngen_release_prefix,
)
# Flagging sub-significnat loci:
.validate_lead_pvalue(
Expand Down
24 changes: 11 additions & 13 deletions src/gentropy/finngen_studies.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

from __future__ import annotations

from urllib.request import urlopen

from gentropy.common.session import Session
from gentropy.config import FinngenStudiesConfig
from gentropy.datasource.finngen.study_index import FinnGenStudyIndex
Expand Down Expand Up @@ -35,26 +33,26 @@ def __init__(
efo_curation_mapping_url (str): URL to the EFO curation mapping file
sample_size (int): Number of individuals that participated in sample collection, derived from finngen release metadata.
"""
_match = FinnGenStudyIndex.validate_release_prefix(finngen_release_prefix)
release_prefix = _match["prefix"]
release = _match["release"]

efo_curation_df = FinnGenStudyIndex.read_efo_curation(
session.spark,
efo_curation_mapping_url,
)
study_index = FinnGenStudyIndex.from_source(
session.spark,
finngen_phenotype_table_url,
finngen_release_prefix,
release_prefix,
finngen_summary_stats_url_prefix,
finngen_summary_stats_url_suffix,
sample_size,
)

# NOTE: hack to allow spark to read directly from the URL.
csv_data = urlopen(efo_curation_mapping_url).readlines()
csv_rows = [row.decode("utf8") for row in csv_data]
rdd = session.spark.sparkContext.parallelize(csv_rows)
# NOTE: type annotations for spark.read.csv miss the fact that the first param can be [RDD[str]]
efo_curation_mapping = session.spark.read.csv(rdd, header=True, sep="\t")

study_index_with_efo = FinnGenStudyIndex.join_efo_mapping(
study_index,
efo_curation_mapping,
finngen_release_prefix,
efo_curation_df,
release,
)
study_index_with_efo.df.write.mode(session.write_mode).parquet(
finngen_study_index_out
Expand Down
4 changes: 4 additions & 0 deletions tests/gentropy/datasource/finngen/test_finngen_finemapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_finngen_finemapping_from_finngen_susie_finemapping(
spark=spark,
finngen_susie_finemapping_snp_files=finngen_susie_finemapping_snp_files,
finngen_susie_finemapping_cs_summary_files=finngen_susie_finemapping_cs_summary_files,
finngen_release_prefix="FINNGEN_R11",
),
StudyLocus,
)
Expand Down Expand Up @@ -77,9 +78,12 @@ def test_finngen_finemapping_ingestion_step(
finngen_susie_finemapping_cs_summary_files=finngen_susie_finemapping_cs_summary_files,
finngen_susie_finemapping_snp_files=finngen_susie_finemapping_snp_files,
finngen_finemapping_lead_pvalue_threshold=1e-5,
finngen_release_prefix="FINNGEN_R11",
)
assert output_path.is_dir()
assert (output_path / "_SUCCESS").exists()

cs = StudyLocus.from_parquet(session=session, path=str(output_path))
assert cs.df.count() == 1
study_id: str = cs.df.select("studyId").collect()[0]["studyId"]
assert study_id.startswith("FINNGEN_R11_")
Loading

0 comments on commit c68a144

Please sign in to comment.