Skip to content

Commit

Permalink
Merge pull request #38 from deepray-AI/hotfix
Browse files Browse the repository at this point in the history
rename validation to eval
  • Loading branch information
fuhailin authored Nov 19, 2023
2 parents 43d032d + b0e2b86 commit 953c397
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 20 deletions.
8 changes: 4 additions & 4 deletions deepray/core/base_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,13 @@ def fit(
with array inputs.
When using `tf.distribute.experimental.ParameterServerStrategy`:
* `steps_per_epoch=None` is not supported.
validation_steps: Only relevant if `validation_data` is provided and
eval_steps: Only relevant if `validation_data` is provided and
is a `tf.data` dataset. Total number of steps (batches of
samples) to draw before stopping when performing validation
at the end of every epoch. If 'validation_steps' is None,
at the end of every epoch. If 'eval_steps' is None,
validation will run until the `validation_data` dataset is
exhausted. In the case of an infinitely repeated dataset, it
will run into an infinite loop. If 'validation_steps' is
will run into an infinite loop. If 'eval_steps' is
specified and only part of the dataset will be consumed, the
evaluation will start from the beginning of the dataset at each
epoch. This ensures that the same validation samples are used
Expand Down Expand Up @@ -552,7 +552,7 @@ def fit(
and what the model expects or when the input data is empty.
"""
self.steps_per_epoch = steps_per_epoch if steps_per_epoch else -1
self.validation_steps = eval_steps
self.eval_steps = eval_steps
if FLAGS.benchmark or FLAGS.stop_steps >= 0:
if FLAGS.stop_steps >= 0:
self.steps_per_epoch = FLAGS.stop_steps
Expand Down
61 changes: 46 additions & 15 deletions deepray/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Module():

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.validation_steps = None
self.eval_steps = None

def steps_to_run(self, current_step, steps_per_epoch, steps_per_loop):
"""Calculates steps to run on device."""
Expand Down Expand Up @@ -116,7 +116,7 @@ def on_epoch_end(self, epoch, current_step, eval_input, epoch_logs=None):
if self.metric_container:
self.metric_container.reset_state()

val_logs = self.run_evaluation(eval_input, self.validation_steps)
val_logs = self.evaluate(eval_input, self.eval_steps)
val_logs = {'val_' + name: val for name, val in val_logs.items()}
epoch_logs.update(val_logs)

Expand All @@ -130,40 +130,71 @@ def on_epoch_end(self, epoch, current_step, eval_input, epoch_logs=None):
"""
self.callbacks.on_epoch_end(epoch, epoch_logs)

def run_evaluation(self, eval_input, validation_steps=None):
if validation_steps is None:
if self.validation_steps is not None:
validation_steps = self.validation_steps
def evaluate(self, eval_input: tf.data.Dataset, eval_steps: int = None):
"""Returns the loss value & metrics values for the model in test mode.
Computation is done in batches (see the `batch_size` arg.)
Args:
eval_input: Target data. Like the input data `x`, it could be either Numpy
array(s) or TensorFlow tensor(s). It should be consistent with `x`
(you cannot have Numpy inputs and tensor targets, or inversely).
If `x` is a dataset, generator or `keras.utils.Sequence` instance,
`y` should not be specified (since targets will be obtained from
the iterator/dataset).
eval_steps: Integer or `None`. Total number of steps (batches of samples)
before declaring the evaluation round finished. Ignored with the
default value of `None`. If x is a `tf.data` dataset and `steps`
is None, 'evaluate' will run until the dataset is exhausted. This
argument is not supported with array inputs.
See the discussion of `Unpacking behavior for iterator-like inputs` for
`Model.fit`.
Returns:
Scalar test loss (if the model has a single output and no metrics)
or list of scalars (if the model has multiple outputs
and/or metrics). The attribute `model.metrics_names` will give you
the display labels for the scalar outputs.
Raises:
RuntimeError: If `trainer.evaluate` is wrapped in a `tf.function`.
"""

if eval_steps is None:
if self.eval_steps is not None:
eval_steps = self.eval_steps
else:
if self.validation_steps is None:
self.validation_steps = validation_steps
"""Runs validation steps and aggregate metrics."""
"""Runs validation steps and aggregate metrics."""
if self.eval_steps is None:
self.eval_steps = eval_steps
if not isinstance(eval_input, Iterator):
eval_input = distribution_utils.make_distributed_iterator(self.strategy, eval_input)

current_step = 0
while validation_steps is None or current_step < validation_steps:
while eval_steps is None or current_step < eval_steps:
try:
t0 = time.time()
steps, _ = self.steps_to_run(current_step, validation_steps, FLAGS.steps_per_summary)
steps, _ = self.steps_to_run(current_step, eval_steps, FLAGS.steps_per_summary)
for _ in tf.range(steps):
self.forward_step(next(eval_input))
current_step += 1
elapse_time = time.time() - t0
# Updates validing logging.
if validation_steps is None:
if eval_steps is None:
training_status = 'Valid Step: %d / time=%.3f sec' % (current_step, elapse_time)
else:
training_status = 'Valid Step: %d/%d / time=%.3f sec' % (current_step, validation_steps, elapse_time)
training_status = 'Valid Step: %d/%d / time=%.3f sec' % (current_step, eval_steps, elapse_time)
for key, value in self.get_metrics_result().items():
metric_value = value.numpy().astype(float)
training_status += ' %s=%f' % (key, metric_value)
if is_main_process():
logging.info(training_status)
except (tf.errors.OutOfRangeError, StopIteration):
self.validation_steps = current_step
self.eval_steps = current_step
if is_main_process():
logging.info('Data exhausted after %d steps', current_step)
logging.info('Data exhausted after %d eval_steps', current_step)
break

return self.get_metrics_result()
Expand Down
21 changes: 20 additions & 1 deletion deepray/layers/dynamic_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ def __init__(
**kwargs
):
super(DistributedDynamicEmbedding, self).__init__()
self.embedding_dim = embedding_dim
self.key_dtype = key_dtype
self.value_dtype = value_dtype
self.initializer = initializer
self.de_option = de_option

if de_option.device_name == "Redis":
self.emb = EmbeddingLayerRedis(
embedding_size=embedding_dim,
Expand Down Expand Up @@ -143,7 +149,6 @@ def __init__(
key_dtype=key_dtype,
value_dtype=value_dtype,
initializer=initializer,
name=name,
devices=de_option.devices,
init_capacity=de_option.init_capacity,
kv_creator=de_option.kv_creator,
Expand All @@ -154,6 +159,20 @@ def __init__(
def call(self, ids, *args, **kwargs):
return self.emb(ids)

def get_config(self):
config = super().get_config()
config.update(
{
"embedding_dim": self.embedding_dim,
"key_dtype": self.key_dtype,
"value_dtype": self.value_dtype,
"initializer": self.initializer,
"name": self.name,
"de_option": self.de_option,
}
)
return config


class CompositionalEmbedding(tf.keras.layers.Layer):
"""
Expand Down

0 comments on commit 953c397

Please sign in to comment.