Fix alphadia params parsing #428

Merged · 7 commits · Nov 5, 2024

Changes from all commits
163 changes: 116 additions & 47 deletions proteobench/io/params/alphadia.py
@@ -9,6 +9,12 @@
ANSI_REGEX = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")


# Strip ANSI escape codes and surrounding whitespace from a log line
def clean_line(line: str) -> str:
    line = ANSI_REGEX.sub("", line)
    return line.strip()
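
For reference, the new helper in isolation. The payload text in the sample line is made up, but the escape sequences are ordinary ANSI color codes of the kind AlphaDIA writes to its log:

```python
import re

ANSI_REGEX = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")


def clean_line(line: str) -> str:
    line = ANSI_REGEX.sub("", line)
    return line.strip()


print(clean_line("\x1b[32m├──precursor_len: 7 (user defined)\x1b[0m\n"))
# -> "├──precursor_len: 7 (user defined)"
```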


def parse_line(line: str) -> Tuple[str, dict, int]:
"""
Parse a log line into a tuple.
@@ -28,23 +34,82 @@ def parse_line(line: str) -> Tuple[str, dict, int]:
        The indentation level
    """
    # Remove the info part and convert ansi
    line = ANSI_REGEX.sub("", line[22:].strip())
    line = clean_line(line[22:])
    # Split the string to tab part and setting part
    tab, setting = line.split("──")
    setting_list = setting.split(":")
    if len(setting_list) == 1:
        setting_dict = {setting_list[0]: None}
        setting_dict = {setting_list[0]: (None, None)}
    else:
        setting_dict = {setting_list[0]: setting_list[1]}
        value = setting_list[1].strip()
        if "(user defined)" in value:
            value = value.replace("(user defined)", "").strip()
            setting_dict = {setting_list[0]: (value, "user defined")}
        elif "(default)" in value:
            value = value.replace("(default)", "").strip()
            setting_dict = {setting_list[0]: (value, "default")}

        else:
            setting_dict = {setting_list[0]: (value, None)}

    # Convert tab to level
    level = levels.index(len(tab))

    # Return header, parsed setting, and the level
    return setting_list[0], setting_dict, level
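
A sketch of the tuple parse_line now returns for a user-defined setting. The sample line is hypothetical, and the exact level depends on the module-level `levels` list, which sits outside this hunk:

```python
# Cleaned log line after the 22-character timestamp/INFO prefix is sliced off
# (sample content assumed): "├──target_ms1_tolerance: 5.0 (user defined)"
# parse_line splits on "──", then on ":", strips the origin marker from the
# value, and keeps the origin as a flag alongside it:
header, setting_dict, level = (
    "target_ms1_tolerance",
    {"target_ms1_tolerance": ("5.0", "user defined")},
    1,  # levels.index(len(tab)); depends on the assumed `levels` list
)
assert setting_dict[header] == ("5.0", "user defined")
```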


def process_nested_values(
    header_prev: str, current_header: Optional[str], nested_values: list, line_dict_next: dict, section: dict
) -> Tuple[Optional[str], list]:
    """
    Processes nested values from a given line dictionary and updates the section dictionary.

    Args:
        header_prev (str): The previous header string.
        current_header (Optional[str]): The current header string, which can be None.
        nested_values (list): A list of nested values to be updated.
        line_dict_next (dict): A dictionary representing the next line to be processed.
        section (dict): A dictionary representing the section to be updated.

    Returns:
        Tuple[Optional[str], list]: A tuple containing the updated current header and the list of nested values.
    """
    if current_header is None or current_header != header_prev:
        nested_values = []
        current_header = header_prev

    # Collect all values under this nested section
    value = list(line_dict_next.keys())[0].split()[0]  # Extract value before space
    nested_values.append(int(value))

    # If "user defined", overwrite the default
    if "(user defined)" in list(line_dict_next.keys())[0]:
        nested_values.pop(-2)

    section[header_prev] = nested_values
    return current_header, nested_values
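
A minimal sketch of how the nested min/max values accumulate. It assumes the function is importable from the module this PR touches; the three sample child lines are hypothetical but follow the key format parse_line emits for colon-less lines:

```python
from proteobench.io.params.alphadia import process_nested_values

section = {}
header, values = None, []

# Successive child lines under "precursor_len", as parse_line would emit them:
for line_dict in [
    {"7 (default)": (None, None)},
    {"6 (user defined)": (None, None)},  # pops the default 7 it replaces
    {"35 (default)": (None, None)},
]:
    header, values = process_nested_values("precursor_len", header, values, line_dict, section)

print(section)  # {'precursor_len': [6, 35]}
```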


def update_section_with_line_dict(section: dict, line_dict_next: dict) -> None:
    """
    Update the section dictionary with values from line_dict_next.

    Parameters
    ----------
    section: dict
        The section dictionary to update.
    line_dict_next: dict
        Dictionary containing the key-value pairs to update in the section.
    """
    for key, (value, flag) in line_dict_next.items():
        if key in section and flag == "user defined":
            section[key] = value
        elif key not in section:
            section[key] = value
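
In effect, a "user defined" value takes precedence over an already-recorded default, while a default never clobbers an existing entry. A small sketch, with setting names taken from elsewhere in this diff:

```python
from proteobench.io.params.alphadia import update_section_with_line_dict

section = {"target_ms1_tolerance": "10.0"}

update_section_with_line_dict(section, {"target_ms1_tolerance": ("5.0", "user defined")})  # overrides
update_section_with_line_dict(section, {"target_ms1_tolerance": ("8.0", "default")})       # ignored
update_section_with_line_dict(section, {"target_ms2_tolerance": ("15.0", "default")})      # new key

print(section)  # {'target_ms1_tolerance': '5.0', 'target_ms2_tolerance': '15.0'}
```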


def parse_section(
    line: str,
    line: Tuple[str, dict, int],
    line_generator: Iterable,
) -> Tuple[dict, int, Optional[Tuple]]:
    """
@@ -78,46 +143,63 @@ def parse_section(
        # Get the next line to know what to do
        next_line = next(line_generator)
        header_next, line_dict_next, level_next = parse_line(next_line)
    except:
        # If no lines left, go up a level, returning the sectino so far
        return section, 0, None
    except StopIteration:
        # If no lines left, go up a level, returning the section so far
        return {k: v[0] if isinstance(v, tuple) else v for k, v in section.items()}, 0, None

    nested_values = []
    current_header = None

    while True:
        # If no more lines go up a level
        try:
            header_next, line_dict_next, level_next = parse_line(next_line)
        except:
        except StopIteration:
            break

        if not isinstance(line_dict_next, dict):
            continue

        # If the next line is start of new section again
        if level_next > level_prev:
            # Get the subsection

            subsection, _, next_line = parse_section(
                line=parse_line(next_line),
                line_generator=line_generator,
            )
            # Add this subsection to new section
            # A new line is already outputted so continue
            section[header_prev] = subsection
            continue
            if header_prev in ["precursor_len", "precursor_charge", "precursor_mz", "fragment_mz"]:
                current_header, nested_values = process_nested_values(
                    header_prev, current_header, nested_values, line_dict_next, section
                )
                try:
                    next_line = next(line_generator)
                    continue
                except StopIteration:
                    break

            else:
                # Get the subsection

                subsection, _, next_line = parse_section(
                    line=parse_line(next_line),
                    line_generator=line_generator,
                )
                # Add this subsection to new section
                # A new line is already outputted so continue
                section[header_prev] = subsection
                continue

        # if new line is at same level
        elif level_prev == level_next:
            section.update(line_dict_next)
            update_section_with_line_dict(section, line_dict_next)
            header_prev = header_next
            level_prev = level_next
            try:
                next_line = next(line_generator)
            except:
            except StopIteration:
                break

        # The next line needs to go up and output the section
        # Also the new line should be returned
        else:
            return section, level_next, next_line
            break

    return section, 0, None
    return {k: v[0] if isinstance(v, tuple) else v for k, v in section.items()}, level_next, next_line
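
Net effect: parse_section walks the indented log tree recursively and, on every exit path, now unwraps the (value, flag) tuples so callers receive plain values. A sketch of the kind of structure it yields, with illustrative contents rather than values from a real log:

```python
parsed_settings = {
    "library_prediction": {
        "enzyme": "trypsin",
        "precursor_len": [7, 35],      # nested min/max collected as a flat list
        "precursor_charge": [2, 4],
    },
    "search": {
        "target_ms1_tolerance": "5.0",   # was ("5.0", "user defined") internally
        "target_ms2_tolerance": "10.0",  # was ("10.0", "default") internally
    },
}
```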


def extract_file_version(line: str) -> str:
@@ -166,18 +248,11 @@ def add_fdr_parameters(parameter_dict: dict, parsed_settings: dict) -> None:


def get_min_max(list_of_elements: list) -> Tuple[int, int]:
    if "(user defined)" in list_of_elements[1]:
        min_value = int(list_of_elements[1].replace("(user defined)", ""))
        if len(list_of_elements) == 4:
            max_value = int(list_of_elements[3].replace("(user defined)", ""))
        else:
            max_value = int(list_of_elements[2])
    min_value = int(list_of_elements[0])
    if len(list_of_elements) == 3:
        max_value = int(list_of_elements[2])
    else:
        min_value = int(list_of_elements[0])
        if len(list_of_elements) == 3:
            max_value = int(list_of_elements[2].replace("(user defined)", ""))
        else:
            max_value = int(list_of_elements[1])
        max_value = int(list_of_elements[1])
    return min_value, max_value
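
With the (value, flag) parsing above, the nested lists reaching get_min_max now hold plain ints, so the old "(user defined)" string-scrubbing branches can go. For instance:

```python
from proteobench.io.params.alphadia import get_min_max

print(get_min_max([7, 35]))    # (7, 35)
print(get_min_max([2, 3, 4]))  # (2, 4) -- with three elements the middle one is skipped
```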


@@ -193,24 +268,18 @@ def extract_params(fname: str) -> ProteoBenchParameters:

    parsed_settings, level, line = parse_section(line=parse_line(first_line), line_generator=line_generator)

    peptide_lengths = get_min_max(list(parsed_settings["library_prediction"]["precursor_len"].keys()))
    precursor_charges = get_min_max(list(parsed_settings["library_prediction"]["precursor_charge"].keys()))
    peptide_lengths = get_min_max(parsed_settings["library_prediction"]["precursor_len"])
    precursor_charges = get_min_max(parsed_settings["library_prediction"]["precursor_charge"])

    if "(user defined)" in parsed_settings["search"]["target_ms1_tolerance"]:
        prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"].replace("(user defined)", ""))
    else:
        prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"])
    if "(user defined)" in parsed_settings["search"]["target_ms2_tolerance"]:
        frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"].replace("(user defined)", ""))
    else:
        frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"])
    prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"])
    frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"])

    parameters = {
        "software_name": "AlphaDIA",
        "search_engine": "AlphaDIA",
        "software_version": version,
        "search_engine_version": version,
        "enable_match_between_runs": "?",
        "enable_match_between_runs": False,  # Not in AlphaDIA AFAIK
        "precursor_mass_tolerance": prec_tol,
        "fragment_mass_tolerance": frag_tol,
        "enzyme": parsed_settings["library_prediction"]["enzyme"].strip(),
@@ -221,8 +290,8 @@ def extract_params(fname: str) -> ProteoBenchParameters:
"max_precursor_charge": precursor_charges[1],
"fixed_mods": parsed_settings["library_prediction"]["fixed_modifications"].strip(),
"variable_mods": parsed_settings["library_prediction"]["variable_modifications"].strip(),
"max_mods": int(parsed_settings["library_prediction"]["max_var_mod_num"].replace("(user defined)", "")),
"scan_window": int(parsed_settings["selection_config"]["max_size_rt"].replace("(user defined)", "")),
"max_mods": int(parsed_settings["library_prediction"]["max_var_mod_num"]),
"scan_window": int(parsed_settings["selection_config"]["max_size_rt"]),
"quantification_method_DIANN": None,
"second_pass": None,
"protein_inference": parsed_settings["fdr"]["inference_strategy"].strip(),
13 changes: 9 additions & 4 deletions proteobench/modules/quant_base/quant_base_module.py
@@ -105,12 +105,17 @@ def add_current_data_point(

        current_datapoint["old_new"] = "new"

        if current_datapoint["intermediate_hash"] not in all_datapoints.loc["intermediate_hash", :].values:
            all_datapoints.loc["old_new", :] = "old"
        # TODO: this doesn't work outside of the web interface, because the intermediate_hash is not present without the old datapoints. Temp fix with try except
        try:
            if current_datapoint["intermediate_hash"] not in all_datapoints.loc["intermediate_hash", :].values:
                all_datapoints.loc["old_new", :] = "old"
                all_datapoints_new = pd.concat([all_datapoints, current_datapoint], axis=1)
                all_datapoints_new = all_datapoints_new.T.reset_index(drop=True)
            else:
                all_datapoints_new = all_datapoints.T.reset_index(drop=True)
        except KeyError:  # if there is no intermediate_hash, because of local use
            all_datapoints_new = pd.concat([all_datapoints, current_datapoint], axis=1)
            all_datapoints_new = all_datapoints_new.T.reset_index(drop=True)
        else:
            all_datapoints_new = all_datapoints.T.reset_index(drop=True)

        return all_datapoints_new
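
The KeyError path exists because all_datapoints is stored transposed (fields as rows, datapoints as columns), and a local run without the hosted results has no "intermediate_hash" row. A minimal reproduction of the failing lookup, with made-up data:

```python
import pandas as pd

# One datapoint column, but no "intermediate_hash" row, as in local use:
all_datapoints = pd.DataFrame({"dp1": {"old_new": "old"}})

try:
    all_datapoints.loc["intermediate_hash", :]
except KeyError:
    print("no intermediate_hash row -> fall back to plain concat")
```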
