Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Never Resume Policy for Experiment #1184

Merged
merged 4 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions examples/v1alpha3/never-resume-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: never-resume-example
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: Validation-accuracy
additionalMetricNames:
- Train-accuracy
algorithm:
algorithmName: random
parallelTrialCount: 3
maxTrialCount: 12
maxFailedTrialCount: 3
resumePolicy: Never
parameters:
- name: --lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.03"
- name: --num-layers
parameterType: int
feasibleSpace:
min: "2"
max: "5"
- name: --optimizer
parameterType: categorical
feasibleSpace:
list:
- sgd
- adam
- ftrl
trialTemplate:
goTemplate:
rawTemplate: |-
apiVersion: batch/v1
kind: Job
metadata:
name: {{.Trial}}
namespace: {{.NameSpace}}
spec:
template:
spec:
containers:
- name: {{.Trial}}
image: docker.io/kubeflowkatib/mxnet-mnist
command:
- "python3"
- "/opt/mxnet-mnist/mnist.py"
- "--batch-size=64"
{{- with .HyperParameters}}
{{- range .}}
- "{{.Name}}={{.Value}}"
{{- end}}
{{- end}}
restartPolicy: Never
11 changes: 9 additions & 2 deletions pkg/controller.v1alpha3/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,24 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
if instance.IsCompleted() {
// Check if completed instance is restartable
// Experiment is restartable only if it is in succeeded state by reaching max trials
// And Resume Policy is LongRunning
if util.IsCompletedExperimentRestartable(instance) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this paragraph could be modified to something below

	if instance.IsCompleted() {
		needRestart := false
		// Check if completed instance is restartable
		// Experiment is restartable only if it is in succeeded state by reaching max trials
		if util.IsCompletedExperimentRestartable(instance) {
			// Check if max trials is reconfigured
			if (instance.Spec.MaxTrialCount != nil &&
				*instance.Spec.MaxTrialCount != instance.Status.Trials) ||
				(instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) {
				msg := "Experiment is restarted"
				instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
				needRestart = true
			}
		}
		// If experiment who doesn't need to restart is completed without running trials, stop reconcile
		if !needRestart && !instance.HasRunningTrials() {
			if instance.Spec.ResumePolicy != experimentsv1alpha3.LongRunning {
				return r.terminateSuggestion(instance)
			}
			return reconcile.Result{}, nil
		}
	}

Otherwise, terminateSuggestion won't work when corresponding experiment has been completed for reaching max trials.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sperlingxx Why terminateSuggestion won't run when experiment finishes with reaching max trials?

I believe, Reconcile loop runs with Succeeded experiment state, after reaching this step: https://github.com/kubeflow/katib/blob/master/pkg/controller.v1alpha3/experiment/util/status_util.go#L182.

I am wondering, do we actually need to check if Experiment has running trials in the controller @gaocegege @johnugeorge :
https://github.com/kubeflow/katib/blob/master/pkg/controller.v1alpha3/experiment/experiment_controller.go#L209 ?
Or we can just return reconcile.Result{}, nil without this check if Experiment is Completed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreyvelich I think util.IsCompletedExperimentRestartable will be true when experiment finishes with reaching max trials. But, for now, terminateSuggestion will only be called when util.IsCompletedExperimentRestartable is false.

Copy link
Member Author

@andreyvelich andreyvelich May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but util.IsCompletedExperimentRestartable returns True only if ResumePolicy: LongRunning.

https://github.com/kubeflow/katib/pull/1184/files#diff-f0faa0b63a35acff95fe5e9d93d594ffR201

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreyvelich Oh, I got it!

// Check if max trials is reconfigured
if (instance.Spec.MaxTrialCount != nil &&
*instance.Spec.MaxTrialCount != instance.Status.Trials) ||
(instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) {
logger.Info("Experiment is restarting")
msg := "Experiment is restarted"
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
if instance.Spec.ResumePolicy != experimentsv1alpha3.LongRunning {
return r.terminateSuggestion(instance)
// Terminate Suggestion after Experiment is finished if Resume Policy is Never
if instance.Spec.ResumePolicy == experimentsv1alpha3.NeverResume {
err := r.terminateSuggestion(instance)
if err != nil {
logger.Error(err, "Terminate Suggestion error")
}
return reconcile.Result{}, err
}
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
Expand Down
29 changes: 14 additions & 15 deletions pkg/controller.v1alpha3/experiment/experiment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -112,30 +113,28 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1alpha3.Exp
}
}

func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1alpha3.Experiment) (reconcile.Result, error) {
log.Info("Start terminating original...")
func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1alpha3.Experiment) error {
original := &suggestionsv1alpha3.Suggestion{}
err := r.Get(context.TODO(), types.NamespacedName{
Namespace: instance.Namespace,
Name: instance.Name,
}, original)
err := r.Get(context.TODO(),
types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
return nil
}
return reconcile.Result{}, err
return err
}
if original.IsCompleted() {
return reconcile.Result{}, nil
// If Suggestion is failed or Suggestion is Succeeded, not needed to terminate Suggestion
if original.IsFailed() || original.IsSucceeded() {
return nil
}
log.Info("Start terminating suggestion")
suggestion := original.DeepCopy()
msg := "Suggestion is succeeded"
suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg)
log.Info("Mark suggestion succeeded...")
log.Info("Mark suggestion succeeded")

