This repository has been archived by the owner on May 6, 2024. It is now read-only.

More fixes for type annotations #128

Merged
merged 12 commits on Nov 1, 2023
22 changes: 0 additions & 22 deletions bin/check_samplesheet.py
@@ -44,28 +44,6 @@ def print_error(error, context="Line", context_str=""):
sys.exit(1)


def sniff_format(handle):
"""
Detect the tabular format.

Args:
handle (text file): A handle to a `text file`_ object. The read position is
expected to be at the beginning (index 0).

Returns:
csv.Dialect: The detected tabular format.

.. _text file:
https://docs.python.org/3/glossary.html#term-text-file

"""
peek = read_head(handle)
handle.seek(0)
sniffer = csv.Sniffer()
dialect = sniffer.sniff(peek)
return dialect
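For context, a self-contained sketch of what the removed helper did (toy CSV; `read_head` is approximated here with a plain `read`):

```python
import csv
import io

handle = io.StringIO("a,b,c\n1,2,3\n")  # stand-in for a real text-file handle
peek = handle.read(4096)                # approximates the read_head() helper
handle.seek(0)                          # restore position for downstream readers
dialect = csv.Sniffer().sniff(peek)
print(dialect.delimiter)                # ","
```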


def check_sdrf(check_ms, sdrf):
df = SdrfDataFrame.parse(sdrf)
errors = df.validate(DEFAULT_TEMPLATE)
94 changes: 41 additions & 53 deletions bin/diann_convert.py
@@ -11,7 +11,7 @@
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List, Tuple, Dict, Set
from typing import Any, List, Tuple, Dict, Set, Union

import click
import numpy as np
@@ -196,67 +196,53 @@ def get_exp_design_dfs(exp_design_file):
return s_DataFrame, f_table


@dataclass
class DiannDirectory:
base_path: os.PathLike
diann_version_file: str

def __post_init__(self):
self.base_path = Path(self.base_path)
def __init__(self, base_path, diann_version_file):
self.base_path = Path(base_path)
if not self.base_path.is_dir():
raise NotADirectoryError(f"Path {self.base_path} does not exist or is not a directory")
self.diann_version_file = Path(self.diann_version_file)
self.diann_version_file = Path(diann_version_file)
if not self.diann_version_file.is_file():
raise FileNotFoundError(f"Path {self.diann_version_file} does not exist")
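A side note on the directory check above: `Path.is_dir()` is already False for a missing path, so the single call covers both the missing and the not-a-directory case. A quick sketch:

```python
from pathlib import Path

p = Path("no_such_dir_xyz")  # hypothetical missing path
assert not p.exists()
assert not p.is_dir()        # False for missing paths as well as for files
```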

def find_suffix_file(self, suffix: str, only_first=True) -> os.PathLike:
def find_first_file_with_suffix(self, suffix: str) -> os.PathLike:
"""Finds a file with a given suffix in the directory.

:param suffix: The suffix to search for
:type suffix: str
:param only_first: Whether to return only the first file found; if False, all matches are returned. Defaults to True.
:type only_first: bool, optional

:raises FileNotFoundError: If no file with the given suffix is found
"""
matching = self.base_path.glob(f"**/*{suffix}")
if only_first:
try:
return next(matching)
except StopIteration:
raise FileNotFoundError(f"Could not find file with suffix {suffix}")
else:
out = list(matching)
if len(out) == 0:
raise FileNotFoundError(f"Could not find file with suffix {suffix}")
else:
return out
try:
return next(self.base_path.glob(f"**/*{suffix}"))
except StopIteration:
raise FileNotFoundError(f"Could not find file with suffix {suffix}")
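A minimal standalone sketch of the lazy-glob pattern this method relies on; the `DiannDirectory` context is stripped and the directory and suffix are illustrative:

```python
from pathlib import Path

def first_match(base: Path, suffix: str) -> Path:
    # Path.glob() returns a generator, so next() stops at the first hit
    # instead of materializing every match under the tree.
    try:
        return next(base.glob(f"**/*{suffix}"))
    except StopIteration:
        raise FileNotFoundError(f"Could not find file with suffix {suffix}")

# e.g. first_match(Path("results"), "report.tsv")
```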

@property
def report(self) -> os.PathLike:
return self.find_suffix_file("report.tsv")
return self.find_first_file_with_suffix("report.tsv")

@property
def pg_matrix(self) -> os.PathLike:
return self.find_suffix_file("pg_matrix.tsv")
return self.find_first_file_with_suffix("pg_matrix.tsv")

@property
def pr_matrix(self) -> os.PathLike:
return self.find_suffix_file("pr_matrix.tsv")
return self.find_first_file_with_suffix("pr_matrix.tsv")

@property
def fasta(self) -> os.PathLike:
try:
return self.find_suffix_file(".fasta")
return self.find_first_file_with_suffix(".fasta")
except FileNotFoundError:
return self.find_suffix_file(".fa")
return self.find_first_file_with_suffix(".fa")

@property
def ms_info(self) -> os.PathLike:
return self.find_suffix_file("ms_info.tsv")
return self.find_first_file_with_suffix("ms_info.tsv")

@property
def validate_diann_version(self) -> str:
def diann_version(self) -> str:
logger.debug("Validating DIANN version")
diann_version_id = None
with open(self.diann_version_file) as f:
@@ -267,18 +253,19 @@ def validate_diann_version(self) -> str:

if diann_version_id is None:
raise ValueError(f"Could not find DIA-NN version in file {self.diann_version_file}")
elif diann_version_id == "1.8.1":
return diann_version_id
else:
# Maybe this error should be detected beforehand to save time ...
raise ValueError(f"Unsupported DIANN version {diann_version_id}")

return diann_version_id

def validate_diann_version(self) -> None:
supported_diann_versions = ["1.8.1"]
if self.diann_version not in supported_diann_versions:
raise ValueError(f"Unsupported DIANN version {self.diann_version}")
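Note the call site in `convert_to_mztab` below: a bare `self.validate_diann_version` only creates a bound method and never runs the check, which is why the parentheses matter. A minimal sketch with a hypothetical class:

```python
class Checker:
    def validate(self) -> None:
        raise ValueError("always fails")

c = Checker()
c.validate      # bare attribute access: builds a bound method, runs nothing
# c.validate()  # the actual call: would raise ValueError
```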

def convert_to_mztab(
self, report, f_table, charge: int, missed_cleavages: int, dia_params: List[Any], out: os.PathLike
) -> None:
logger.info("Converting to mzTab")
# Convert to mzTab
self.validate_diann_version
self.validate_diann_version()

# This could be a branching point if we want to support other versions
# of DIA-NN, maybe something like this:
@@ -288,7 +275,7 @@ def convert_to_mztab(
# raise ValueError(f"Unsupported DIANN version {diann_version_id}, supported versions are 1.8.1 ...")

logger.info(f"Reading fasta file: {self.fasta}")
entries = []
entries: list = []
f = FASTAFile()
f.load(str(self.fasta), entries)
fasta_entries = [(e.identifier, e.sequence, len(e.sequence)) for e in entries]
@@ -301,7 +288,7 @@ def convert_to_mztab(
index_ref["study_variable"] = index_ref["study_variable"].astype("int")
report = report.merge(index_ref[["ms_run", "Run", "study_variable"]], on="Run", validate="many_to_one")

(MTD, database) = mztab_MTD(index_ref, dia_params, str(self.fasta), charge, missed_cleavages)
MTD, database = mztab_MTD(index_ref, dia_params, str(self.fasta), charge, missed_cleavages)
pg = pd.read_csv(
self.pg_matrix,
sep="\t",
@@ -749,7 +736,7 @@ def mztab_PEH(
.pivot(columns=["ms_run"], index="precursor.Index")
.reset_index()
)
tmp.columns = ["::".join([str(s) for s in col]).strip() for col in tmp.columns.values]
tmp.columns = pd.Index(["::".join([str(s) for s in col]).strip() for col in tmp.columns.values])
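A toy illustration of the flattening above (frame and column names are made up): `pivot` produces a `MultiIndex`, each tuple is joined with `::`, and wrapping the result in `pd.Index` matches the declared type of `DataFrame.columns`, which is what the annotation change addresses:

```python
import pandas as pd

df = pd.DataFrame({"ms_run": [1, 1, 2], "idx": ["a", "b", "a"], "v": [1.0, 2.0, 3.0]})
tmp = df.pivot(columns=["ms_run"], index="idx").reset_index()
# tmp.columns is a MultiIndex like [("idx", ""), ("v", 1), ("v", 2)]
tmp.columns = pd.Index(["::".join(str(s) for s in col).strip() for col in tmp.columns.values])
print(list(tmp.columns))  # ['idx::', 'v::1', 'v::2']
```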
subname_mapper = {
"precursor.Index::::": "precursor.Index",
"Q.Value::min": "search_engine_score[1]_ms_run",
@@ -805,7 +792,7 @@ def mztab_PEH(

logger.debug("Re-ordering columns...")
out_mztab_PEH.loc[:, "PEH"] = "PEP"
out_mztab_PEH.loc[:, "database"] = database
out_mztab_PEH.loc[:, "database"] = str(database)
index = out_mztab_PEH.loc[:, "PEH"]
out_mztab_PEH.drop(["PEH", "Precursor.Id", "Genes", "pr_id"], axis=1, inplace=True)
out_mztab_PEH.insert(0, "PEH", index)
@@ -835,15 +822,15 @@ def mztab_PSH(report, folder, database):
"""
logger.info("Constructing PSH sub-table")

def __find_info(dir, n):
def __find_info(directory, n):
# Matches e.g. n="220101_myfile", directory="myfolder" to
# "myfolder/220101_myfile_ms_info.tsv"
files = list(Path(dir).glob(f"*{n}*_info.tsv"))
files = list(Path(directory).glob(f"*{n}*_info.tsv"))
# Check that it matches one and only one file
if not files:
raise ValueError(f"Could not find {n} info file in {dir}")
raise ValueError(f"Could not find {n} info file in {directory}")
if len(files) > 1:
raise ValueError(f"Found multiple {n} info files in {dir}: {files}")
raise ValueError(f"Found multiple {n} info files in {directory}: {files}")

return files[0]
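The `dir` to `directory` rename avoids shadowing the builtin; a small illustration of what shadowing breaks (hypothetical function):

```python
def listing(dir):
    # the parameter shadows the builtin dir(), so this call hits the string
    return dir(".")  # TypeError: 'str' object is not callable

listing("/tmp")      # raises TypeError
```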

@@ -1065,7 +1052,7 @@ def make_lookup_dict(self, report) -> Dict[str, Tuple[str, float]]:
}
return out

def get_score(self, protein_id: str) -> float:
def get_score(self, protein_id: str) -> Tuple[Union[str, float], float]:
"""Returns a tuple contains modified sequences and the score at protein level.

Gets the best score and corresponding peptide for a given protein_id
@@ -1207,7 +1194,9 @@ def per_peptide_study_report(report: pd.DataFrame) -> pd.DataFrame:
.pivot(columns=["study_variable"], index="precursor.Index")
.reset_index()
)
pep_study_grouped.columns = ["::".join([str(s) for s in col]).strip() for col in pep_study_grouped.columns.values]
pep_study_grouped.columns = pd.Index(
["::".join([str(s) for s in col]).strip() for col in pep_study_grouped.columns.values]
)
# Columns here would be like:
# [
# "precursor.Index::::",
@@ -1276,10 +1265,9 @@ def calculate_coverage(ref_sequence: str, sequences: Set[str]):
local_start += 1

# merge overlapping intervals
starts, lengths = zip(*sorted(zip(starts, lengths)))
merged_starts = []
merged_lengths = []
for start, length in zip(starts, lengths):
merged_starts: list = []
merged_lengths: list = []
for start, length in sorted(zip(starts, lengths)):
if merged_starts and merged_starts[-1] + merged_lengths[-1] >= start:
merged_lengths[-1] = max(merged_starts[-1] + merged_lengths[-1], start + length) - merged_starts[-1]
else:
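A self-contained run of the merge step above, on toy intervals:

```python
starts, lengths = [0, 3, 10], [5, 4, 2]  # intervals [0, 5), [3, 7), [10, 12)
merged_starts: list = []
merged_lengths: list = []
for start, length in sorted(zip(starts, lengths)):
    if merged_starts and merged_starts[-1] + merged_lengths[-1] >= start:
        # overlap: stretch the previous interval to the furthest end
        merged_lengths[-1] = max(merged_starts[-1] + merged_lengths[-1], start + length) - merged_starts[-1]
    else:
        merged_starts.append(start)
        merged_lengths.append(length)
print(list(zip(merged_starts, merged_lengths)))  # [(0, 7), (10, 2)]
```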
@@ -1311,7 +1299,7 @@ def calculate_protein_coverages(report: pd.DataFrame, out_mztab_PRH: pd.DataFram
ids_to_seqs = dict(zip(nested_df["Protein.Ids"], nested_df["Stripped.Sequence"]))
acc_to_ids = dict(zip(out_mztab_PRH["accession"], out_mztab_PRH["Protein.Ids"]))
fasta_id_to_seqs = dict(zip(fasta_df["id"], fasta_df["seq"]))
acc_to_fasta_ids = {}
acc_to_fasta_ids: dict = {}

# Since fasta ids are something like sp|P51451|BLK_HUMAN but
# accessions are something like Q9Y6V7-2, we need to find a
@@ -1330,7 +1318,7 @@
# it entails more un-matched characters.
acc_to_fasta_ids[acc] = min(matches, key=len)

out = [None] * len(out_mztab_PRH["accession"])
out: List[str] = [""] * len(out_mztab_PRH["accession"])

for i, acc in enumerate(out_mztab_PRH["accession"]):
f_id = acc_to_fasta_ids[acc]
4 changes: 2 additions & 2 deletions modules/local/individual_final_analysis/main.nf
@@ -21,11 +21,11 @@ process INDIVIDUAL_FINAL_ANALYSIS {
def args = task.ext.args ?: ''
scan_window = params.scan_window

if (params.mass_acc_automatic | params.scan_window_automatic){
if (params.mass_acc_automatic | params.scan_window_automatic) {
mass_acc_ms2 = "\$(cat ${diann_log} | grep \"Averaged recommended settings\" | cut -d ' ' -f 11 | tr -cd \"[0-9]\")"
scan_window = "\$(cat ${diann_log} | grep \"Averaged recommended settings\" | cut -d ' ' -f 19 | tr -cd \"[0-9]\")"
mass_acc_ms1 = "\$(cat ${diann_log} | grep \"Averaged recommended settings\" | cut -d ' ' -f 15 | tr -cd \"[0-9]\")"
} else if (meta['precursormasstoleranceunit'].toLowerCase().endsWith('ppm') && meta['fragmentmasstoleranceunit'].toLowerCase().endsWith('ppm')){
} else if (meta['precursormasstoleranceunit'].toLowerCase().endsWith('ppm') && meta['fragmentmasstoleranceunit'].toLowerCase().endsWith('ppm')) {
mass_acc_ms1 = meta["precursormasstolerance"]
mass_acc_ms2 = meta["fragmentmasstolerance"]
} else {
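For readers who do not parse `cut`/`tr` pipelines on sight, a hedged Python rendering of the extraction above; only the field arithmetic is reproduced (`cut` is 1-based and splits on single spaces, `tr -cd "[0-9]"` keeps digits), not the DIA-NN log text itself:

```python
def field_digits(line: str, n: int) -> str:
    # cut -d ' ' -f n: 1-based fields, split on single spaces
    fields = line.split(" ")
    # tr -cd "[0-9]": drop every non-digit character
    return "".join(ch for ch in fields[n - 1] if ch.isdigit())

# mass_acc_ms2, mass_acc_ms1 and scan_window come from fields 11, 15 and 19
# of the "Averaged recommended settings" line in the DIA-NN log.
```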