diff --git a/deepray/core/base_trainer.py b/deepray/core/base_trainer.py index aca2bcf..183e4b1 100644 --- a/deepray/core/base_trainer.py +++ b/deepray/core/base_trainer.py @@ -472,13 +472,13 @@ def fit( with array inputs. When using `tf.distribute.experimental.ParameterServerStrategy`: * `steps_per_epoch=None` is not supported. - validation_steps: Only relevant if `validation_data` is provided and + eval_steps: Only relevant if `validation_data` is provided and is a `tf.data` dataset. Total number of steps (batches of samples) to draw before stopping when performing validation - at the end of every epoch. If 'validation_steps' is None, + at the end of every epoch. If 'eval_steps' is None, validation will run until the `validation_data` dataset is exhausted. In the case of an infinitely repeated dataset, it - will run into an infinite loop. If 'validation_steps' is + will run into an infinite loop. If 'eval_steps' is specified and only part of the dataset will be consumed, the evaluation will start from the beginning of the dataset at each epoch. This ensures that the same validation samples are used @@ -552,7 +552,7 @@ def fit( and what the model expects or when the input data is empty. """ self.steps_per_epoch = steps_per_epoch if steps_per_epoch else -1 - self.validation_steps = eval_steps + self.eval_steps = eval_steps if FLAGS.benchmark or FLAGS.stop_steps >= 0: if FLAGS.stop_steps >= 0: self.steps_per_epoch = FLAGS.stop_steps diff --git a/deepray/core/module.py b/deepray/core/module.py index ccbac08..ff3b8a1 100644 --- a/deepray/core/module.py +++ b/deepray/core/module.py @@ -16,7 +16,7 @@ class Module(): def __init__(self, **kwargs): super().__init__(**kwargs) - self.validation_steps = None + self.eval_steps = None def steps_to_run(self, current_step, steps_per_epoch, steps_per_loop): """Calculates steps to run on device.""" @@ -116,7 +116,7 @@ def on_epoch_end(self, epoch, current_step, eval_input, epoch_logs=None): if self.metric_container: self.metric_container.reset_state() - val_logs = self.run_evaluation(eval_input, self.validation_steps) + val_logs = self.evaluate(eval_input, self.eval_steps) val_logs = {'val_' + name: val for name, val in val_logs.items()} epoch_logs.update(val_logs) @@ -130,40 +130,71 @@ def on_epoch_end(self, epoch, current_step, eval_input, epoch_logs=None): """ self.callbacks.on_epoch_end(epoch, epoch_logs) - def run_evaluation(self, eval_input, validation_steps=None): - if validation_steps is None: - if self.validation_steps is not None: - validation_steps = self.validation_steps + def evaluate(self, eval_input: tf.data.Dataset, eval_steps: int = None): + """Returns the loss value & metrics values for the model in test mode. + + Computation is done in batches (see the `batch_size` arg.) + + Args: + eval_input: Target data. Like the input data `x`, it could be either Numpy + array(s) or TensorFlow tensor(s). It should be consistent with `x` + (you cannot have Numpy inputs and tensor targets, or inversely). + If `x` is a dataset, generator or `keras.utils.Sequence` instance, + `y` should not be specified (since targets will be obtained from + the iterator/dataset). + eval_steps: Integer or `None`. Total number of steps (batches of samples) + before declaring the evaluation round finished. Ignored with the + default value of `None`. If x is a `tf.data` dataset and `steps` + is None, 'evaluate' will run until the dataset is exhausted. This + argument is not supported with array inputs. + + + See the discussion of `Unpacking behavior for iterator-like inputs` for + `Model.fit`. + + Returns: + Scalar test loss (if the model has a single output and no metrics) + or list of scalars (if the model has multiple outputs + and/or metrics). The attribute `model.metrics_names` will give you + the display labels for the scalar outputs. + + Raises: + RuntimeError: If `trainer.evaluate` is wrapped in a `tf.function`. + """ + + if eval_steps is None: + if self.eval_steps is not None: + eval_steps = self.eval_steps else: - if self.validation_steps is None: - self.validation_steps = validation_steps - """Runs validation steps and aggregate metrics.""" + """Runs validation steps and aggregate metrics.""" + if self.eval_steps is None: + self.eval_steps = eval_steps if not isinstance(eval_input, Iterator): eval_input = distribution_utils.make_distributed_iterator(self.strategy, eval_input) current_step = 0 - while validation_steps is None or current_step < validation_steps: + while eval_steps is None or current_step < eval_steps: try: t0 = time.time() - steps, _ = self.steps_to_run(current_step, validation_steps, FLAGS.steps_per_summary) + steps, _ = self.steps_to_run(current_step, eval_steps, FLAGS.steps_per_summary) for _ in tf.range(steps): self.forward_step(next(eval_input)) current_step += 1 elapse_time = time.time() - t0 # Updates validing logging. - if validation_steps is None: + if eval_steps is None: training_status = 'Valid Step: %d / time=%.3f sec' % (current_step, elapse_time) else: - training_status = 'Valid Step: %d/%d / time=%.3f sec' % (current_step, validation_steps, elapse_time) + training_status = 'Valid Step: %d/%d / time=%.3f sec' % (current_step, eval_steps, elapse_time) for key, value in self.get_metrics_result().items(): metric_value = value.numpy().astype(float) training_status += ' %s=%f' % (key, metric_value) if is_main_process(): logging.info(training_status) except (tf.errors.OutOfRangeError, StopIteration): - self.validation_steps = current_step + self.eval_steps = current_step if is_main_process(): - logging.info('Data exhausted after %d steps', current_step) + logging.info('Data exhausted after %d eval_steps', current_step) break return self.get_metrics_result() diff --git a/deepray/layers/dynamic_embedding.py b/deepray/layers/dynamic_embedding.py index 13ec7b1..cd57a39 100644 --- a/deepray/layers/dynamic_embedding.py +++ b/deepray/layers/dynamic_embedding.py @@ -110,6 +110,12 @@ def __init__( **kwargs ): super(DistributedDynamicEmbedding, self).__init__() + self.embedding_dim = embedding_dim + self.key_dtype = key_dtype + self.value_dtype = value_dtype + self.initializer = initializer + self.de_option = de_option + if de_option.device_name == "Redis": self.emb = EmbeddingLayerRedis( embedding_size=embedding_dim, @@ -143,7 +149,6 @@ def __init__( key_dtype=key_dtype, value_dtype=value_dtype, initializer=initializer, - name=name, devices=de_option.devices, init_capacity=de_option.init_capacity, kv_creator=de_option.kv_creator, @@ -154,6 +159,20 @@ def __init__( def call(self, ids, *args, **kwargs): return self.emb(ids) + def get_config(self): + config = super().get_config() + config.update( + { + "embedding_dim": self.embedding_dim, + "key_dtype": self.key_dtype, + "value_dtype": self.value_dtype, + "initializer": self.initializer, + "name": self.name, + "de_option": self.de_option, + } + ) + return config + class CompositionalEmbedding(tf.keras.layers.Layer): """