Fix alphadia params parsing #428

Merged (7 commits) on Nov 5, 2024
Changes from 5 commits
141 changes: 90 additions & 51 deletions proteobench/io/params/alphadia.py
@@ -9,6 +9,12 @@
ANSI_REGEX = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")


# Strip ANSI escape codes and surrounding whitespace from a log line
def clean_line(line: str) -> str:
line = ANSI_REGEX.sub("", line)
return line.strip()
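
A quick check of what clean_line does (the escape codes and sample text here are illustrative, not taken from an actual AlphaDIA log):

```python
from proteobench.io.params.alphadia import clean_line

# ANSI colour codes are removed by ANSI_REGEX and surrounding whitespace is stripped.
print(clean_line("\x1b[32m  coarse search  \x1b[0m"))  # -> "coarse search"
```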


def parse_line(line: str) -> Tuple[str, dict, int]:
"""
Parse a log line into a tuple.
@@ -28,23 +34,35 @@ def parse_line(line: str) -> Tuple[str, dict, int]:
The indentation level
"""
# Remove the info part and convert ansi
line = ANSI_REGEX.sub("", line[22:].strip())
# Split the string to tab part and setting part
tab, setting = line.split("──")
setting_list = setting.split(":")
if len(setting_list) == 1:
setting_dict = {setting_list[0]: None}
else:
setting_dict = {setting_list[0]: setting_list[1]}
# Convert tab to level
level = levels.index(len(tab))

line = clean_line(line[22:])
try:
# Split the string to tab part and setting part
tab, setting = line.split("──")
setting_list = setting.split(":")
if len(setting_list) == 1:
setting_dict = {setting_list[0]: (None, None)}
else:
value = setting_list[1].strip()
if "(user defined)" in value:
value = value.replace("(user defined)", "").strip()
setting_dict = {setting_list[0]: (value, "user defined")}
elif "(default)" in value:
value = value.replace("(default)", "").strip()
setting_dict = {setting_list[0]: (value, "default")}

else:
setting_dict = {setting_list[0]: (value, None)}

# Convert tab to level
level = levels.index(len(tab))
except:
return "", {}, 0
Contributor: So, when does this happen?

Contributor: *The exception

Contributor (author): Never, that one should've been removed :-), was for debugging purposes

# Return header, parsed setting, and the level
return setting_list[0], setting_dict, level
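
To make the new (value, flag) return shape concrete, here is a self-contained sketch of the tagging behaviour (not the module's code; the sample setting string is hypothetical):

```python
def tag_value(setting: str) -> dict:
    """Split 'key: value (flag)' into {key: (value, flag)}, mirroring parse_line above."""
    key, _, raw = setting.partition(":")
    raw = raw.strip()
    for flag in ("user defined", "default"):
        marker = f"({flag})"
        if marker in raw:
            return {key: (raw.replace(marker, "").strip(), flag)}
    return {key: (raw or None, None)}

print(tag_value("target_ms1_tolerance: 5.0 (user defined)"))
# -> {'target_ms1_tolerance': ('5.0', 'user defined')}
```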


def parse_section(
line: str,
line: Tuple[str, dict, int],
line_generator: Iterable,
) -> Tuple[dict, int, Optional[Tuple]]:
"""
@@ -79,8 +97,11 @@ def parse_section(
next_line = next(line_generator)
header_next, line_dict_next, level_next = parse_line(next_line)
except:
# If no lines left, go up a level, returning the sectino so far
return section, 0, None
# If no lines left, go up a level, returning the section so far
return {k: v[0] if isinstance(v, tuple) else v for k, v in section.items()}, 0, None

nested_values = []
current_header = None

while True:
# If no more lines go up a level
@@ -89,22 +110,53 @@
except:
break

if not isinstance(line_dict_next, dict):
continue

# If the next line is start of new section again
if level_next > level_prev:
# Get the subsection

subsection, _, next_line = parse_section(
line=parse_line(next_line),
line_generator=line_generator,
)
# Add this subsection to new section
# A new line is already outputted so continue
section[header_prev] = subsection
continue
if header_prev in ["precursor_len", "precursor_charge", "precursor_mz", "fragment_mz"]:
if current_header is None or current_header != header_prev:
nested_values = []
current_header = header_prev

# Collect all values under this nested section
value = list(line_dict_next.keys())[0].split()[0] # Extract value before space
nested_values.append(int(value))

# If "user defined", overwrite the default
if "(user defined)" in list(line_dict_next.keys())[0]:
nested_values.pop(-2)

# Save the values in the section
section[header_prev] = nested_values

try:
next_line = next(line_generator)
continue
except:
Contributor: Consider putting this if in a different function. Add specific error to the except. IOError?
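
One possible shape for that suggestion (a sketch, not part of the PR; note that next() on an exhausted generator raises StopIteration, so that is the natural exception to catch rather than IOError):

```python
def advance(line_generator):
    """Return the next raw log line, or None once the generator is exhausted."""
    try:
        return next(line_generator)
    except StopIteration:
        return None

# Usage inside the loop above:
#     next_line = advance(line_generator)
#     if next_line is None:
#         break
```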

break

else:
# Get the subsection

subsection, _, next_line = parse_section(
line=parse_line(next_line),
line_generator=line_generator,
)
# Add this subsection to new section
# A new line is already outputted so continue
section[header_prev] = subsection
continue

# if new line is at same level
elif level_prev == level_next:
section.update(line_dict_next)
if isinstance(line_dict_next, dict):
Contributor: Put into function
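
The merge below could be pulled into a small helper along these lines (a sketch; the name merge_setting is illustrative and not part of the PR):

```python
def merge_setting(section: dict, line_dict: dict) -> None:
    """Merge parsed {key: (value, flag)} pairs into section,
    letting user-defined values overwrite defaults seen earlier."""
    for key, (value, flag) in line_dict.items():
        if key not in section or flag == "user defined":
            section[key] = value
```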

for key, (value, flag) in line_dict_next.items():
if key in section and flag == "user defined":
section[key] = value
elif key not in section:
section[key] = value
header_prev = header_next
level_prev = level_next
try:
@@ -115,9 +167,9 @@
# The next line needs to go up and output the section
# Also the new line should be returned
else:
return section, level_next, next_line
break

return section, 0, None
return {k: v[0] if isinstance(v, tuple) else v for k, v in section.items()}, level_next, next_line


def extract_file_version(line: str) -> str:
@@ -166,18 +218,11 @@ def add_fdr_parameters(parameter_dict: dict, parsed_settings: dict) -> None:


def get_min_max(list_of_elements: list) -> Tuple[int, int]:
if "(user defined)" in list_of_elements[1]:
min_value = int(list_of_elements[1].replace("(user defined)", ""))
if len(list_of_elements) == 4:
max_value = int(list_of_elements[3].replace("(user defined)", ""))
else:
max_value = int(list_of_elements[2])
min_value = int(list_of_elements[0])
if len(list_of_elements) == 3:
max_value = int(list_of_elements[2])
else:
min_value = int(list_of_elements[0])
if len(list_of_elements) == 3:
max_value = int(list_of_elements[2].replace("(user defined)", ""))
else:
max_value = int(list_of_elements[1])
max_value = int(list_of_elements[1])
return min_value, max_value
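
A quick illustration of the simplified helper (the values are hypothetical):

```python
from proteobench.io.params.alphadia import get_min_max

print(get_min_max([7, 35]))      # -> (7, 35)
print(get_min_max([7, 30, 35]))  # -> (7, 35)
```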


@@ -193,24 +238,18 @@ def extract_params(fname: str) -> ProteoBenchParameters:

parsed_settings, level, line = parse_section(line=parse_line(first_line), line_generator=line_generator)

peptide_lengths = get_min_max(list(parsed_settings["library_prediction"]["precursor_len"].keys()))
precursor_charges = get_min_max(list(parsed_settings["library_prediction"]["precursor_charge"].keys()))
peptide_lengths = get_min_max(parsed_settings["library_prediction"]["precursor_len"])
precursor_charges = get_min_max(parsed_settings["library_prediction"]["precursor_charge"])

if "(user defined)" in parsed_settings["search"]["target_ms1_tolerance"]:
prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"].replace("(user defined)", ""))
else:
prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"])
if "(user defined)" in parsed_settings["search"]["target_ms2_tolerance"]:
frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"].replace("(user defined)", ""))
else:
frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"])
prec_tol = float(parsed_settings["search"]["target_ms1_tolerance"])
frag_tol = float(parsed_settings["search"]["target_ms2_tolerance"])

parameters = {
"software_name": "AlphaDIA",
"search_engine": "AlphaDIA",
"software_version": version,
"search_engine_version": version,
"enable_match_between_runs": "?",
"enable_match_between_runs": False, # Not in AlphaDIA AFAIK
"precursor_mass_tolerance": prec_tol,
"fragment_mass_tolerance": frag_tol,
"enzyme": parsed_settings["library_prediction"]["enzyme"].strip(),
@@ -221,8 +260,8 @@
"max_precursor_charge": precursor_charges[1],
"fixed_mods": parsed_settings["library_prediction"]["fixed_modifications"].strip(),
"variable_mods": parsed_settings["library_prediction"]["variable_modifications"].strip(),
"max_mods": int(parsed_settings["library_prediction"]["max_var_mod_num"].replace("(user defined)", "")),
"scan_window": int(parsed_settings["selection_config"]["max_size_rt"].replace("(user defined)", "")),
"max_mods": int(parsed_settings["library_prediction"]["max_var_mod_num"]),
"scan_window": int(parsed_settings["selection_config"]["max_size_rt"]),
"quantification_method_DIANN": None,
"second_pass": None,
"protein_inference": parsed_settings["fdr"]["inference_strategy"].strip(),
13 changes: 9 additions & 4 deletions proteobench/modules/quant_base/quant_base_module.py
@@ -105,12 +105,17 @@ def add_current_data_point(

current_datapoint["old_new"] = "new"

if current_datapoint["intermediate_hash"] not in all_datapoints.loc["intermediate_hash", :].values:
all_datapoints.loc["old_new", :] = "old"
# TODO: this doesn't work outside of the web interface, because the intermediate_hash is not present without the old datapoints. Temp fix with try except
try:
if current_datapoint["intermediate_hash"] not in all_datapoints.loc["intermediate_hash", :].values:
all_datapoints.loc["old_new", :] = "old"
all_datapoints_new = pd.concat([all_datapoints, current_datapoint], axis=1)
all_datapoints_new = all_datapoints_new.T.reset_index(drop=True)
else:
all_datapoints_new = all_datapoints.T.reset_index(drop=True)
except KeyError: # if there is no intermediate_hash, because of local use
all_datapoints_new = pd.concat([all_datapoints, current_datapoint], axis=1)
all_datapoints_new = all_datapoints_new.T.reset_index(drop=True)
else:
all_datapoints_new = all_datapoints.T.reset_index(drop=True)

return all_datapoints_new
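
An explicit guard could replace the try/except here; a sketch, assuming all_datapoints keeps parameters as rows as in the code above (the helper name is illustrative, not part of the PR):

```python
import pandas as pd

def has_hash_row(all_datapoints: pd.DataFrame) -> bool:
    """True when the frame already carries an 'intermediate_hash' row (web-interface case)."""
    return "intermediate_hash" in all_datapoints.index

# Usage sketch:
#     if has_hash_row(all_datapoints) and current_datapoint["intermediate_hash"] in \
#             all_datapoints.loc["intermediate_hash", :].values:
#         ...
```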
