diff --git a/seqteleporter/partition_property_finder/fusion_sites_finder.py b/seqteleporter/partition_property_finder/fusion_sites_finder.py index 4883001..bfd85aa 100644 --- a/seqteleporter/partition_property_finder/fusion_sites_finder.py +++ b/seqteleporter/partition_property_finder/fusion_sites_finder.py @@ -1,46 +1,7 @@ -"""Fusion sites finder functions - -[find_candidate_fusion_sites_for_a_junction(junction_aa, len_fusion_site, codon_table)] - -Identifies and returns a list of unique candidate DNA sequences for fusion sites associated with -a given amino acid sequence junction and maps each candidate fusion site within a sliding window context of the DNA -sequence corresponding to the amino acid junction. - -For a junction AA seq, find all possible DNA seqs: NNN NNN (cut site) NNN NNN. And then, for each possible DNA -seq, slide a window of size=len_fusion_site over from (cut site - len_fusion_site) to (cut site + -len_fusion_site) and get ligation_freq for each candidate fusion site. For example: NN[N NNN] (cut site) NNN NNN --> NNN [NNN (cut site) N]NN NNN -> ... -> NNN NNN (cut site) [NNN N]NN - ------------------------------------------------------------------------------------------------------------------------- -[assign_fusion_sites(s, partition, fidelity_data, satisfaction_fidelity, enzyme, enzyme_info_dic)] - -assigns fusion sites for a partition and gets the ligation fidelity - -TODO: what to do with terminals??? - -s = 'QVQLVQSGGGVVQP' -partition = (6,) -QVQLVQ SGGGVVQP -QVQL[NNN NNN] [NNN NNN]GGVV[NNN NNN] -QVQL[NNN NNN] [NNN NNN]GGVV[NNN NNN] - -Logic: 1. if len(AA)>=4, find all possible dna seq for 2 N-term AAs and 2 C-term AAs; else, raise error: frag too -short. 2. for each cut site, find all possible DNA seqs for the region from 2AA upstream cut site to 2AA downstream -cut site: NNN NNN (cut site) NNN NNN For each possible DNA seq of junction AAs, slide a window of -size=len_fusion_site over from (cut site - len_fusion_site) to (cut site + len_fusion_site) and get ligation_freq for -each candidate fusion site. For example: NN[N NNN] (cut site) NNN NNN -> NNN [NNN (cut site) N]NN NNN -> ... -> NNN -NNN (cut site) [NNN N]NN 3. for each combination of candidate fusion sites, calculate sum(mismatch_lig_freq) and sum( -lig_freq). Find the best combination of candidate fusion sites. - -""" - from itertools import product -from os.path import join, dirname, abspath -from os import listdir import numpy as np import pandas as pd import math -import re from typing import Dict, List, Tuple, Optional, Any from seqteleporter.utils.utils import (is_rev_compliment, frag_in_partition_too_short, is_valid_fusion_site_set, @@ -48,328 +9,415 @@ depth_first_product, breadth_first_product, nearest_first_product) from seqteleporter.partition_property_finder.ligation_fidelity_finder import compute_ligation_fidelity -# @cache -def find_candidate_fusion_sites_for_a_junction( - junction_aa: str, - len_fusion_site: int, - codon_table: Dict[str, List] -) -> Tuple[List, List]: - """ - For a junction AA seq, find all possible DNA seqs: NNN NNN (cut site) NNN NNN. And then, for each possible DNA - seq, slide a window of size=len_fusion_site over from (cut site - len_fusion_site) to (cut site + - len_fusion_site) and get ligation_freq for each candidate fusion site. For example: NN[N NNN] (cut site) NNN NNN - -> NNN [NNN (cut site) N]NN NNN -> ... -> NNN NNN (cut site) [NNN N]NN - - Parameters: junction_aa (str): The amino acid sequence at the junction for which candidate fusion sites are to be - found. len_fusion_site (int): The length of the fusion site to be considered. Determines the size of the sliding - window used to identify candidate fusion sites within the DNA sequence. codon_table (dict): A dictionary mapping - amino acids (single-letter codes) to their corresponding codons (list of strings). Used to translate the amino - acid sequence to all possible DNA sequences. - - Returns: tuple: A tuple containing two elements: 1. A list of unique DNA sequences representing potential fusion - sites. 2. A list of dictionaries, each representing a sliding window over the DNA sequence. Each dictionary - contains the full DNA sequence corresponding to the junction ('junction_dna'), the start index of the fusion site - within the DNA sequence ('i'), and the DNA sequence of the fusion site itself ('fusion_site'). - - """ - validate_codon_table(codon_table) - codon_lists = [codon_table[aa] for aa in junction_aa] - unique_candidate_fusion_sites_for_this_junction: list = [] - junction_dna_map_sliding_window: list = [] - for junction_codons in product(*codon_lists): - junction_dna = ''.join(junction_codons) - cut_site = int(len(junction_dna) / 2) - sliding_window_start = cut_site - len_fusion_site - junction_dna_map_sliding_window = \ - junction_dna_map_sliding_window + \ - [{'junction_dna': junction_dna, 'i': idx, 'fusion_site': junction_dna[idx:idx + len_fusion_site]} for idx in - range(sliding_window_start, cut_site + 1)] - sliding_windows = [junction_dna[idx:idx + len_fusion_site] for idx in range(sliding_window_start, cut_site + 1)] - unique_candidate_fusion_sites_for_this_junction = sorted( - list(set(unique_candidate_fusion_sites_for_this_junction + sliding_windows))) - - # Remove palindromic (double rotational symmetric) fusion sites as these sites are un-usable in realworld cases and - # hinders process speed dramatically - refined_candidate_fusion_sites_for_this_junction = [] - for fs in unique_candidate_fusion_sites_for_this_junction: - if not is_rev_compliment(fs, fs): - refined_candidate_fusion_sites_for_this_junction.append(fs) - - return refined_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window - - -def refine_candidate_fusion_sites_for_a_cut(cut: int, mutations_0idx: Optional[List[Any]], - junction_dna_map_sliding_window: List, - codon_table: Dict[str, List]) -> Tuple[List, List]: - # set default: dna codon of mutation position overlaps candidate fusion site at this junction - discard_junction_dna_map_sliding_window = [] - if not mutations_0idx: - raise ValueError('No desired mutations are given') - else: - for mut in mutations_0idx: - # validate the cut position - if mut['position'] == cut - 1 or mut['position'] == cut: - raise ValueError(f"Invalid cut position!\n " - f"Cut position: {cut} results in mutation {mut} lying completely within fusion site") - # dna codon of mutation position overlaps candidate fusion site at this junction - else: - # mutation position is the second amino acid left to cut site - if mut['position'] == cut - 2: - for d in junction_dna_map_sliding_window: - junction_dna = d['junction_dna'] - idx = d['i'] - # dna codon of mutation position overlaps candidate fusion site - if idx < 3: - overlapped_bases = junction_dna[idx:3] - desired_variations = mut['aa'] - for desired_variation in desired_variations: - sub_codons_lst = [codon[-len(overlapped_bases):] - for codon in codon_table[desired_variation]] - if overlapped_bases not in sub_codons_lst: - discard_junction_dna_map_sliding_window.append(d) - break - # mutation position is the second amino acid right to cut site - if mut['position'] == cut + 1: - for d in junction_dna_map_sliding_window: - idx = d['i'] - junction_dna = d['junction_dna'] - fs_length = len(d['fusion_site']) - # dna codon of mutation position overlaps candidate fusion site - if idx + fs_length > len(junction_dna) - 3: - overlapped_bases = junction_dna[-3:idx + fs_length] - desired_variations = mut['aa'] - for desired_variation in desired_variations: - sub_codons_lst = [codon[:len(overlapped_bases)] - for codon in codon_table[desired_variation]] - if overlapped_bases not in sub_codons_lst: - discard_junction_dna_map_sliding_window.append(d) - break - - junction_dna_map_sliding_window = [ - d for d in junction_dna_map_sliding_window if d not in discard_junction_dna_map_sliding_window - ] - candidate_fusion_sites_for_this_junction = sorted( - list( - set( - [d['fusion_site'] for d in junction_dna_map_sliding_window] - ) - ) - ) - return candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window - - -def assign_fusion_sites( - s: str, mutations_0idx: Optional[List[Any]], partition: Tuple[int, ...], fidelity_data: pd.DataFrame, - satisfaction_fidelity: float, enzyme: str, enzyme_info_dic: dict, - fusion_sites_used_by_backbone: Tuple[str, ...], search_method: str, codon_table: Dict[str, List] -) -> Tuple[Optional[Tuple], float, List[List]]: - """ - Assigns optimal fusion sites for each junction within a sequence based on experimental data, a fidelity - threshold, and enzyme characteristics. - - This function evaluates potential fusion sites for sequence junctions created by a given partitioning. It - leverages experimental data and enzyme information to identify fusion sites that meet or exceed a specified - ligation fidelity threshold. The selection process generates candidate fusion sites for each junction, - evaluates their ligation fidelity against the experimental data, and selects the set of fusion sites with the - highest overall fidelity, or until it meets the satisfaction_fidelity criterion. - - Parameters: - s (str): The original sequence to be partitioned and fused at selected sites. - partition (list of - int): Indices within the sequence `s` indicating where partitions (cuts) are made. - fidelity_data (dict): - Experimental data used to calculate the ligation fidelity of potential fusion sites. This data should include - information relevant to the efficiency of ligation at various sites within similar sequences. - - satisfaction_fidelity (float): The fidelity threshold that must be met or exceeded for a fusion site to be - selected. - enzyme (str): The name of the enzyme used in the partitioning process. - enzyme_info_dic (dict): A - dictionary containing information about the enzymes used, including key characteristics like fusion site length. - - Returns: - tuple: A tuple containing three elements: 1. sel_fusion_sites (list): The selected fusion sites for - each junction that meet the satisfaction fidelity threshold. 2. ligation_fidelity (float): The ligation fidelity - of the selected fusion sites. 3. sel_junction_dna_map_sliding_window (list of lists): Mapped information for each - selected fusion site, detailing how they align with the sequence's underlying DNA and the experimental data. - - Raises: - ValueError: If any fragment resulting from the partitioning is shorter than 4 amino acids, - as all fragments must meet this minimum length requirement to ensure viability. - - """ - # In case of no cuts, just return default - if len(partition) == 0: - sel_fusion_sites: tuple = tuple() - ligation_fidelity_of_sel_fusion_sites = float('nan') - sel_junction_dna_map_sliding_window: list[list] = [] - return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window - validate_fidelity_data(fidelity_data) - validate_enzyme_and_enzyme_info(enzyme, enzyme_info_dic) - - len_fusion_site = enzyme_info_dic[enzyme]['fusion_site_length'] - - partition_with_terminals = list((0,) + partition + (len(s),)) - if frag_in_partition_too_short(tup=partition_with_terminals, min_aa_length=4): - # validate all AA fragments length >= 4 - raise ValueError("At least One fragment too short!") - - candidate_fusion_sites_for_all_junctions = [] - junction_dna_map_sliding_window_all = [] - for cut in partition: - junction_aa = s[cut - 2:cut + 2] - # find_candidate_fusion_sites_for_a_junction - unique_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ - find_candidate_fusion_sites_for_a_junction(junction_aa, len_fusion_site, codon_table) - # refine_candidate_fusion_sites_for_a_cut - refined_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ - refine_candidate_fusion_sites_for_a_cut(cut, mutations_0idx, junction_dna_map_sliding_window, codon_table) - # remove fusions sites that are already reserved by user for backbone cloning - refined_candidate_fusion_sites_for_this_junction = \ - [i for i in refined_candidate_fusion_sites_for_this_junction if i not in fusion_sites_used_by_backbone] - # if any of the junction has no fitting fusion sites, this whole partition is invalid. return empty values. - if len(refined_candidate_fusion_sites_for_this_junction) == 0: +class FusionSitesFinder: + + def assign_fusion_sites( + self, + s: str, mutations_0idx: Optional[List[Any]], partition: Tuple[int, ...], fidelity_data: pd.DataFrame, + satisfaction_fidelity: float, enzyme: str, enzyme_info_dic: dict, + fusion_sites_used_by_backbone: Tuple[str, ...], search_method: str, codon_table: Dict[str, List] + ) -> Tuple[Optional[Tuple], float, List[List]]: + """ + Assigns optimal fusion sites for each junction within a sequence based on experimental data, a fidelity + threshold, and enzyme characteristics. + + This function evaluates potential fusion sites for sequence junctions created by a given partitioning. It + leverages experimental data and enzyme information to identify fusion sites that meet or exceed a specified + ligation fidelity threshold. The selection process generates candidate fusion sites for each junction, + evaluates their ligation fidelity against the experimental data, and selects the set of fusion sites with the + highest overall fidelity, or until it meets the satisfaction_fidelity criterion. + + Parameters: + - s (str): The original sequence to be partitioned and fused at selected sites. + - mutations_0idx (Optional[List[Any]]): A list of mutations indexed from 0, which may affect the fusion site selection. + - partition (Tuple[int, ...]): Indices within the sequence `s` indicating where partitions (cuts) are made. + - fidelity_data (pd.DataFrame): Experimental data used to calculate the ligation fidelity of potential fusion sites. + This data should include information relevant to the efficiency of ligation at various sites within similar sequences. + - satisfaction_fidelity (float): The fidelity threshold that must be met or exceeded for a fusion site to be selected. + - enzyme (str): The name of the enzyme used in the partitioning process. + - enzyme_info_dic (dict): A dictionary containing information about the enzymes used, including key characteristics like fusion site length. + - fusion_sites_used_by_backbone (Tuple[str, ...]): A tuple of fusion sites that are reserved for backbone cloning and should not be considered. + - search_method (str): The method used to search for optimal fusion sites. Options are 'DFS', 'BFS', or 'NFS'. + - codon_table (Dict[str, List]): A mapping of codons to their corresponding amino acids, used for evaluating fusion site candidates. + + Returns: + - Tuple[Optional[Tuple], float, List[List]]: + 1. sel_fusion_sites (tuple): The selected fusion sites for each junction that meet the satisfaction fidelity threshold. + 2. ligation_fidelity (float): The ligation fidelity of the selected fusion sites. + 3. sel_junction_dna_map_sliding_window (list of lists): Mapped information for each selected fusion site, detailing how they align with the sequence's underlying DNA and the experimental data. + + Raises: + - ValueError: If any fragment resulting from the partitioning is shorter than 4 amino acids, + as all fragments must meet this minimum length requirement to ensure viability. + - ValueError: If an invalid search method is provided; must be one of 'NFS', 'BFS', or 'DFS'. + """ + # In case of no cuts, just return default + if len(partition) == 0: sel_fusion_sites: tuple = tuple() ligation_fidelity_of_sel_fusion_sites = float('nan') - sel_junction_dna_map_sliding_window_all: list = [] - - return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window_all - - # collect refined fusion sites and junction_dna_map_sliding_window - candidate_fusion_sites_for_all_junctions.append(refined_candidate_fusion_sites_for_this_junction) - junction_dna_map_sliding_window_all.append(junction_dna_map_sliding_window) - - # select fusion sites with the best fidelity for each partition - ligation_fidelity_of_sel_fusion_sites = -float('inf') - sel_fusion_sites = tuple() - loop = 0 - - # DFS -------------------------------------------------------------------------------------------------------- - if search_method == 'DFS': - candidate_fusion_sites_sets = depth_first_product(*candidate_fusion_sites_for_all_junctions) - # BFS -------------------------------------------------------------------------------------------------------- - elif search_method == 'BFS': - candidate_fusion_sites_sets = breadth_first_product(*candidate_fusion_sites_for_all_junctions) - # NFS -------------------------------------------------------------------------------------------------------- - elif search_method == 'NFS': - candidate_fusion_sites_sets = nearest_first_product(*candidate_fusion_sites_for_all_junctions) - # ------------------------------------------------------------------------------------------------------------ - else: - raise ValueError("Invalid search method. Search method must be one of these: 'NFS', 'BFS', 'DFS'.") - # ------------------------------------------------------------------------------------------------------------ - # Create a mapping of fusion site names to their indices - fusion_site_indices = {site: idx for idx, site in enumerate(fidelity_data.index)} - fusion_site_cols = {site: idx for idx, site in enumerate(fidelity_data.columns)} - # Extract correct ligation frequencies directly from the diagonal - correct_lig_freq_dict = {site: fidelity_data.iloc[i, i] for site, i in fusion_site_indices.items()} - for element in candidate_fusion_sites_sets: - loop += 1 - all_fusion_sites = element + fusion_sites_used_by_backbone - # remove cases where duplicated or rev-comp fusions sites are present in "element" - if not is_valid_fusion_site_set(all_fusion_sites): - continue - ligation_fidelity = compute_ligation_fidelity(all_fusion_sites_of_a_partition=all_fusion_sites, - fidelity_data=fidelity_data.values, - fusion_site_indices=fusion_site_indices, - fusion_site_cols=fusion_site_cols, - correct_lig_freq_dict=correct_lig_freq_dict) - if ligation_fidelity > ligation_fidelity_of_sel_fusion_sites: - ligation_fidelity_of_sel_fusion_sites = ligation_fidelity - sel_fusion_sites = element - if ligation_fidelity_of_sel_fusion_sites >= satisfaction_fidelity: - break - - # map to the junction_dna_map_sliding_window - sel_junction_dna_map_sliding_window = [] - if sel_fusion_sites: - for fusion_site, junction_dna_map_sliding_window in zip(sel_fusion_sites, junction_dna_map_sliding_window_all): - sel_junction_dna_map_sliding_window.append( - [d for d in junction_dna_map_sliding_window if d['fusion_site'] == fusion_site] - ) + sel_junction_dna_map_sliding_window: list[list] = [] + return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window + + validate_fidelity_data(fidelity_data) + validate_enzyme_and_enzyme_info(enzyme, enzyme_info_dic) + + len_fusion_site = enzyme_info_dic[enzyme]['fusion_site_length'] + + partition_with_terminals = list((0,) + partition + (len(s),)) + if frag_in_partition_too_short(tup=partition_with_terminals, min_aa_length=4): + # validate all AA fragments length >= 4 + raise ValueError("At least One fragment too short!") + + candidate_fusion_sites_for_all_junctions = [] + junction_dna_map_sliding_window_all = [] + for cut in partition: + junction_aa = s[cut - 2:cut + 2] + # find_candidate_fusion_sites_for_a_junction + unique_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ + self.find_candidate_fusion_sites_for_a_junction(junction_aa, len_fusion_site, codon_table) + # refine_candidate_fusion_sites_for_a_cut + refined_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ + self.refine_candidate_fusion_sites_for_a_cut(cut, mutations_0idx, junction_dna_map_sliding_window, codon_table) + # remove fusions sites that are already reserved by user for backbone cloning + refined_candidate_fusion_sites_for_this_junction = \ + [i for i in refined_candidate_fusion_sites_for_this_junction if i not in fusion_sites_used_by_backbone] + # if any of the junction has no fitting fusion sites, this whole partition is invalid. return empty values. + if len(refined_candidate_fusion_sites_for_this_junction) == 0: + sel_fusion_sites: tuple = tuple() + ligation_fidelity_of_sel_fusion_sites = float('nan') + sel_junction_dna_map_sliding_window_all: list = [] + + return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window_all + + # collect refined fusion sites and junction_dna_map_sliding_window + candidate_fusion_sites_for_all_junctions.append(refined_candidate_fusion_sites_for_this_junction) + junction_dna_map_sliding_window_all.append(junction_dna_map_sliding_window) + + # select fusion sites with the best fidelity for each partition + ligation_fidelity_of_sel_fusion_sites = -float('inf') + sel_fusion_sites = tuple() + loop = 0 + + # DFS -------------------------------------------------------------------------------------------------------- + if search_method == 'DFS': + candidate_fusion_sites_sets = depth_first_product(*candidate_fusion_sites_for_all_junctions) + # BFS -------------------------------------------------------------------------------------------------------- + elif search_method == 'BFS': + candidate_fusion_sites_sets = breadth_first_product(*candidate_fusion_sites_for_all_junctions) + # NFS -------------------------------------------------------------------------------------------------------- + elif search_method == 'NFS': + candidate_fusion_sites_sets = nearest_first_product(*candidate_fusion_sites_for_all_junctions) + # ------------------------------------------------------------------------------------------------------------ + else: + raise ValueError("Invalid search method. Search method must be one of these: 'NFS', 'BFS', 'DFS'.") + # ------------------------------------------------------------------------------------------------------------ + # Create a mapping of fusion site names to their indices + fusion_site_indices = {site: idx for idx, site in enumerate(fidelity_data.index)} + fusion_site_cols = {site: idx for idx, site in enumerate(fidelity_data.columns)} + # Extract correct ligation frequencies directly from the diagonal + correct_lig_freq_dict = {site: fidelity_data.iloc[i, i] for site, i in fusion_site_indices.items()} + for element in candidate_fusion_sites_sets: + loop += 1 + all_fusion_sites = element + fusion_sites_used_by_backbone + # remove cases where duplicated or rev-comp fusions sites are present in "element" + if not is_valid_fusion_site_set(all_fusion_sites): + continue + ligation_fidelity = compute_ligation_fidelity(all_fusion_sites_of_a_partition=all_fusion_sites, + fidelity_data=fidelity_data.values, + fusion_site_indices=fusion_site_indices, + fusion_site_cols=fusion_site_cols, + correct_lig_freq_dict=correct_lig_freq_dict) + if ligation_fidelity > ligation_fidelity_of_sel_fusion_sites: + ligation_fidelity_of_sel_fusion_sites = ligation_fidelity + sel_fusion_sites = element + if ligation_fidelity_of_sel_fusion_sites >= satisfaction_fidelity: + break + + # map to the junction_dna_map_sliding_window + sel_junction_dna_map_sliding_window = [] + if sel_fusion_sites: + for fusion_site, junction_dna_map_sliding_window in zip(sel_fusion_sites, junction_dna_map_sliding_window_all): + sel_junction_dna_map_sliding_window.append( + [d for d in junction_dna_map_sliding_window if d['fusion_site'] == fusion_site] + ) - return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window - - -def select_junction_by_codon_usage(junctions: List, codon_usage_table_path: str) -> dict: - """Select junction DNA based on codon usage table""" - - # validate inputs - if len(junctions) == 0: - raise ValueError("No junctions are provided!") - codon_usage_dict = pd.read_csv(codon_usage_table_path, index_col='codon').to_dict(orient='index') - max_score = -float('inf') - sel_juction = {} - for junction in junctions: - junction_dna = junction['junction_dna'] - codon_usage_score = np.prod( - [codon_usage_dict[junction_dna[i:i + 3]]['relative_frequency'] for i in range(0, len(junction_dna), 3)]) - if codon_usage_score > max_score: - sel_juction = junction - return sel_juction - - -def concat_sel_fusion_sites_to_fragments( - fragments: List, - fusion_sites: List, - sel_junction_dna_map_fusion_sites: List, - codon_usage_table_path: str -) -> dict: - - output_dict: dict = {} - if len(fragments) == 1: - output_dict[fragments[0]] = {'order': 0, 'middle_aa': fragments[0]} - return output_dict + return sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window - for junction_idx, (fusion_site, junctions) in enumerate(zip(fusion_sites, sel_junction_dna_map_fusion_sites)): - junction = select_junction_by_codon_usage(junctions, codon_usage_table_path) - fs_start = junction['i'] - junction_dna = junction['junction_dna'] - cut_site = int(len(junction_dna) / 2) - left_frag, right_frag = fragments[junction_idx], fragments[junction_idx + 1] + def concat_sel_fusion_sites_to_fragments( + self, + fragments: List, + fusion_sites: List, + sel_junction_dna_map_fusion_sites: List, + codon_usage_table_path: str + ) -> dict: + """ + Concatenates selected fusion sites to DNA fragments based on specific junctions and codon usage. + + This method processes a list of DNA fragments and fusion sites, determining how to + concatenate them based on selected junctions mapped to the fusion sites. It generates + an output dictionary that contains information about the order, C-terminal, and N-terminal + DNA sequences for each fragment, alongside their corresponding middle amino acid sequences. + + Parameters: + ---------- + fragments : List + A list of DNA fragment identifiers (strings) to be processed. + + fusion_sites : List + A list of fusion site sequences (strings) that will be concatenated to the fragments. + + sel_junction_dna_map_fusion_sites : List + A list of selected junction DNA mappings corresponding to the fusion sites. + + codon_usage_table_path : str + The file path to a codon usage table used for selecting the appropriate junction. + + Returns: + ------- + dict + A dictionary where each key is a fragment identifier and the value is another + dictionary containing: + - 'order': The order of the fragment in the concatenation process. + - 'c_term_dna': The C-terminal DNA sequence after fusion. + - 'n_term_dna': The N-terminal DNA sequence after fusion (if applicable). + - 'middle_aa': The middle amino acid sequence after fusion. + + Notes: + ----- + - If only one fragment is provided, the function returns a dictionary with that fragment's + information without performing any fusion. + - The fusion process involves determining how much of each fragment is retained and how + much is replaced by the fusion site based on the selected junctions. + """ + + output_dict: dict = {} + if len(fragments) == 1: + output_dict[fragments[0]] = {'order': 0, 'middle_aa': fragments[0]} + return output_dict + + for junction_idx, (fusion_site, junctions) in enumerate(zip(fusion_sites, sel_junction_dna_map_fusion_sites)): + junction = self.select_junction_by_codon_usage(junctions, codon_usage_table_path) + fs_start = junction['i'] + junction_dna = junction['junction_dna'] + cut_site = int(len(junction_dna) / 2) + left_frag, right_frag = fragments[junction_idx], fragments[junction_idx + 1] + + if left_frag in output_dict: + left_frag_edited = output_dict[left_frag]['middle_aa'] + else: + left_frag_edited = left_frag + + min_i, max_i = len(junction_dna) / 2 - len(fusion_site), len(junction_dna) / 2 + left_frag_gives_away = math.ceil((len(fusion_site) - (fs_start - min_i)) / 3) + fusion_site_expression = "".join([ + "<", + junction_dna[fs_start:cut_site], + "|", + junction_dna[cut_site:fs_start + len(fusion_site)], + ">" + ]) + left_frag_c_term_dna = ''.join( + [junction_dna[(2 - left_frag_gives_away) * 3:fs_start], + fusion_site_expression] + ) + left_frag_middle_aa = left_frag_edited[:-left_frag_gives_away if left_frag_gives_away else None] + right_frag_gives_away = math.ceil((len(fusion_site) - (max_i - fs_start)) / 3) + right_frag_n_term_dna = ''.join([ + fusion_site_expression, + junction_dna[ + fs_start + len(fusion_site):-(2 - right_frag_gives_away) * 3 if 2 - right_frag_gives_away else None + ] + ]) + right_frag_middle_aa = right_frag[right_frag_gives_away:] + if left_frag in output_dict: + output_dict[left_frag].update({ + 'c_term_dna': left_frag_c_term_dna, + 'middle_aa': left_frag_middle_aa + }) + output_dict.update({ + right_frag: { + 'order': junction_idx + 1, + 'n_term_dna': right_frag_n_term_dna, + 'middle_aa': right_frag_middle_aa + } + }) + else: + output_dict.update({ + left_frag: { + 'order': junction_idx, + 'c_term_dna': left_frag_c_term_dna, + 'middle_aa': left_frag_middle_aa + }, + right_frag: { + 'order': junction_idx + 1, + 'n_term_dna': right_frag_n_term_dna, + 'middle_aa': right_frag_middle_aa + } + }) + return output_dict - if left_frag in output_dict: - left_frag_edited = output_dict[left_frag]['middle_aa'] + @staticmethod + def find_candidate_fusion_sites_for_a_junction( + junction_aa: str, + len_fusion_site: int, + codon_table: Dict[str, List] + ) -> Tuple[List, List]: + """ + Identify potential DNA fusion sites for a given amino acid junction sequence. + + This function generates all possible DNA sequences corresponding to a junction amino acid sequence + and then applies a sliding window approach to extract candidate fusion sites. The sliding window + spans from (cut site - len_fusion_site) to (cut site + len_fusion_site), allowing for the + identification of fusion sites based on the specified length. + + Parameters: + junction_aa (str): The amino acid sequence at the junction for which candidate fusion sites are to be found. + len_fusion_site (int): The length of the fusion site to be considered, which determines the size of the + sliding window used to identify candidate fusion sites within the DNA sequence. + codon_table (Dict[str, List]): A dictionary mapping single-letter amino acid codes to their corresponding + codons (list of strings). This is used to translate the amino acid + sequence into all possible DNA sequences. + + Returns: + Tuple[List, List]: A tuple containing two elements: + 1. A list of unique DNA sequences representing potential fusion sites. + 2. A list of dictionaries, each representing a sliding window over the DNA sequence. Each dictionary + contains: + - 'junction_dna': The full DNA sequence corresponding to the junction. + - 'i': The start index of the fusion site within the DNA sequence. + - 'fusion_site': The DNA sequence of the fusion site itself. + + Notes: + - The function removes palindromic fusion sites, as these are considered unusable in practical applications + and can hinder processing speed. + """ + validate_codon_table(codon_table) + codon_lists = [codon_table[aa] for aa in junction_aa] + unique_candidate_fusion_sites_for_this_junction: list = [] + junction_dna_map_sliding_window: list = [] + for junction_codons in product(*codon_lists): + junction_dna = ''.join(junction_codons) + cut_site = int(len(junction_dna) / 2) + sliding_window_start = cut_site - len_fusion_site + junction_dna_map_sliding_window = \ + junction_dna_map_sliding_window + \ + [{'junction_dna': junction_dna, 'i': idx, 'fusion_site': junction_dna[idx:idx + len_fusion_site]} for idx in + range(sliding_window_start, cut_site + 1)] + sliding_windows = [junction_dna[idx:idx + len_fusion_site] for idx in range(sliding_window_start, cut_site + 1)] + unique_candidate_fusion_sites_for_this_junction = sorted( + list(set(unique_candidate_fusion_sites_for_this_junction + sliding_windows))) + + # Remove palindromic (double rotational symmetric) fusion sites as these sites are un-usable in realworld cases and + # hinders process speed dramatically + refined_candidate_fusion_sites_for_this_junction = [] + for fs in unique_candidate_fusion_sites_for_this_junction: + if not is_rev_compliment(fs, fs): + refined_candidate_fusion_sites_for_this_junction.append(fs) + + return refined_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window + + @staticmethod + def refine_candidate_fusion_sites_for_a_cut(cut: int, mutations_0idx: Optional[List[Any]], + junction_dna_map_sliding_window: List, + codon_table: Dict[str, List]) -> Tuple[List, List]: + """ + Refines candidate fusion sites based on the provided cut position and mutations. + + This function filters candidate fusion sites at a given cut position by checking + if any mutations overlap with the fusion site. It ensures that mutations do not + lie within the fusion site and discards junctions where the mutations affect the + candidate fusion sites. + + Parameters: + - cut (int): The position of the cut in the DNA sequence (1-based index). + - mutations_0idx (Optional[List[Any]]): A list of mutations, where each mutation + is represented as a dictionary containing at least 'position' and 'aa' keys. + - junction_dna_map_sliding_window (List): A list of dictionaries, each representing + a candidate junction with keys 'junction_dna', 'i', and 'fusion_site'. + - codon_table (Dict[str, List]): A dictionary mapping amino acid variations to + their corresponding codons. + + Returns: + - Tuple[List, List]: A tuple containing: + 1. A list of refined candidate fusion sites for the specified junction. + 2. The filtered junction DNA map sliding window, excluding discarded junctions. + + Raises: + - ValueError: If no mutations are provided or if any mutation lies within the + fusion site defined by the cut position. + """ + # set default: dna codon of mutation position overlaps candidate fusion site at this junction + discard_junction_dna_map_sliding_window = [] + if not mutations_0idx: + raise ValueError('No desired mutations are given') else: - left_frag_edited = left_frag - - min_i, max_i = len(junction_dna) / 2 - len(fusion_site), len(junction_dna) / 2 - left_frag_gives_away = math.ceil((len(fusion_site) - (fs_start - min_i)) / 3) - fusion_site_expression = "".join([ - "<", - junction_dna[fs_start:cut_site], - "|", - junction_dna[cut_site:fs_start + len(fusion_site)], - ">" - ]) - left_frag_c_term_dna = ''.join( - [junction_dna[(2 - left_frag_gives_away) * 3:fs_start], - fusion_site_expression] + for mut in mutations_0idx: + # validate the cut position + if mut['position'] == cut - 1 or mut['position'] == cut: + raise ValueError(f"Invalid cut position!\n " + f"Cut position: {cut} results in mutation {mut} lying completely within fusion site") + # dna codon of mutation position overlaps candidate fusion site at this junction + else: + # mutation position is the second amino acid left to cut site + if mut['position'] == cut - 2: + for d in junction_dna_map_sliding_window: + junction_dna = d['junction_dna'] + idx = d['i'] + # dna codon of mutation position overlaps candidate fusion site + if idx < 3: + overlapped_bases = junction_dna[idx:3] + desired_variations = mut['aa'] + for desired_variation in desired_variations: + sub_codons_lst = [codon[-len(overlapped_bases):] + for codon in codon_table[desired_variation]] + if overlapped_bases not in sub_codons_lst: + discard_junction_dna_map_sliding_window.append(d) + break + # mutation position is the second amino acid right to cut site + if mut['position'] == cut + 1: + for d in junction_dna_map_sliding_window: + idx = d['i'] + junction_dna = d['junction_dna'] + fs_length = len(d['fusion_site']) + # dna codon of mutation position overlaps candidate fusion site + if idx + fs_length > len(junction_dna) - 3: + overlapped_bases = junction_dna[-3:idx + fs_length] + desired_variations = mut['aa'] + for desired_variation in desired_variations: + sub_codons_lst = [codon[:len(overlapped_bases)] + for codon in codon_table[desired_variation]] + if overlapped_bases not in sub_codons_lst: + discard_junction_dna_map_sliding_window.append(d) + break + + junction_dna_map_sliding_window = [ + d for d in junction_dna_map_sliding_window if d not in discard_junction_dna_map_sliding_window + ] + candidate_fusion_sites_for_this_junction = sorted( + list( + set( + [d['fusion_site'] for d in junction_dna_map_sliding_window] + ) + ) ) - left_frag_middle_aa = left_frag_edited[:-left_frag_gives_away if left_frag_gives_away else None] - right_frag_gives_away = math.ceil((len(fusion_site) - (max_i - fs_start)) / 3) - right_frag_n_term_dna = ''.join([ - fusion_site_expression, - junction_dna[ - fs_start + len(fusion_site):-(2 - right_frag_gives_away) * 3 if 2 - right_frag_gives_away else None - ] - ]) - right_frag_middle_aa = right_frag[right_frag_gives_away:] - if left_frag in output_dict: - output_dict[left_frag].update({ - 'c_term_dna': left_frag_c_term_dna, - 'middle_aa': left_frag_middle_aa - }) - output_dict.update({ - right_frag: { - 'order': junction_idx + 1, - 'n_term_dna': right_frag_n_term_dna, - 'middle_aa': right_frag_middle_aa - } - }) - else: - output_dict.update({ - left_frag: { - 'order': junction_idx, - 'c_term_dna': left_frag_c_term_dna, - 'middle_aa': left_frag_middle_aa - }, - right_frag: { - 'order': junction_idx + 1, - 'n_term_dna': right_frag_n_term_dna, - 'middle_aa': right_frag_middle_aa - } - }) - return output_dict + return candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window + + @staticmethod + def select_junction_by_codon_usage(junctions: List, codon_usage_table_path: str) -> dict: + """Select junction DNA based on codon usage table""" + + # validate inputs + if len(junctions) == 0: + raise ValueError("No junctions are provided!") + codon_usage_dict = pd.read_csv(codon_usage_table_path, index_col='codon').to_dict(orient='index') + max_score = -float('inf') + sel_juction = {} + for junction in junctions: + junction_dna = junction['junction_dna'] + codon_usage_score = np.prod( + [codon_usage_dict[junction_dna[i:i + 3]]['relative_frequency'] for i in range(0, len(junction_dna), 3)]) + if codon_usage_score > max_score: + sel_juction = junction + return sel_juction + diff --git a/seqteleporter/partition_property_finder/ligation_fidelity_finder.py b/seqteleporter/partition_property_finder/ligation_fidelity_finder.py index 667ae00..2bcbae3 100644 --- a/seqteleporter/partition_property_finder/ligation_fidelity_finder.py +++ b/seqteleporter/partition_property_finder/ligation_fidelity_finder.py @@ -1,79 +1,6 @@ -"""Define function - computes the ligation fidelity using NEB data""" -import pandas as pd import numpy as np -from functools import cache -from typing import Dict -from os import environ, listdir -from os.path import join, dirname, abspath, basename -import static_frame as sf - -from seqteleporter.utils.utils import make_rev_compliment, validate_fidelity_data - - -# @cache -# def make_correct_lig_freq_dict(fidelity_data: sf.FrameHE) -> Dict[str, int]: -# # if fidelity_data is not valid, validate_fidelity_data(fidelity_data) raises error, and code stops. -# validate_fidelity_data(fidelity_data) -# # transform to dict -# sf_series = sf.Series(np.diag(fidelity_data)) -# correct_lig_freq_dict = {k: v for k, v in zip(fidelity_data.index, sf_series.values)} -# # dict_ = pd.Series(np.diag(fidelity_data), index=[fidelity_data.index]).to_dict() -# # correct_lig_freq_dict = {k[0]: v for k, v in dict_.items()} -# return correct_lig_freq_dict - - -# @cache -# def compute_ligation_fidelity(all_fusion_sites_of_a_partition: tuple, -# fidelity_data: sf.FrameHE) -> float: -# """ -# Computes the ligation fidelity for a partition of fusion sites. Fidelity is calculated as the -# frequency of correct ligations divided by the total frequency of ligations for a given fusion site. -# This involves calculating the fidelity of each junction within the partition, considering both the -# original and reverse complement sites, and then multiplying these fidelities to get the total ligation -# fidelity for the partition. -# -# Parameters: -# - all_fusion_sites_of_a_partition (list of str): Fusion sites within a partition, represented as strings. -# - fidelity_data (pd.DataFrame): A DataFrame containing observed ligation frequencies between fusion sites. -# Rows and columns represent fusion sites, and values indicate the frequency of ligation events. -# - correct_lig_freq_dict (dict): A dictionary mapping fusion sites to their correct ligation frequencies, -# as extracted from the diagonal of `fidelity_data`. -# -# Returns: -# - float: The total ligation fidelity for the partition, calculated as the product of individual -# fidelities for each junction, taking into account both direct and reverse complement ligation events. -# -# Notes: -# - The function assumes that for any fusion site, the reverse complement site is also considered -# in the fidelity calculation. If a site is its own reverse complement, special handling is applied -# to avoid double counting. -# """ -# validate_fidelity_data(fidelity_data) -# # correct_lig_freq_dict = make_correct_lig_freq_dict(fidelity_data) -# # transform to dict -# sf_series = sf.Series(np.diag(fidelity_data)) -# correct_lig_freq_dict = {k: v for k, v in zip(fidelity_data.index, sf_series.values)} -# -# all_fusion_sites_of_a_partition_set = set(all_fusion_sites_of_a_partition) # remove duplicates -# rev_comps = [make_rev_compliment(fs) for fs in all_fusion_sites_of_a_partition_set] -# # remove duplicates that is present as reverse complement -# sel_fusion_sites = list(set(list(all_fusion_sites_of_a_partition_set) + rev_comps)) -# sel_lig_freqs = fidelity_data.loc[sel_fusion_sites, sel_fusion_sites] -# sel_lig_freqs_rowsums = sel_lig_freqs.sum() -# total_lig_fidelity = 1 -# for fs in all_fusion_sites_of_a_partition_set: -# rev_comp_fs = make_rev_compliment(fs) -# if fs != rev_comp_fs: -# fidelity_of_this_junction = \ -# correct_lig_freq_dict[fs] * 2 / ( -# sel_lig_freqs_rowsums[fs] + sel_lig_freqs_rowsums[rev_comp_fs]) -# else: -# fidelity_of_this_junction = \ -# correct_lig_freq_dict[fs] * 2 / (sel_lig_freqs_rowsums[fs] * 2 + correct_lig_freq_dict[fs] * 2) -# total_lig_fidelity = total_lig_fidelity * fidelity_of_this_junction -# -# return total_lig_fidelity +from seqteleporter.utils.utils import make_rev_compliment def compute_ligation_fidelity(all_fusion_sites_of_a_partition: tuple, @@ -85,14 +12,17 @@ def compute_ligation_fidelity(all_fusion_sites_of_a_partition: tuple, Computes the ligation fidelity for a partition of fusion sites using a NumPy array for fidelity data. Parameters: - - all_fusion_sites_of_a_partition (list of str): Fusion sites within a partition, represented as strings. + - all_fusion_sites_of_a_partition (tuple of str): A tuple containing fusion site names within a partition. - fidelity_data (np.ndarray): A 2D NumPy array containing observed ligation frequencies between fusion sites. The array is indexed by fusion sites using the provided fusion_site_indices. - fusion_site_indices (dict): A dictionary mapping fusion site names to their corresponding indices in the fidelity_data array. + - fusion_site_cols (dict): A dictionary mapping fusion site names to their respective column indices in the fidelity_data array. + - correct_lig_freq_dict (dict): A dictionary containing the correct ligation frequencies for each fusion site. Returns: - - float: The total ligation fidelity for the partition. + - float: The total ligation fidelity for the partition, calculated as the product of the fidelity for each junction. """ + # Create a set of unique fusion sites and their reverse complements all_fusion_sites_of_a_partition_set = set(all_fusion_sites_of_a_partition) rev_comps = {make_rev_compliment(fs) for fs in all_fusion_sites_of_a_partition_set} diff --git a/seqteleporter/partition_property_finder/partition_property_finder.py b/seqteleporter/partition_property_finder/partition_property_finder.py index c2d2e7c..4ea730c 100644 --- a/seqteleporter/partition_property_finder/partition_property_finder.py +++ b/seqteleporter/partition_property_finder/partition_property_finder.py @@ -5,7 +5,7 @@ from seqteleporter.config import ENZYME_INFO, CODON_TABLE from seqteleporter.partition_property_finder.cost_finder import find_cost from seqteleporter.partition_property_finder.fragment_length_uneveness_finder import compute_fragment_length_unevenness -from seqteleporter.partition_property_finder.fusion_sites_finder import assign_fusion_sites +from seqteleporter.partition_property_finder.fusion_sites_finder import FusionSitesFinder from seqteleporter.utils.utils import frag_in_partition_too_short_or_too_long @@ -16,32 +16,47 @@ def find_partition_property(s: str, mutations_0idx: Optional[List[Any]], linked_ min_ligation_fidelity: float, satisfaction_fidelity: float, search_method: str, enzyme: str, analyze_cause_of_no_valid_partition: bool, cost_per_nt: float, provider_min_frag_len: int, provider_max_frag_len: int) -> Generator: + """ + Evaluates and yields valid partitions of a given DNA sequence based on specified criteria. + + This function takes a DNA sequence and attempts to partition it according to various constraints + related to fragment length, cost, and ligation fidelity. For each partition that meets the criteria, + it yields a dictionary containing information about the partition, including its validity and associated metrics. + Parameters: - s (str): The DNA sequence to be partitioned. - - mutations (list): A list of mutation positions within the sequence. - - number_of_cuts (int): The number of cuts to make in the sequence. + - mutations_0idx (Optional[List[Any]]): A list of mutation positions within the sequence, indexed from zero. + - linked_mutations_0idx (Optional[List[Any]]): A list of positions of linked mutations (if any), indexed from zero. + - partitions_list (List[Generator]): A list of generators that produce potential partitions of the sequence. - fidelity_data_path (str): Path to an Excel file containing experimental data used for assigning fusion sites. - - min_aa_length (int): The minimum length required for a fragment to be considered valid. - - max_cost (float): The maximum allowed cost for a partitioning strategy. The cost is calculated based on - factors such as enzyme usage and efficiency. - - max_unevenness (float): The maximum allowed unevenness in fragment lengths. Unevenness is a measure of how - dissimilar the lengths of the generated fragments are. + - fusion_sites_used_by_backbone (Tuple[str, ...]): A tuple of fusion sites that are considered by the backbone. + - min_aa_length (int): The minimum length required for a fragment to be considered valid (in amino acids). + - max_cost (int): The maximum allowed cost for a partitioning strategy. + - max_unevenness (float): The maximum allowed unevenness in fragment lengths, indicating the acceptable range of length variation. - min_ligation_fidelity (float): The minimum ligation fidelity required for a partition to be considered valid. - Ligation fidelity is a measure of the accuracy and efficiency of joining the - generated fragments. - - satisfaction_fidelity (float): The ligation fidelity threshold above which a partition is considered highly - satisfactory. + - satisfaction_fidelity (float): The ligation fidelity threshold above which a partition is considered highly satisfactory. + - search_method (str): The method used for searching fusion sites. + - enzyme (str): The enzyme used for the partitioning process. + - analyze_cause_of_no_valid_partition (bool): If True, the function will yield reasons for invalid partitions. + - cost_per_nt (float): The cost per nucleotide for the partitioning strategy. + - provider_min_frag_len (int): The minimum fragment length provided by the supplier. + - provider_max_frag_len (int): The maximum fragment length provided by the supplier. Yields: - - A dictionary for each valid partition containing: - - 'partition': A tuple representing the positions of the cuts. - - 'ligation_fidelity': The ligation fidelity for the selected fusion sites, indicating the accuracy and - efficiency of fragment joining. - - 'fragment_length_unevenness': The unevenness of the lengths of the fragments generated by this partition. - - 'cost': The cost associated with this partitioning strategy. - - 'fusion_sites': A list of selected fusion sites based on the partition. - - 'sel_junction_dna_map_fusion_sites': A detailed map of selected junctions and associated fusion sites. + - dict: A dictionary for each valid partition containing: + - 'partition' (tuple): A tuple representing the positions of the cuts in the DNA sequence. + - 'ligation_fidelity' (float): The ligation fidelity for the selected fusion sites, indicating the accuracy and + efficiency of fragment joining. + - 'fragment_length_unevenness' (float): The unevenness of the lengths of the fragments generated by this partition. + - 'cost' (float): The cost associated with this partitioning strategy. + - 'fusion_sites' (list): A list of selected fusion sites based on the partition. + - 'hard_constraint_violation' (str): A message indicating if any hard constraints were violated (if applicable). + - 'sel_junction_dna_map_fusion_sites' (list): A detailed map of selected junctions and associated fusion sites. + + Notes: + - The function will yield reasons for invalid partitions if `analyze_cause_of_no_valid_partition` is set to True. + - The function performs various checks including fragment length, cost, and ligation fidelity before yielding a valid partition. """ fidelity_data = pd.read_excel(fidelity_data_path, index_col=0) @@ -83,9 +98,20 @@ def find_partition_property(s: str, mutations_0idx: Optional[List[Any]], linked_ continue # Assign fusion sites, compute ligation frequency and mismatch ligation frequency + fusion_sites_finder = FusionSitesFinder() sel_fusion_sites, ligation_fidelity, sel_junction_dna_map_sliding_window = \ - assign_fusion_sites(s, mutations_0idx, partition, fidelity_data, satisfaction_fidelity, enzyme, - ENZYME_INFO, fusion_sites_used_by_backbone, search_method, CODON_TABLE) + fusion_sites_finder.assign_fusion_sites( + s, + mutations_0idx, + partition, + fidelity_data, + satisfaction_fidelity, + enzyme, + ENZYME_INFO, + fusion_sites_used_by_backbone, + search_method, + CODON_TABLE + ) if len(sel_fusion_sites) == 0: if analyze_cause_of_no_valid_partition: diff --git a/seqteleporter/partitioner/compute_best_partitions.py b/seqteleporter/partitioner/compute_best_partitions.py index 3edbc16..3e733f6 100644 --- a/seqteleporter/partitioner/compute_best_partitions.py +++ b/seqteleporter/partitioner/compute_best_partitions.py @@ -11,7 +11,7 @@ import pandas as pd from seqteleporter.config import ENZYME_INFO, PARTITION_SEARCH_MODES -from seqteleporter.partition_property_finder.fusion_sites_finder import concat_sel_fusion_sites_to_fragments +from seqteleporter.partition_property_finder.fusion_sites_finder import FusionSitesFinder from seqteleporter.partition_property_finder.partition_property_finder import find_partition_property from seqteleporter.partitioner.partitioner import find_cuttable_positions, partitioner from seqteleporter.utils.load_input_params import load_input_params @@ -446,8 +446,9 @@ def compute_best_partitions(s: str, mutations_0idx: Union[list, None], linked_mu # concat fusion sites back to amino acid after selecting best partitions to avoid running this process in # each and every loop of "partitioner()" + fusion_sites_finder = FusionSitesFinder() for partition in sel_partitions: - fragment_with_fusion_sites = concat_sel_fusion_sites_to_fragments( + fragment_with_fusion_sites = fusion_sites_finder.concat_sel_fusion_sites_to_fragments( fragments=partition["fragments"], fusion_sites=partition["fusion_sites"], sel_junction_dna_map_fusion_sites=partition['sel_junction_dna_map_fusion_sites'], diff --git a/tests/tests_partition_property_finder/test_fusion_sites_finder.py b/tests/tests_partition_property_finder/test_fusion_sites_finder.py index a29567a..9171287 100644 --- a/tests/tests_partition_property_finder/test_fusion_sites_finder.py +++ b/tests/tests_partition_property_finder/test_fusion_sites_finder.py @@ -7,13 +7,11 @@ import shutil from python_codon_tables.python_codon_tables import _tables_dir as codon_usage_table_dir -from seqteleporter.partition_property_finder.fusion_sites_finder import \ - breadth_first_product, nearest_first_product, find_candidate_fusion_sites_for_a_junction, \ - refine_candidate_fusion_sites_for_a_cut, assign_fusion_sites, select_junction_by_codon_usage, \ - concat_sel_fusion_sites_to_fragments - +from seqteleporter.partition_property_finder.fusion_sites_finder import FusionSitesFinder +from seqteleporter.utils.utils import breadth_first_product, nearest_first_product from seqteleporter.config import CODON_TABLE, ENZYME_INFO + FIDELITY_DATA_PATH = path.join( path.dirname(path.dirname(dirname(abspath(__file__)))), 'seqteleporter', 'data', 'neb_fidelity_data', 'FileS01_T4_01h_25C.xlsx' @@ -80,8 +78,11 @@ def test_find_candidate_fusion_sites_for_a_junction_simple(self): junction_aa = 'WWKK' len_fusion_site = 4 unique_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ - find_candidate_fusion_sites_for_a_junction(junction_aa=junction_aa, len_fusion_site=len_fusion_site, - codon_table=CODON_TABLE) + FusionSitesFinder.find_candidate_fusion_sites_for_a_junction( + junction_aa=junction_aa, + len_fusion_site=len_fusion_site, + codon_table=CODON_TABLE + ) expected_output_unique_candidate_fusion_sites = {'GTGG', 'TGGA', 'GGAA', 'GAAA', 'AAAA', 'GAAG', 'AAGA'} expected_output = (expected_output_unique_candidate_fusion_sites, self.expected_output_junction_dna_map_sliding_window) @@ -94,8 +95,11 @@ def test_find_candidate_fusion_sites_for_a_junction_with_palindromic_fusion_site junction_aa = 'MDHM' len_fusion_site = 4 unique_candidate_fusion_sites_for_this_junction, junction_dna_map_sliding_window = \ - find_candidate_fusion_sites_for_a_junction(junction_aa=junction_aa, len_fusion_site=len_fusion_site, - codon_table=CODON_TABLE) + FusionSitesFinder.find_candidate_fusion_sites_for_a_junction( + junction_aa=junction_aa, + len_fusion_site=len_fusion_site, + codon_table=CODON_TABLE + ) expected_output_unique_candidate_fusion_sites = {'GGAT', 'ATCA', 'TCAT', 'CATA', 'TCAC', 'CCAT', 'GGAC', 'GACC', 'ACCA', 'CCAC', 'CACA'} expected_output_junction_dna_map_sliding_window = [ @@ -169,7 +173,7 @@ def synthesize_expected_and_actual_outputs(self, rmv_idxes, mutations_0idx): ) candidate_fusion_sites_for_this_junction_out, junction_dna_map_sliding_window_out = \ - refine_candidate_fusion_sites_for_a_cut( + FusionSitesFinder.refine_candidate_fusion_sites_for_a_cut( cut, mutations_0idx, self.junction_dna_map_sliding_window, @@ -229,9 +233,9 @@ def test_assign_fusion_sites_normal_cases(self): {'junction_dna': 'TGGTGGAAAAAA', 'i': 5, 'fusion_site': 'GAAA'}, {'junction_dna': 'TGGTGGAAAAAG', 'i': 5, 'fusion_site': 'GAAA'} ]] - + fusion_sites_finder = FusionSitesFinder() sel_fusion_sites_, ligation_fidelity_of_sel_fusion_sites_, sel_junction_dna_map_sliding_window_ = \ - assign_fusion_sites(**self.inputs) + fusion_sites_finder.assign_fusion_sites(**self.inputs) self.assertEqual(expected_sel_fusion_sites, sel_fusion_sites_) self.assertEqual(expected_ligation_fidelity_of_sel_fusion_sites, @@ -260,8 +264,9 @@ def test_assign_fusion_sites_edge_cases(self): } if param in expected_outputs.keys(): + fusion_sites_finder = FusionSitesFinder() sel_fusion_sites, ligation_fidelity_of_sel_fusion_sites, sel_junction_dna_map_sliding_window = \ - assign_fusion_sites(**inputs) + fusion_sites_finder.assign_fusion_sites(**inputs) self.assertEqual(expected_outputs[param][0], sel_fusion_sites) ligation_fidelity_of_sel_fusion_sites = round(ligation_fidelity_of_sel_fusion_sites, 3) if math.isnan(expected_outputs[param][1]): @@ -271,7 +276,8 @@ def test_assign_fusion_sites_edge_cases(self): self.assertEqual(expected_outputs[param][2], sel_junction_dna_map_sliding_window) if param in expected_exceptions.keys(): with self.assertRaises(ValueError) as context: - assign_fusion_sites(**inputs) + fusion_sites_finder = FusionSitesFinder() + fusion_sites_finder.assign_fusion_sites(**inputs) self.assertEqual(expected_exceptions[param], str(context.exception)) shutil.rmtree(self.output_dir) @@ -286,7 +292,8 @@ def test_assign_fusion_sites_invalid_fidelity_data(self): }, index=['GGGC', 'TACT', 'GCCC']) inputs.update({'fidelity_data': invalid_fidelity_data}) with self.assertRaises(ValueError) as context: - assign_fusion_sites(**inputs) + fusion_sites_finder = FusionSitesFinder() + fusion_sites_finder.assign_fusion_sites(**inputs) self.assertEqual("fidelity_data.shape[0] != fidelity_data.shape[1]", str(context.exception)) @@ -304,8 +311,10 @@ def setUp(self) -> None: def test_select_junction_by_codon_usage_normal_cases(self): expected_sel_juction = {'junction_dna': 'TGGTGGAAGAAG', 'i': 3, 'fusion_site': 'TGGA'} - sel_juction = select_junction_by_codon_usage(junctions=self.junction_dna_map_sliding_window, - codon_usage_table_path=codon_usage_table_path) + sel_juction = FusionSitesFinder.select_junction_by_codon_usage( + junctions=self.junction_dna_map_sliding_window, + codon_usage_table_path=codon_usage_table_path + ) self.assertEqual(expected_sel_juction, sel_juction) @@ -327,8 +336,10 @@ def test_select_junction_by_codon_usage_edge_cases(self): if param in expected_exceptions.keys(): with self.assertRaises(ValueError) as context: - select_junction_by_codon_usage(junctions=inputs['junctions'], - codon_usage_table_path=codon_usage_table_path) + FusionSitesFinder.select_junction_by_codon_usage( + junctions=inputs['junctions'], + codon_usage_table_path=codon_usage_table_path + ) self.assertEqual(expected_exceptions[param], str(context.exception)) @@ -377,8 +388,8 @@ def test_concat_sel_fusion_sites_to_fragments_normal_cases(self): "middle_aa": "LSCAASGFTFSRYTIHWVRQA" } } - - fragment_with_fusion_sites = concat_sel_fusion_sites_to_fragments( + fusion_sites_finder = FusionSitesFinder() + fragment_with_fusion_sites = fusion_sites_finder.concat_sel_fusion_sites_to_fragments( fragments=self.fragments, fusion_sites=self.fusion_sites, sel_junction_dna_map_fusion_sites=self.junction_dna_map_sliding_window,