Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume experiment with extra trials from last checkpoint #952

Merged
merged 6 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/apis/controller/experiments/v1alpha3/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (exp *Experiment) IsCompleted() bool {
return exp.IsSucceeded() || exp.IsFailed()
}

func (exp *Experiment) IsCompletedReason(reason string) bool {
cond := getCondition(exp, ExperimentSucceeded)
if cond != nil && cond.Status == v1.ConditionTrue && cond.Reason == reason {
return true
}
return false
}

func (exp *Experiment) HasRunningTrials() bool {
return exp.Status.TrialsRunning != 0
}
Expand Down Expand Up @@ -131,6 +139,12 @@ func (exp *Experiment) MarkExperimentStatusRunning(reason, message string) {
exp.setCondition(ExperimentRunning, v1.ConditionTrue, reason, message)
}

func (exp *Experiment) MarkExperimentStatusRestarting(reason, message string) {
exp.removeCondition(ExperimentSucceeded)
exp.removeCondition(ExperimentFailed)
exp.setCondition(ExperimentRestarting, v1.ConditionTrue, reason, message)
}

func (exp *Experiment) MarkExperimentStatusSucceeded(reason, message string) {
currentCond := getCondition(exp, ExperimentRunning)
if currentCond != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/controller.v1alpha3/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,22 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
return r.updateFinalizers(instance, finalizers)
}

