fix: enzyme optimization with Kcat fitness function (#240)
* fix: fixed enzyme optimization with Kcat fitness function

Signed-off-by: yvesnana <yves.g.nana@gmail.com>

* fixed random seed initialization

Signed-off-by: yvesnana <yves.g.nana@gmail.com>

* feat: added xgboost as needed for enzeptional

Signed-off-by: yvesnana <yves.g.nana@gmail.com>

* feat: added xgboost to requirement file

Signed-off-by: yvesnana <yves.g.nana@gmail.com>

---------

Signed-off-by: yvesnana <yves.g.nana@gmail.com>
yvesnana authored Apr 25, 2024
1 parent 4872fbb commit 46efb89
Showing 4 changed files with 110 additions and 27 deletions.
80 changes: 69 additions & 11 deletions examples/enzeptional/example_enzeptional.py
@@ -1,28 +1,58 @@
import logging
import pandas as pd
from typing import Tuple, List, Optional
from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility
from gt4sd.frameworks.enzeptional.core import SequenceMutator, EnzymeOptimizer
from gt4sd.configuration import GT4SDConfiguration, sync_algorithm_with_s3


def initialize_environment():
"""Synchronize with GT4SD S3 storage and set up the environment."""
# NOTE: To optimize kcat values, point the scorer path at the kcat model instead: f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/kcat/model.pkl". The scaler, stored in the same directory as the scorer, must also be specified for the model to perform correctly.
def initialize_environment(model: str = "feasibility") -> Tuple[str, Optional[str]]:
"""Synchronize with GT4SD S3 storage and set up the environment.
Args:
model (str): Type of optimization ("feasibility" or "kcat").
Returns:
Tuple[str, Optional[str]]: Path to the scorer file and, if available, the scaler file.
"""
configuration = GT4SDConfiguration.get_instance()
sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties")
return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl"


def load_experiment_parameters():
def load_experiment_parameters() -> Tuple[List, List, List, List]:
"""Load experiment parameters from a CSV file."""
df = pd.read_csv("data.csv").iloc[1]
return df["substrates"], df["products"], df["sequences"], eval(df["intervals"])
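Since load_experiment_parameters reads data.csv and takes .iloc[1] (the second row), the file needs at least two rows with these four columns; the intervals column holds a stringified list of lists that the loader passes to eval. An illustrative way to produce such a file (all values are placeholders):

import pandas as pd

# Placeholder values only; real runs need valid SMILES and protein sequences.
pd.DataFrame({
    "substrates": ["CCO", "CC(=O)O"],                  # substrate SMILES
    "products": ["CC=O", "CC(=O)OC"],                  # product SMILES
    "sequences": ["MKTAYIAKQR", "MVLSPADKTN"],         # protein sequences (toy length)
    "intervals": ["[[1, 8]]", "[[5, 10], [20, 25]]"],  # stringified, eval'd by the loader
}).to_csv("data.csv", index=False)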


def setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path
substrate_smiles: str,
product_smiles: str,
sample_sequence: str,
intervals: List[List[int]],
scorer_path: str,
scaler_path: str,
concat_order: List[str],
use_xgboost_scorer: bool
):
"""Set up and return the optimizer with all necessary components configured."""
"""Set up and return the optimizer with all necessary components configured
Args:
substrate_smiles (str): SMILES representation of
the substrate.
product_smiles (str): SMILES representation of the
product.
sample_sequence (str): The initial protein sequence.
intervals (List[List[int]]): Intervals for mutation.
scorer_path (str): File path to the scoring model.
scaler_path (str): Path to the scaller in case you are usinh the Kcat model.
concat_order (List[str]): Order of concatenating embeddings.
use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat.
Returns:
Initialized optmizer
"""
model_tokenizer_paths = "facebook/esm2_t33_650M_UR50D"
chem_paths = "seyonec/ChemBERTa-zinc-base-v1"

@@ -52,33 +82,61 @@ def setup_optimizer(
"selection_ratio": 0.25,
"perform_crossover": True,
"crossover_type": "single_point",
"concat_order": ["substrate", "sequence", "product"],
"concat_order": concat_order,
"scaler_filepath": scaler_path,
"use_xgboost_scorer": use_xgboost_scorer
}
return EnzymeOptimizer(**optimizer_config)
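The hunk hides most of optimizer_config; reconstructed from the EnzymeOptimizer.__init__ parameters shown in core.py below, the full dictionary plausibly resembles the following (hidden values are illustrative, visible keys match the diff):

# Sketch: keys mirror EnzymeOptimizer.__init__; hidden values are illustrative.
optimizer_config = {
    "sequence": sample_sequence,
    "protein_model": HFandTAPEModelUtility(model_tokenizer_paths, model_tokenizer_paths),
    "substrate_smiles": substrate_smiles,
    "product_smiles": product_smiles,
    "chem_model_path": chem_paths,
    "chem_tokenizer_path": chem_paths,
    "scorer_filepath": scorer_path,
    "mutator": mutator,  # a SequenceMutator built earlier; its constructor args are not shown here
    "intervals": intervals,
    "batch_size": 5,     # illustrative; the old docstring notes a default of 2
    "seed": 123,
    "top_k": 3,
    "selection_ratio": 0.25,
    "perform_crossover": True,
    "crossover_type": "single_point",
    "concat_order": concat_order,
    "scaler_filepath": scaler_path,
    "use_xgboost_scorer": use_xgboost_scorer,
}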


def optimize_sequences(optimizer):
"""Optimize sequences using the configured optimizer."""
"""Optimize sequences using the configured optimizer.
Args:
optimizer: Initialized optimizer
Returns:
Optimized sequences
"""
return optimizer.optimize(
num_iterations=3, num_sequences=5, num_mutations=5, time_budget=3600
)


def main():
def main_kcat():
"""Optimization using Kcat model"""
logging.basicConfig(level=logging.INFO)
scorer_path = initialize_environment()
scorer_path, scaler_path = initialize_environment(model="kcat")
concat_order, use_xgboost_scorer = ["substrate", "sequence"], True
(
substrate_smiles,
product_smiles,
sample_sequence,
intervals,
) = load_experiment_parameters()
optimizer = setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer
)
optimized_sequences, iteration_info = optimize_sequences(optimizer)
logging.info("Optimization completed.")


def main_feasibility():
"""Optimization using Feasibility model"""
logging.basicConfig(level=logging.INFO)
scorer_path, scaler_path = initialize_environment()
concat_order, use_xgboost_scorer = ["substrate", "sequence", "product"], False
(
substrate_smiles,
product_smiles,
sample_sequence,
intervals,
) = load_experiment_parameters()
optimizer = setup_optimizer(
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer
)
optimized_sequences, iteration_info = optimize_sequences(optimizer)
logging.info("Optimization completed.")

if __name__ == "__main__":
main_kcat()
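Since main was split into main_kcat and main_feasibility, a hypothetical dispatcher (not part of this commit) could keep both modes exercisable from the command line:

import sys

if __name__ == "__main__":
    # Hypothetical dispatch; the committed example calls a single main function.
    if len(sys.argv) > 1 and sys.argv[1] == "feasibility":
        main_feasibility()
    else:
        main_kcat()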
1 change: 1 addition & 0 deletions requirements.txt
@@ -45,3 +45,4 @@ torchmetrics>=0.7.0,<1.0.0
transformers>=4.22.0,<=4.24.0
typing_extensions>=3.7.4.3
wheel>=0.26
xgboost>=1.7.6
4 changes: 4 additions & 0 deletions setup.cfg
@@ -58,6 +58,7 @@ install_requires =
transformers<4.26.0
typing_extensions
wheel
xgboost
setup_requires =
setuptools
package_dir =
@@ -281,3 +282,6 @@ ignore_missing_imports = True

[mypy-ruamel.*]
ignore_missing_imports = True

[mypy-xgboost.*]
ignore_missing_imports = True
52 changes: 36 additions & 16 deletions src/gt4sd/frameworks/enzeptional/core.py
@@ -30,6 +30,7 @@
from itertools import product as iter_product
import time
from joblib import load
import xgboost as xgb
from .processing import (
HFandTAPEModelUtility,
SelectionGenerator,
@@ -367,6 +368,8 @@ def __init__(
minimum_interval_length: int = 8,
pad_intervals: bool = False,
concat_order=["sequence", "substrate", "product"],
scaler_filepath: Optional[str] = None,
use_xgboost_scorer: Optional[bool] = False,
):
"""Initializes the optimizer with models, sequences, and
optimization parameters.
@@ -379,18 +382,22 @@
product_smiles (str): SMILES representation of the product.
chem_model_path (str): Path to the chemical model.
chem_tokenizer_path (str): Path to the chemical tokenizer.
scorer_filepath (str): Path to the scoring model.
mutator (SequenceMutator): The mutator for generating sequence variants.
intervals (List[Tuple[int, int]]): Intervals for mutation.
batch_size (int, optional): The number of sequences to process in one batch. Defaults to 2.
seed (int, optional): Random seed. Defaults to 123.
top_k (int, optional): Number of top mutations to consider. Defaults to 2.
selection_ratio (float, optional): Ratio of sequences to select after scoring. Defaults to 0.5.
perform_crossover (bool, optional): Flag to perform crossover operation. Defaults to False.
crossover_type (str, optional): Type of crossover operation. Defaults to "uniform".
minimum_interval_length (int, optional): Minimum length of mutation intervals. Defaults to 8.
pad_intervals (bool, optional): Flag to pad the intervals. Defaults to False.
concat_order (list, optional): Order of concatenating embeddings. Defaults to ["sequence", "substrate", "product"].
scorer_filepath (str): File path to the scoring model.
mutator (SequenceMutator): The mutator for generating sequence variants.
intervals (List[List[int]]): Intervals for mutation.
batch_size (int): The number of sequences to process in one batch.
seed (int): Random seed.
top_k (int): Number of top mutations to consider.
selection_ratio (float): Ratio of sequences to select after scoring.
perform_crossover (bool): Flag to perform crossover operation.
crossover_type (str): Type of crossover operation.
minimum_interval_length (int): Minimum length of mutation intervals.
pad_intervals (bool): Flag to pad the intervals.
concat_order (list): Order of concatenating embeddings.
scaler_filepath (str, optional): Path to the scaler when using the kcat model.
use_xgboost_scorer (bool, optional): Flag indicating whether the fitness function is kcat.
"""
self.sequence = sequence
self.protein_model = protein_model
@@ -407,7 +414,9 @@ def __init__(
self.mutator.set_top_k(top_k)
self.concat_order = concat_order
self.scorer = load(scorer_filepath)
self.seed = seed
if scaler_filepath is not None:
self.scaler = load(scaler_filepath)
else:
self.scaler = None  # ensure the attribute always exists; score_sequence(s) checks it
self.use_xgboost_scorer = use_xgboost_scorer

self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path)
self.substrate_embedding = self.chem_model.embed([substrate_smiles])[0]
@@ -424,7 +433,7 @@ def __init__(
self.intervals = sanitize_intervals_with_padding(
self.intervals, minimum_interval_length, len(sequence)
)

self.seed = seed
random.seed(self.seed)

def optimize(
@@ -614,7 +623,13 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]:
combined_embedding = np.concatenate(ordered_embeddings)
combined_embedding = combined_embedding.reshape(1, -1)

score = self.scorer.predict_proba(combined_embedding)[0][1]
if self.use_xgboost_scorer:
if self.scaler is not None:
combined_embedding = self.scaler.transform(combined_embedding)
score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0]
else:
score = self.scorer.predict_proba(combined_embedding)[0][1]

return {"sequence": sequence, "score": score}

def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]:
@@ -643,7 +658,12 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]:
combined_embedding = np.concatenate(ordered_embeddings)
combined_embedding = combined_embedding.reshape(1, -1)

score = self.scorer.predict_proba(combined_embedding)[0][1]
if self.use_xgboost_scorer:
if self.scaler is not None:
combined_embedding = self.scaler.transform(combined_embedding)
score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0]
else:
score = self.scorer.predict_proba(combined_embedding)[0][1]
output.append({"sequence": sequences[position], "score": score})

return output
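The same branch now appears in both score_sequence and score_sequences; condensed into one self-contained helper, the scoring logic reads as follows (scorer and scaler assumed to be the loaded pickled models):

import numpy as np
import xgboost as xgb

def score_embedding(scorer, scaler, combined_embedding: np.ndarray, use_xgboost_scorer: bool) -> float:
    """Condensed sketch of the branch shared by score_sequence and score_sequences."""
    if use_xgboost_scorer:
        # kcat path: optionally standardize features, then use xgboost's native predict.
        if scaler is not None:
            combined_embedding = scaler.transform(combined_embedding)
        return float(scorer.predict(xgb.DMatrix(combined_embedding))[0])
    # Feasibility path: sklearn-style classifier, probability of the positive class.
    return float(scorer.predict_proba(combined_embedding)[0][1])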