if err := r.UpdateSuggestion(suggestion); err != nil {
return reconcile.Result{}, err
} else {
return reconcile.Result{Requeue: true}, nil
if err := r.UpdateSuggestionStatus(suggestion); err != nil {
return err
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/controller.v1alpha3/experiment/suggestion/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ func (f *Fake) GetOrCreateSuggestion(instance *experimentsv1alpha3.Experiment, s
func (f *Fake) UpdateSuggestion(suggestion *suggestionsv1alpha3.Suggestion) error {
return nil
}

func (f *Fake) UpdateSuggestionStatus(suggestion *suggestionsv1alpha3.Suggestion) error {
return nil
}
9 changes: 9 additions & 0 deletions pkg/controller.v1alpha3/experiment/suggestion/suggestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var log = logf.Log.WithName("experiment-suggestion-client")
type Suggestion interface {
GetOrCreateSuggestion(instance *experimentsv1alpha3.Experiment, suggestionRequests int32) (*suggestionsv1alpha3.Suggestion, error)
UpdateSuggestion(suggestion *suggestionsv1alpha3.Suggestion) error
UpdateSuggestionStatus(suggestion *suggestionsv1alpha3.Suggestion) error
}

type General struct {
Expand Down Expand Up @@ -85,3 +86,11 @@ func (g *General) UpdateSuggestion(suggestion *suggestionsv1alpha3.Suggestion) e
}
return nil
}

func (g *General) UpdateSuggestionStatus(suggestion *suggestionsv1alpha3.Suggestion) error {
if err := g.Status().Update(context.TODO(), suggestion); err != nil {
return err
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/controller.v1alpha3/experiment/util/status_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance *
}

func IsCompletedExperimentRestartable(instance *experimentsv1alpha3.Experiment) bool {
if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) {
if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) && instance.Spec.ResumePolicy == experimentsv1alpha3.LongRunning {
return true
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suggestion

import (
"context"

"github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -49,11 +50,12 @@ func (r *ReconcileSuggestion) deleteDeployment(instance *v1alpha3.Suggestion) er
}
return err
}
log.Info("Deleting Suggestion Deployment", "namespace", realDeploy.Namespace, "name", realDeploy.Name)

err = r.Delete(context.TODO(), realDeploy)
if err != nil {
return err
}
log.Info("suggestion deployment %s has been deleted", realDeploy.Name)

return nil
}
Expand All @@ -71,11 +73,12 @@ func (r *ReconcileSuggestion) deleteService(instance *v1alpha3.Suggestion) error
}
return err
}
log.Info("Deleting Suggestion Service", "namespace", realService.Namespace, "name", realService.Name)

err = r.Delete(context.TODO(), realService)
if err != nil {
return err
}
log.Info("suggestion service %s has been deleted", realService.Name)

return nil
}
14 changes: 14 additions & 0 deletions pkg/mock/v1alpha3/experiment/suggestion/suggestion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions test/scripts/v1alpha3/run-never-resume.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/bin/bash

# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This shell script is used to build a cluster and create a namespace from our
# argo workflow

set -o errexit
set -o nounset
set -o pipefail

CLUSTER_NAME="${CLUSTER_NAME}"
ZONE="${GCP_ZONE}"
PROJECT="${GCP_PROJECT}"
GO_DIR=${GOPATH}/src/github.com/${REPO_OWNER}/${REPO_NAME}

echo "Activating service-account"
gcloud auth activate-service-account --key-file=${GOOGLE_APPLICATION_CREDENTIALS}

echo "Configuring kubectl"

echo "CLUSTER_NAME: ${CLUSTER_NAME}"
echo "ZONE: ${GCP_ZONE}"
echo "PROJECT: ${GCP_PROJECT}"

gcloud --project ${PROJECT} container clusters get-credentials ${CLUSTER_NAME} \
--zone ${ZONE}
kubectl config set-context $(kubectl config current-context) --namespace=default
USER=`gcloud config get-value account`

echo "All Katib components are running."
kubectl version
kubectl cluster-info
echo "Katib deployments"
kubectl -n kubeflow get deploy
echo "Katib services"
kubectl -n kubeflow get svc
echo "Katib pods"
kubectl -n kubeflow get pod

cd ${GO_DIR}/test/e2e/v1alpha3

echo "Running e2e test for never resume experiment"
export KUBECONFIG=$HOME/.kube/config
./run-e2e-experiment ../../../examples/v1alpha3/never-resume-example.yaml

kubectl -n kubeflow describe suggestion never-resume-example

kubectl -n kubeflow describe experiment never-resume-example
kubectl -n kubeflow delete experiment never-resume-example

exit 0
7 changes: 7 additions & 0 deletions test/workflows/components/workflows-v1alpha3.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@
name: "run-cmaes-e2e-tests",
template: "run-cmaes-e2e-tests",
},
{
name: "run-never-resume-e2e-tests",
template: "run-never-resume-e2e-tests",
},
],
],
},
Expand Down Expand Up @@ -396,6 +400,9 @@
$.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTemplate("run-cmaes-e2e-tests", testWorkerImage, [
"test/scripts/v1alpha3/run-suggestion-cmaes.sh",
]), // run cmaes algorithm
$.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTemplate("run-never-resume-e2e-tests", testWorkerImage, [
"test/scripts/v1alpha3/run-never-resume.sh",
]), // run never resume suggestion test
$.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTemplate("create-pr-symlink", testWorkerImage, [
"python",
"-m",
Expand Down