if instance.IsCompleted() && !instance.HasRunningTrials() {
return reconcile.Result{}, nil
if instance.IsCompleted() {
// Check if completed instance is restartable
// Experiment is restartable only if it is in succeeded state by reaching max trials
if util.IsCompletedExperimentRestartable(instance) {
// Check if max trials is reconfigured
if (instance.Spec.MaxTrialCount != nil) &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if a user changes MaxTrialCount to nil (infinity), here we should also allow to restart it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

(*instance.Spec.MaxTrialCount != instance.Status.Trials) {
msg := "Experiment is restarted"
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
}
}
}
if !instance.IsCreated() {
if instance.Status.StartTime == nil {
Expand Down
26 changes: 18 additions & 8 deletions pkg/controller.v1alpha3/experiment/util/status_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import (
var log = logf.Log.WithName("experiment-status-util")

const (
ExperimentCreatedReason = "ExperimentCreated"
ExperimentRunningReason = "ExperimentRunning"
ExperimentSucceededReason = "ExperimentSucceeded"
ExperimentFailedReason = "ExperimentFailed"
ExperimentKilledReason = "ExperimentKilled"
ExperimentCreatedReason = "ExperimentCreated"
ExperimentRunningReason = "ExperimentRunning"
ExperimentRestartingReason = "ExperimentRestarting"
ExperimentGoalReachedReason = "ExperimentGoalReached"
ExperimentMaxTrialsReachedReason = "ExperimentMaxTrialsReached"
ExperimentSuggestionEndReachedReason = "ExperimentSuggestionEndReached"
ExperimentFailedReason = "ExperimentFailed"
ExperimentKilledReason = "ExperimentKilled"
)

func UpdateExperimentStatus(instance *experimentsv1alpha3.Experiment, trials *trialsv1alpha3.TrialList) error {
Expand Down Expand Up @@ -143,23 +146,23 @@ func UpdateExperimentStatusCondition(instance *experimentsv1alpha3.Experiment, i

if isObjectiveGoalReached {
msg := "Experiment has succeeded because Objective goal has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentGoalReachedReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
return
}

if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) {
msg := "Experiment has succeeded because max trial count has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentMaxTrialsReachedReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
return
}

if getSuggestionDone && (instance.Status.TrialsPending+instance.Status.TrialsRunning) == 0 {
msg := "Experiment has succeeded because suggestion service has reached the end"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentSuggestionEndReachedReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
return
Expand All @@ -176,3 +179,10 @@ func UpdateExperimentStatusCondition(instance *experimentsv1alpha3.Experiment, i
msg := "Experiment is running"
instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg)
}

func IsCompletedExperimentRestartable(instance *experimentsv1alpha3.Experiment) bool {
if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) {
return true
}
return false
}
19 changes: 19 additions & 0 deletions pkg/mock/v1alpha3/util/katibclient/katibclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/util/v1alpha3/katibclient/katib_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Client interface {
GetClient() client.Client
GetExperimentList(namespace ...string) (*experimentsv1alpha3.ExperimentList, error)
CreateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
GetExperiment(name string, namespace ...string) (*experimentsv1alpha3.Experiment, error)
GetConfigMap(name string, namespace ...string) (map[string]string, error)
Expand Down Expand Up @@ -123,6 +124,14 @@ func (k *KatibClient) CreateExperiment(experiment *experimentsv1alpha3.Experimen
return nil
}

func (k *KatibClient) UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method used for UI? and it seems namespace param isn't used

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about it. Shall we do this separately?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method is not useful in your PR topic, I think we'd better remove it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. this method is currently used in resume_e2e_experiment.go script in this PR to update the experiment.


if err := k.client.Update(context.Background(), experiment); err != nil {
return err
}
return nil
}

func (k *KatibClient) DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error {

if err := k.client.Delete(context.Background(), experiment); err != nil {
Expand Down
147 changes: 147 additions & 0 deletions test/e2e/v1alpha3/resume-e2e-experiment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package main

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"time"

k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

const (
timeout = 30 * time.Minute
)

func verifyResult(exp *experimentsv1alpha3.Experiment) (*float64, error) {
if len(exp.Status.CurrentOptimalTrial.ParameterAssignments) == 0 {
return nil, fmt.Errorf("Best parameter assignments not updated in status")
}

if len(exp.Status.CurrentOptimalTrial.Observation.Metrics) == 0 {
return nil, fmt.Errorf("Best metrics not updated in status")
}

metric := exp.Status.CurrentOptimalTrial.Observation.Metrics[0]
if metric.Name != exp.Spec.Objective.ObjectiveMetricName {
return nil, fmt.Errorf("Best objective metric not updated in status")
}
return &metric.Value, nil
}

func main() {
if len(os.Args) != 2 {
log.Fatal("Experiment name is missing")
}
expName := os.Args[1]
b, err := ioutil.ReadFile(expName)
if err != nil {
log.Fatal("Error in reading file ", err)
}
exp := &experimentsv1alpha3.Experiment{}
buf := bytes.NewBufferString(string(b))
if err = k8syaml.NewYAMLOrJSONDecoder(buf, 1024).Decode(exp); err != nil {
log.Fatal("Yaml decode error ", err)
}
kclient, err := katibclient.NewClient(client.Options{})
if err != nil {
log.Fatal("NewClient for Katib failed: ", err)
}
exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
if err != nil {
log.Fatal("Get Experiment error. Experiment not created yet ", err)
}
if exp.Spec.Algorithm.AlgorithmName != "hyperband" {
// Hyperband will validate the parallel trial count,
// thus we should not change it.
var maxtrials int32 = 7
var paralleltrials int32 = 3
exp.Spec.MaxTrialCount = &maxtrials
exp.Spec.ParallelTrialCount = &paralleltrials
}
err = kclient.UpdateExperiment(exp)
if err != nil {
log.Fatal("UpdateExperiment from YAML failed: ", err)
}
endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
log.Printf("Waiting for Experiment %s to start running.", exp.Name)
exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
if err != nil {
log.Fatal("Get Experiment error ", err)
}
if exp.IsRunning() {
log.Printf("Experiment %v started running", exp.Name)
break
}
time.Sleep(5 * time.Second)
}

for time.Now().Before(endTime) {
exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
if err != nil {
log.Fatal("Get Experiment error ", err)
}
log.Printf("Waiting for Experiment %s to finish.", exp.Name)
log.Printf(`Experiment %s's trials: %d trials, %d pending trials,
%d running trials, %d killed trials, %d succeeded trials, %d failed trials.`,
exp.Name,
exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning,
exp.Status.TrialsKilled, exp.Status.TrialsSucceeded, exp.Status.TrialsFailed)
log.Printf("Optimal Trial for Experiment %s: %v", exp.Name,
exp.Status.CurrentOptimalTrial)
log.Printf("Experiment %s's conditions: %v", exp.Name, exp.Status.Conditions)

suggestion, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
if err != nil {
log.Printf("Get Suggestion error: %v", err)
} else {
log.Printf("Suggestion %s's conditions: %v", suggestion.Name,
suggestion.Status.Conditions)
log.Printf("Suggestion %s's suggestions: %v", suggestion.Name,
suggestion.Status.Suggestions)
}
if exp.IsCompleted() {
log.Printf("Experiment %v finished", exp.Name)
break
}
time.Sleep(20 * time.Second)
}

if !exp.IsCompleted() {
log.Fatal("Experiment run timed out")
}

metricVal, err := verifyResult(exp)
if err != nil {
log.Fatal(err)
}
if metricVal == nil {
log.Fatal("Metric value in CurrentOptimalTrial not populated")
}

objectiveType := exp.Spec.Objective.Type
goal := *exp.Spec.Objective.Goal
if (objectiveType == commonv1alpha3.ObjectiveTypeMinimize && *metricVal < goal) ||
(objectiveType == commonv1alpha3.ObjectiveTypeMaximize && *metricVal > goal) {
log.Print("Objective Goal reached")
} else {

if exp.Status.Trials != *exp.Spec.MaxTrialCount {
log.Fatal("All trials are not run in the experiment ", exp.Status.Trials, exp.Spec.MaxTrialCount)
}

if exp.Status.TrialsSucceeded != *exp.Spec.MaxTrialCount {
log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount)
}
}
log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}
3 changes: 2 additions & 1 deletion test/scripts/v1alpha3/check-katib-ready.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ kubectl -n kubeflow get pod
cd ${GO_DIR}/test/e2e/v1alpha3

echo "Building run-e2e-experiment for e2e test cases"
go build -o run-e2e-experiment github.com/kubeflow/katib/test/e2e/v1alpha3
go build -o run-e2e-experiment ./run-e2e-experiment.go
go build -o resume-e2e-experiment ./resume-e2e-experiment.go

kubectl apply -f valid-experiment.yaml
kubectl delete -f valid-experiment.yaml
Expand Down
2 changes: 2 additions & 0 deletions test/scripts/v1alpha3/run-suggestion-random.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ cd ${GO_DIR}/test/e2e/v1alpha3
echo "Running e2e hyperopt random experiment"
export KUBECONFIG=$HOME/.kube/config
./run-e2e-experiment ../../../examples/v1alpha3/random-example.yaml
echo "Resuming the completed random experiment"
./resume-e2e-experiment ../../../examples/v1alpha3/random-example.yaml
kubectl -n kubeflow describe suggestion
kubectl -n kubeflow delete experiment random-example
kubectl describe pods
Expand Down