Skip to content

Commit

Permalink
Adding events to trials (#852)
Browse files Browse the repository at this point in the history
* Adding events to trials

* Fix tests
  • Loading branch information
johnugeorge authored and k8s-ci-robot committed Oct 4, 2019
1 parent 0fadd5d commit 4e96b94
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 20 deletions.
26 changes: 20 additions & 6 deletions pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package trial
import (
"bytes"
"context"
"fmt"

batchv1beta "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -30,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -44,8 +47,12 @@ import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/trial/managerclient"
)

const (
ControllerName = "trial-controller"
)

var (
log = logf.Log.WithName("trial-controller")
log = logf.Log.WithName(ControllerName)
)

/**
Expand All @@ -65,6 +72,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
ManagerClient: managerclient.New(),
recorder: mgr.GetRecorder(ControllerName),
}
r.updateStatusHandler = r.updateStatus
return r
Expand Down Expand Up @@ -127,7 +135,8 @@ var _ reconcile.Reconciler = &ReconcileTrial{}
// ReconcileTrial reconciles a Trial object
type ReconcileTrial struct {
client.Client
scheme *runtime.Scheme
scheme *runtime.Scheme
recorder record.EventRecorder

managerclient.ManagerClient
// updateStatusHandler is defined for test purpose.
Expand Down Expand Up @@ -209,14 +218,14 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha3.Trial) error {
// Job already exists
// TODO Can desired Spec differ from deployedSpec?
if deployedJob != nil {
jobConditionType, err := r.GetDeployedJobStatus(deployedJob)
jobCondition, err := r.GetDeployedJobStatus(deployedJob)
if err != nil {
logger.Error(err, "Get deployed status error")
return err
}

// Update trial observation when the job is succeeded.
if isTrialSucceeded(instance, *jobConditionType) {
if isJobSucceeded(jobCondition) {
if err = r.UpdateTrialStatusObservation(instance, deployedJob); err != nil {
logger.Error(err, "Update trial status observation error")
return err
Expand All @@ -227,8 +236,9 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha3.Trial) error {
// if job has succeded and if observation field is available.
// if job has failed
// This will ensure that trial is set to be complete only if metric is collected at least once
if isTrialComplete(instance, *jobConditionType) {
r.UpdateTrialStatusCondition(instance, *jobConditionType)
if isTrialComplete(instance, jobCondition) {
r.UpdateTrialStatusCondition(instance, deployedJob, jobCondition)

}
}
return nil
Expand Down Expand Up @@ -256,6 +266,8 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob
logger.Error(err, "Create job error")
return nil, err
}
eventMsg := fmt.Sprintf("Job %s has been created", desiredJob.GetName())
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobCreatedReason, eventMsg)
msg := "Trial is running"
instance.MarkTrialStatusRunning(TrialRunningReason, msg)
} else {
Expand All @@ -268,6 +280,8 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha3.Trial, desiredJob
logger.Error(err, "Delete job error")
return nil, err
} else {
eventMsg := fmt.Sprintf("Job %s has been deleted", desiredJob.GetName())
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobDeletedReason, eventMsg)
return nil, nil
}
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller.v1alpha3/trial/trial_controller_consts.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package trial

const (
DefaultJobKind = "Job"
DefaultJobKind = "Job"

// For trials
TrialCreatedReason = "TrialCreated"
TrialRunningReason = "TrialRunning"
TrialSucceededReason = "TrialSucceeded"
TrialFailedReason = "TrialFailed"
TrialKilledReason = "TrialKilled"

// For Jobs
JobCreatedReason = "JobCreated"
JobDeletedReason = "JobDeleted"
JobSucceededReason = "JobSucceeded"
JobFailedReason = "JobFailed"
)
3 changes: 3 additions & 0 deletions pkg/controller.v1alpha3/trial/trial_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestCreateTFJobTrial(t *testing.T) {
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
ManagerClient: mc,
recorder: mgr.GetRecorder(ControllerName),
updateStatusHandler: func(instance *trialsv1alpha3.Trial) error {
if !instance.IsCreated() {
t.Errorf("Expected got condition created")
Expand Down Expand Up @@ -112,6 +113,7 @@ func TestReconcileTFJobTrial(t *testing.T) {
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
ManagerClient: mc,
recorder: mgr.GetRecorder(ControllerName),
}

r.updateStatusHandler = func(instance *trialsv1alpha3.Trial) error {
Expand Down Expand Up @@ -185,6 +187,7 @@ func TestReconcileCompletedTFJobTrial(t *testing.T) {
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
ManagerClient: mc,
recorder: mgr.GetRecorder(ControllerName),
}

r.updateStatusHandler = func(instance *trialsv1alpha3.Trial) error {
Expand Down
48 changes: 35 additions & 13 deletions pkg/controller.v1alpha3/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package trial

import (
"context"
"fmt"
"strconv"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,8 +37,9 @@ const (
cleanMetricsFinalizer = "clean-metrics-in-db"
)

func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstructured) (*commonv1.JobConditionType, error) {
jobConditionType := commonv1.JobRunning
func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) {
jobCondition := commonv1.JobCondition{}
jobCondition.Type = commonv1.JobRunning
kind := deployedJob.GetKind()
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")

Expand All @@ -51,10 +54,17 @@ func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstruct
log.Error(err, "Convert unstructured to status error")
return nil, err
}
if jobStatus.Active == 0 && jobStatus.Succeeded > 0 {
jobConditionType = commonv1.JobSucceeded
} else if jobStatus.Failed > 0 {
jobConditionType = commonv1.JobFailed
for _, cond := range jobStatus.Conditions {
if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobSucceeded
// JobConditions message not populated when succeeded for batchv1 Job
break
}
if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
jobCondition.Type = commonv1.JobFailed
jobCondition.Message = cond.Message
break
}
}
default:
jobStatus := commonv1.JobStatus{}
Expand All @@ -66,28 +76,38 @@ func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstruct
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
jobConditionType = lc.Type
jobCondition.Type = lc.Type
jobCondition.Message = lc.Message
}
}
} else if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
return nil, unerr
}
return &jobConditionType, nil
return &jobCondition, nil
}

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, jobCondition commonv1.JobConditionType) {
func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured, jobCondition *commonv1.JobCondition) {
now := metav1.Now()
if jobCondition == commonv1.JobSucceeded {
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded {
msg := "Trial has succeeded"
instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg)
instance.Status.CompletionTime = &now
} else if jobCondition == commonv1.JobFailed {
} else if jobConditionType == commonv1.JobFailed {
msg := "Trial has failed"
instance.MarkTrialStatusFailed(TrialFailedReason, msg)
instance.Status.CompletionTime = &now
}
//else nothing to do
if jobConditionType == commonv1.JobSucceeded {
eventMsg := fmt.Sprintf("Job %s has succeeded", deployedJob.GetName())
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobSucceededReason, eventMsg)
} else if jobConditionType == commonv1.JobFailed {
jobConditionMessage := (*jobCondition).Message
eventMsg := fmt.Sprintf("Job %s has failed: %s", deployedJob.GetName(), jobConditionMessage)
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg)
}
return
}

Expand Down Expand Up @@ -144,7 +164,8 @@ func isTrialObservationAvailable(instance *trialsv1alpha3.Trial) bool {
return false
}

func isTrialComplete(instance *trialsv1alpha3.Trial, jobConditionType commonv1.JobConditionType) bool {
func isTrialComplete(instance *trialsv1alpha3.Trial, jobCondition *commonv1.JobCondition) bool {
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded && isTrialObservationAvailable(instance) {
return true
}
Expand All @@ -155,7 +176,8 @@ func isTrialComplete(instance *trialsv1alpha3.Trial, jobConditionType commonv1.J
return false
}

func isTrialSucceeded(instance *trialsv1alpha3.Trial, jobConditionType commonv1.JobConditionType) bool {
func isJobSucceeded(jobCondition *commonv1.JobCondition) bool {
jobConditionType := (*jobCondition).Type
if jobConditionType == commonv1.JobSucceeded {
return true
}
Expand Down

0 comments on commit 4e96b94

Please sign in to comment.