From 78b73162f514387effd343f78046a29d3a603a09 Mon Sep 17 00:00:00 2001 From: Vincent Date: Fri, 27 Oct 2023 15:26:33 +0800 Subject: [PATCH] support multiple models to train/save/optimize for inference --- deepray/core/base_trainer.py | 66 +++++++++------- .../wikicorpus_en/processing/file_utils.py | 2 +- deepray/utils/export/__init__.py | 2 +- deepray/utils/export/export.py | 58 +++++++++++++- docker.sh | 1 + .../criteo_ctr/optimize_for_inference.py | 2 + .../Recommendation/criteo_ctr/run_horovod.sh | 9 --- modelzoo/Recommendation/criteo_ctr/train.py | 78 ++++++++----------- 8 files changed, 128 insertions(+), 90 deletions(-) diff --git a/deepray/core/base_trainer.py b/deepray/core/base_trainer.py index 21d56df..2f54ea7 100644 --- a/deepray/core/base_trainer.py +++ b/deepray/core/base_trainer.py @@ -208,22 +208,25 @@ def __init__( self._model = {} if isinstance(model, list): - if len(model) == 1: + if len(model) > 0: self._model = {"main_model": model[0]} - elif len(model) == 2: - self._model = {"main_model": model[0], "sub_model": model[1]} - else: - for i in range(len(model)): - if i == 0: - self._model["main_model"] = model[i] - else: + if len(model) == 2: + self._model["sub_model"] = model[1] + else: + for i in range(1, len(model)): self._model[f"sub_model{i}"] = model[i] + else: + raise ValueError("Not a reachable model.") elif isinstance(model, dict): - self._model = model + main_keys = [k for k in model.keys() if "main" in k] + if len(main_keys) == 1: + self._model = model + else: + raise ValueError("Must set one model with key contains \"main\"") elif isinstance(model, tf.keras.Model): self._model = {"main_model": model} else: - ValueError("Not a reachable model.") + raise ValueError("Not a reachable model.") self._loss = loss self._metrics = metrics @@ -258,7 +261,7 @@ def __init__( self.optimizer = tf.keras.mixed_precision.LossScaleOptimizer(self.optimizer, dynamic=True) @property - def model(self): + def main_model(self): """ Returns: The main model @@ 
-584,12 +587,12 @@ def fit( self.loss_container = self._loss else: self.loss_container = compile_utils.LossesContainer( - self._loss, self._loss_weights, output_names=self.model.output_names + self._loss, self._loss_weights, output_names=self.main_model.output_names ) self.metric_container = compile_utils.MetricsContainer( self._metrics, self._weighted_metrics, - output_names=self.model.output_names, + output_names=self.main_model.output_names, # from_serialized=from_serialized, ) if self._metrics or self._weighted_metrics else None @@ -611,7 +614,10 @@ def fit( if FLAGS.init_checkpoint: for (name, ckpt), init_ckpt in zip(self._checkpoints.items(), FLAGS.init_checkpoint): if init_ckpt: - latest_checkpoint = tf.train.latest_checkpoint(init_ckpt) + if tf.io.gfile.isdir(init_ckpt): + latest_checkpoint = tf.train.latest_checkpoint(init_ckpt) + else: + latest_checkpoint = init_ckpt logging.info( f'Checkpoint file {latest_checkpoint} found and restoring from initial checkpoint for {name} model.' ) @@ -636,7 +642,7 @@ def fit( callbacks, add_history=True, add_progbar=verbose != 0, - model=self.model, + model=self.main_model, verbose=verbose, epochs=self.epochs, steps=self.steps_per_epoch * self.epochs, @@ -649,7 +655,7 @@ def fit( else: opt = self.optimizer - self.model.compile( + self.main_model.compile( optimizer=opt, loss=self._loss, loss_weights=self._loss_weights, @@ -680,7 +686,7 @@ def fit( # Horovod: write logs on worker 0. 
verbose = 2 if is_main_process() else 0 - history = self.model.fit( + history = self.main_model.fit( train_input, epochs=self.epochs, steps_per_epoch=self.steps_per_epoch if self.steps_per_epoch else None, @@ -703,7 +709,7 @@ def run_customized_training_loop( self.current_step = self._first_steps = self.optimizer.iterations.numpy() self.first_batch = True - if not hasattr(self.model, 'optimizer'): + if not hasattr(self.main_model, 'optimizer'): raise ValueError('User should set optimizer attribute to model ' 'inside `model_fn`.') # if self.sub_model_export_name and self.sub_model is None: @@ -750,8 +756,8 @@ def run_customized_training_loop( self.first_batch = False self.on_batch_end(training_logs, steps, t0) self.on_epoch_end(epoch, self.current_step, eval_input, epoch_logs=training_logs) - if self.model.stop_training: - logging.info(f"self.model.stop_training = {self.model.stop_training}") + if self.main_model.stop_training: + logging.info(f"self.model.stop_training = {self.main_model.stop_training}") break self.callbacks.on_train_end(logs=training_logs) @@ -764,7 +770,7 @@ def run_customized_training_loop( export.export_to_checkpoint(self.manager, self.current_step) if is_main_process(): training_summary = {'total_training_steps': self.current_step} - if self.metric_container.metrics: + if self.loss_container: training_summary['train_loss'] = self._float_metric_value(self.loss_container.metrics[0]) if self.metric_container and self.metric_container.metrics: @@ -797,7 +803,7 @@ def run_customized_training_loop( dllogging.logger.log(step=(), data={"total_loss": training_summary['train_loss']}, verbosity=Verbosity.DEFAULT) dllogging.logger.log(data=results_perf, step=tuple()) - return self.model + return self.main_model def train_single_step(self, iterator, num_grad_accumulates): """Performs a distributed training step. 
@@ -814,7 +820,7 @@ def train_single_step(self, iterator, num_grad_accumulates): if _ == 0 or (_ + 1) % num_grad_accumulates == 0: self.step(num_grad_accumulates) if self.use_horovod and _ == 0 and self.first_batch: - hvd.broadcast_variables(self.model.variables, 0) + hvd.broadcast_variables(self.main_model.variables, 0) hvd.broadcast_variables(self.optimizer.variables(), 0) else: self._replicated_step(iterator, self.first_batch) @@ -823,15 +829,15 @@ def train_single_step(self, iterator, num_grad_accumulates): @property def trainable_variables(self): if hasattr(self.loss_container, 'trainable_variables'): - return self.model.trainable_variables + self.loss_container.trainable_variables + return self.main_model.trainable_variables + self.loss_container.trainable_variables else: - return self.model.trainable_variables + return self.main_model.trainable_variables def _replicated_step(self, inputs, first_batch=False): """Replicated training step.""" inputs, labels, sample_weight = data_adapter.unpack_x_y_sample_weight(inputs) with tf.GradientTape() as tape: - model_outputs = self.model(inputs, training=True) + model_outputs = self.main_model(inputs, training=True) loss = self.loss_container(labels, model_outputs, sample_weight=sample_weight) if self.use_horovod and not FLAGS.use_dynamic_embedding: @@ -843,7 +849,7 @@ def _replicated_step(self, inputs, first_batch=False): if self.use_horovod and first_batch: broadcast_vars = [ - var for var in self.model.variables + var for var in self.main_model.variables if (not isinstance(var, TrainableWrapper)) and (not isinstance(var, DEResourceVariable)) ] hvd.broadcast_variables(broadcast_vars, root_rank=0) @@ -861,7 +867,7 @@ def _replicated_step(self, inputs, first_batch=False): def forward(self, inputs): inputs, labels, sample_weight = data_adapter.unpack_x_y_sample_weight(inputs) with tf.GradientTape() as tape: - model_outputs = self.model(inputs, training=True) + model_outputs = self.main_model(inputs, training=True) loss = 
self.loss_container(labels, model_outputs, sample_weight=sample_weight) # Compute gradients @@ -897,7 +903,7 @@ def predict_step(self, iterator): def _test_step_fn(inputs): """Replicated accuracy calculation.""" inputs, labels, sample_weight = data_adapter.unpack_x_y_sample_weight(inputs) - model_outputs = self.model(inputs, training=False) + model_outputs = self.main_model(inputs, training=False) if labels is not None and self.metric_container: self.metric_container.update_state(labels, model_outputs, sample_weight=sample_weight) return model_outputs @@ -949,7 +955,7 @@ def train_steps(self, iterator, steps, num_grad_accumulates): if _ == 0 or (_ + 1) % num_grad_accumulates == 0: self.step(num_grad_accumulates) if self.use_horovod and _ == 0 and self.first_batch: - hvd.broadcast_variables(self.model.variables, 0) + hvd.broadcast_variables(self.main_model.variables, 0) hvd.broadcast_variables(self.optimizer.variables(), 0) else: for _ in tf.range(steps): diff --git a/deepray/datasets/wikicorpus_en/processing/file_utils.py b/deepray/datasets/wikicorpus_en/processing/file_utils.py index 2c9387e..6e8a7d5 100644 --- a/deepray/datasets/wikicorpus_en/processing/file_utils.py +++ b/deepray/datasets/wikicorpus_en/processing/file_utils.py @@ -497,7 +497,7 @@ def _resumable_file_manager(): # GET file object if url.startswith("s3://"): if resume_download: - logger.warn('Warning: resumable downloads are not implemented for "s3://" urls') + logging.warning('Warning: resumable downloads are not implemented for "s3://" urls') s3_get(url, temp_file, proxies=proxies) else: http_get(url, temp_file, proxies=proxies, resume_size=resume_size, user_agent=user_agent) diff --git a/deepray/utils/export/__init__.py b/deepray/utils/export/__init__.py index 48a3a10..cff1c60 100644 --- a/deepray/utils/export/__init__.py +++ b/deepray/utils/export/__init__.py @@ -1 +1 @@ -from .export import SavedModel, TFTRTModel, export_to_savedmodel, export_to_checkpoint +from .export import SavedModel, 
TFTRTModel, export_to_savedmodel, export_to_checkpoint, optimize_for_inference diff --git a/deepray/utils/export/export.py b/deepray/utils/export/export.py index d0b810f..24359e8 100644 --- a/deepray/utils/export/export.py +++ b/deepray/utils/export/export.py @@ -20,10 +20,12 @@ import os import re -from typing import Optional, Union, Dict, Text +import tempfile +from typing import Optional, Union, Dict, Text, List import tensorflow as tf from absl import logging, flags +from keras.engine import data_adapter from tensorflow.python.compiler.tensorrt import trt_convert as trt from tensorflow.python.saved_model import signature_constants from tensorflow.python.saved_model import tag_constants @@ -84,7 +86,7 @@ def export_to_savedmodel( savedmodel_dir: Optional[Text] = None, checkpoint_dir: Optional[Union[Text, Dict[Text, Text]]] = None, restore_model_using_load_weights: bool = False -) -> None: +) -> Text: """Export keras model for serving which does not include the optimizer. Arguments: @@ -148,11 +150,46 @@ def helper(name, _model: tf.keras.Model, _checkpoint_dir): if is_main_process(): logging.info(f"save pb model to: {_savedmodel_dir}, without optimizer & traces") + return _savedmodel_dir + if isinstance(model, dict): + ans = [] for name, _model in model.items(): - helper(name, _model, _checkpoint_dir=checkpoint_dir[name] if checkpoint_dir else None) + _dir = helper(name, _model, _checkpoint_dir=checkpoint_dir[name] if checkpoint_dir else None) + ans.append(_dir) + prefix_path = longestCommonPrefix(ans) + logging.info(f"Export multiple models to {prefix_path}*") + return prefix_path else: - helper(name="main", _model=model, _checkpoint_dir=checkpoint_dir) + return helper(name="main", _model=model, _checkpoint_dir=checkpoint_dir) + + +def optimize_for_inference( + model: Union[tf.keras.Model, Dict[Text, tf.keras.Model]], + dataset: tf.data.Dataset, + savedmodel_dir: Text, +) -> None: + x, y, z = data_adapter.unpack_x_y_sample_weight(next(iter(dataset))) + preds = 
model(x) + logging.info(preds) + + def helper(_model, path): + tmp_path = tempfile.mkdtemp(dir='/tmp/') + export_to_savedmodel(_model, savedmodel_dir=tmp_path) + file = os.path.join(path, "saved_model.pb") + if tf.io.gfile.exists(file): + tf.io.gfile.remove(file) + logging.info(f"Replace optimized saved_modle.pb for {file}") + tf.io.gfile.copy(os.path.join(tmp_path + "_main", "saved_model.pb"), file, overwrite=True) + else: + raise FileNotFoundError(f"{file} does not exist.") + + if isinstance(model, dict): + for name, _model in model.items(): + src = savedmodel_dir + name + helper(_model, src) + else: + helper(model, savedmodel_dir) class SavedModel: @@ -219,3 +256,16 @@ def __call__(self, x, **kwargs): def infer_step(self, x): output = self.graph_func(**x) return output + + +def longestCommonPrefix(strs: List[str]) -> str: + if not strs: + return "" + + length, count = len(strs[0]), len(strs) + for i in range(length): + c = strs[0][i] + if any(i == len(strs[j]) or strs[j][i] != c for j in range(1, count)): + return strs[0][:i] + + return strs[0] diff --git a/docker.sh b/docker.sh index 0f523ce..3a891e4 100644 --- a/docker.sh +++ b/docker.sh @@ -9,6 +9,7 @@ docker pull hailinfufu/deepray-dev:latest-py${PY_VERSION}-tf${TF_VERSION}-cu116- docker run --gpus all -it \ --rm=true \ + --name="deepray_dev" \ -w /workspaces \ --volume=dev-build:/workspaces \ --shm-size=1g \ diff --git a/modelzoo/Recommendation/criteo_ctr/optimize_for_inference.py b/modelzoo/Recommendation/criteo_ctr/optimize_for_inference.py index d2d823d..514e07b 100644 --- a/modelzoo/Recommendation/criteo_ctr/optimize_for_inference.py +++ b/modelzoo/Recommendation/criteo_ctr/optimize_for_inference.py @@ -33,6 +33,8 @@ def main(_): tf.io.gfile.remove(file) logging.info(f"Replace optimized saved_modle.pb for {file}") tf.io.gfile.copy(os.path.join(tmp_path + "_main", "saved_model.pb"), file, overwrite=True) + else: + raise FileNotFoundError(f"{file} does not exist.") if __name__ == "__main__": diff --git 
a/modelzoo/Recommendation/criteo_ctr/run_horovod.sh b/modelzoo/Recommendation/criteo_ctr/run_horovod.sh index 7ad78b0..2457fae 100644 --- a/modelzoo/Recommendation/criteo_ctr/run_horovod.sh +++ b/modelzoo/Recommendation/criteo_ctr/run_horovod.sh @@ -86,14 +86,5 @@ $hvd_command $nsys_command python train.py \ $use_hvd $use_fp16 $use_xla_tag set +x -# if [ $num_gpu -gt 1 ]; then -# python optimize_for_inference.py \ -# --feature_map=feature_map_small.csv \ -# --use_dynamic_embedding=True \ -# --model_dir=${RESULTS_DIR} \ -# --distribution_strategy=off \ -# $use_fp16 $use_xla_tag -# fi - # --init_checkpoint=/results/tf_tfra_training_criteo_dcn_fp32_gbs4096_231018053444/ckpt_main_model/ \ # --init_weights="/results/tf_tfra_training_criteo_dcn_fp32_gbs16384_231016072901/export_main/variables" \ diff --git a/modelzoo/Recommendation/criteo_ctr/train.py b/modelzoo/Recommendation/criteo_ctr/train.py index 4adfb95..42c7f8d 100644 --- a/modelzoo/Recommendation/criteo_ctr/train.py +++ b/modelzoo/Recommendation/criteo_ctr/train.py @@ -17,19 +17,17 @@ from __future__ import division from __future__ import print_function -import os import sys -import tempfile import tensorflow as tf -from absl import app, flags, logging +from absl import app, flags from tensorflow.keras import backend as K from tensorflow_recommenders_addons import dynamic_embedding as de from dcn_v2 import Ranking from deepray.core.base_trainer import Trainer from deepray.datasets.criteo import CriteoTsvReader -from deepray.utils.export import export_to_savedmodel +from deepray.utils.export import export_to_savedmodel, optimize_for_inference from deepray.utils.horovod_utils import is_main_process FLAGS = flags.FLAGS @@ -44,54 +42,44 @@ def main(_): train_input_fn = data_pipe(FLAGS.train_data, FLAGS.batch_size, is_training=True) trainer.fit(train_input=train_input_fn, steps_per_epoch=FLAGS.steps_per_epoch) - import numpy as np - a = { - "feature_14": - tf.constant(np.array([6394203, 7535249, 3500077, 836339, 
7401745, 375123]), dtype=tf.int32), - "feature_15": - tf.constant(np.array([6394203, 7535249, 3500077, 836339, 7401745, 375123]), dtype=tf.int32), - "dense_features": - tf.constant( - np.array( - [ - [0.7361634, 0.7361634], [0.00337589, 0.00337589], [0.673707, 0.673707], [0.33169293, 0.33169293], - [0.8020003, 0.8020003], [0.18556607, 0.18556607] - ] - ), - dtype=tf.float32 - ) - } - - logging.info(model(a)) - logging.info(trainer.model(a)) - - for name in ["feature_14", "feature_15"]: - tensor = a[name] - test = model.embedding_layer[name](tensor) - logging.info(f"Embedding for {name} is {test}") - - export_to_savedmodel(trainer.model) + # import numpy as np + # a = { + # "feature_14": + # tf.constant(np.array([6394203, 7535249, 3500077, 836339, 7401745, 375123]), dtype=tf.int32), + # # "feature_15": + # # tf.constant(np.array([6394203, 7535249, 3500077, 836339, 7401745, 375123]), dtype=tf.int32), + # "dense_features": + # tf.constant( + # np.array( + # [ + # [0.7361634, 0.7361634], [0.00337589, 0.00337589], [0.673707, 0.673707], [0.33169293, 0.33169293], + # [0.8020003, 0.8020003], [0.18556607, 0.18556607] + # ] + # ), + # dtype=tf.float32 + # ) + # } + + # logging.info(model(a)) + # logging.info(trainer.model(a)) + + # for name in [ + # "feature_14", + # # "feature_15" + # ]: + # tensor = a[name] + # test = model.embedding_layer[name](tensor) + # logging.info(f"Embedding for {name} is {test}") + + savedmodel_path = export_to_savedmodel(trainer.main_model) if FLAGS.use_horovod and is_main_process(): FLAGS([sys.argv[0], "--use_horovod=False"]) # Modify the graph to a stand-alone version for inference K.clear_session() model = Ranking(interaction="cross", training=False) - test_ds = data_pipe(FLAGS.train_data, 1, is_training=True) - # If the model uses lazy build then we need to run the model once. 
- for x, y in test_ds.take(1): - preds = model(x) - logging.info(preds) - - tmp_path = tempfile.mkdtemp(dir='/tmp/') - src = os.path.join(FLAGS.model_dir, "export_main") - export_to_savedmodel(model, savedmodel_dir=tmp_path) - file = os.path.join(src, "saved_model.pb") - if tf.io.gfile.exists(file): - tf.io.gfile.remove(file) - logging.info(f"Replace optimized saved_modle.pb for {file}") - tf.io.gfile.copy(os.path.join(tmp_path + "_main", "saved_model.pb"), file, overwrite=True) + optimize_for_inference(model, test_ds, savedmodel_dir=savedmodel_path) if __name__ == "__main__":