Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement for Custom CRD #1333

Merged
merged 11 commits into from
Oct 13, 2020
9 changes: 9 additions & 0 deletions pkg/apis/controller/trials/v1beta1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ func (trial *Trial) IsKilled() bool {
return hasCondition(trial, TrialKilled)
}

// IsMetricsUnavailable returns true if Trial metrics are not available
func (trial *Trial) IsMetricsUnavailable() bool {
cond := getCondition(trial, TrialSucceeded)
if cond != nil && cond.Status == v1.ConditionFalse {
return true
}
return false
}

func (trial *Trial) IsCompleted() bool {
return trial.IsSucceeded() || trial.IsFailed() || trial.IsKilled()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller.v1beta1/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ const (
TrialTemplateMetaKeyOfLabels = "Labels"

// UnavailableMetricValue is the value when metric was not reported or metric value can't be converted to float64
// This value is recorded in to DB when metrics collector can't parse objective metric from the training logs.
UnavailableMetricValue = "unavailable"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func (g *General) ConvertExperiment(e *experimentsv1beta1.Experiment) *suggestio
func (g *General) ConvertTrials(ts []trialsv1beta1.Trial) []*suggestionapi.Trial {
trialsRes := make([]*suggestionapi.Trial, 0)
for _, t := range ts {
if t.IsMetricsUnavailable() {
continue
}
trial := &suggestionapi.Trial{
Name: t.Name,
Spec: &suggestionapi.TrialSpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -632,6 +633,21 @@ func newFakeTrials() []trialsv1beta1.Trial {
Conditions: fakeConditions,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "trial3-name",
Namespace: "namespace",
},
Status: trialsv1beta1.TrialStatus{
Conditions: []trialsv1beta1.TrialCondition{
{
Type: trialsv1beta1.TrialSucceeded,
Status: corev1.ConditionFalse,
Message: "Metrics are not available",
},
},
},
},
}
}

Expand Down
42 changes: 30 additions & 12 deletions pkg/controller.v1beta1/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package trial
import (
"context"
"fmt"
"time"

batchv1beta "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -57,6 +58,8 @@ const (

var (
log = logf.Log.WithName(ControllerName)
// errMetricsNotReported is the error when Trial job is succeeded but metrics are not reported yet
errMetricsNotReported = fmt.Errorf("Metrics are not reported yet")
)

// Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
Expand Down Expand Up @@ -213,6 +216,11 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result,
} else {
err := r.reconcileTrial(instance)
if err != nil {
if err == errMetricsNotReported {
return reconcile.Result{
RequeueAfter: time.Second * 1,
}, nil
}
logger.Error(err, "Reconcile trial error")
r.recorder.Eventf(instance,
corev1.EventTypeWarning, ReconcileFailedReason,
Expand All @@ -225,7 +233,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result,
//assuming that only status change
err = r.updateStatusHandler(instance)
if err != nil {
logger.Info("Update trial instance status failed, reconciler requeued", "err", err)
logger.Info("Update trial instance status failed, reconcile requeued", "err", err)
return reconcile.Result{
Requeue: true,
}, nil
Expand Down Expand Up @@ -254,7 +262,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
// Job already exists
// TODO Can desired Spec differ from deployedSpec?
if deployedJob != nil {
if instance.Spec.SuccessCondition != "" && instance.Spec.FailureCondition != "" {
if instance.Spec.SuccessCondition != "" && instance.Spec.FailureCondition != "" && !instance.IsCompleted() {
jobStatus, err := trialutil.GetDeployedJobStatus(instance, deployedJob)
if err != nil {
logger.Error(err, "GetDeployedJobStatus error")
Expand All @@ -270,14 +278,19 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error {
return err
}
}
// If observation is empty metrics collector doesn't finish
if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil {
logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued")
return errMetricsNotReported
}

// Update Trial job status only
// if job has succeeded and if observation field is available.
// if job has failed
// This will ensure that trial is set to be complete only if metric is collected at least once
r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus)

} else {
} else if instance.Spec.SuccessCondition == "" && instance.Spec.FailureCondition == "" {
// TODO (andreyvelich): This can be deleted after switch to custom CRD
kind := deployedJob.GetKind()
jobProvider, err := jobv1beta1.New(kind)
Expand Down Expand Up @@ -319,6 +332,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob
gvk := schema.FromAPIVersionAndKind(apiVersion, kind)

// Add annotation to desired Job to disable istio sidecar
// TODO (andreyvelich): Can be removed after custom CRD implementation
err = util.TrainingJobAnnotations(desiredJob)
if err != nil {
logger.Error(err, "TrainingJobAnnotations error")
Expand All @@ -333,15 +347,19 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob
if instance.IsCompleted() {
return nil, nil
}
jobProvider, err := jobv1beta1.New(desiredJob.GetKind())
if err != nil {
return nil, err
}
// mutate desiredJob according to provider
if err := jobProvider.MutateJob(instance, desiredJob); err != nil {
logger.Error(err, "Mutating desiredSpec of km.Training error")
return nil, err
}

// TODO (andreyvelich): Mutate job needs to be refactored (ref: https://github.com/kubeflow/katib/issues/1320)
// Currently, commented since we don't do Mutate Job for SupportedJobList
// jobProvider, err := jobv1beta1.New(desiredJob.GetKind())
// if err != nil {
// return nil, err
// }
// // mutate desiredJob according to provider
// if err := jobProvider.MutateJob(instance, desiredJob); err != nil {
// logger.Error(err, "Mutating desiredSpec of km.Training error")
// return nil, err
// }

logger.Info("Creating Job", "kind", kind,
"name", desiredJob.GetName())
err = r.Create(context.TODO(), desiredJob)
Expand Down
22 changes: 13 additions & 9 deletions pkg/controller.v1beta1/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
Expand All @@ -40,11 +41,12 @@ const (

// UpdateTrialStatusCondition updates Trial status from current deployed Job status
func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Trial, deployedJobName string, jobStatus *trialutil.TrialJobStatus) {
logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})

timeNow := metav1.Now()

if jobStatus.Condition == trialutil.JobSucceeded {
if isTrialObservationAvailable(instance) {
if isTrialObservationAvailable(instance) && !instance.IsSucceeded() {
msg := "Trial has succeeded "
reason := TrialSucceededReason

Expand All @@ -56,14 +58,16 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason)
}

logger.Info("Trial status changed to Succeeded")
instance.MarkTrialStatusSucceeded(corev1.ConditionTrue, reason, msg)
instance.Status.CompletionTime = &timeNow

eventMsg := fmt.Sprintf("Job %v has succeeded", deployedJobName)
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobSucceededReason, eventMsg)
r.collector.IncreaseTrialsSucceededCount(instance.Namespace)
} else {
// TODO (andreyvelich): Is is correct to mark succeeded status false when metrics are unavailable?
} else if !instance.IsMetricsUnavailable() {
// TODO (andreyvelich): Is it correct to mark succeeded status false when metrics are unavailable?
// Ref issue to add new condition: https://github.com/kubeflow/katib/issues/1343
msg := "Metrics are not available"
reason := TrialMetricsUnavailableReason

Expand All @@ -75,12 +79,13 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria
reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason)
}

logger.Info("Trial status changed to Metrics Unavailable")
instance.MarkTrialStatusSucceeded(corev1.ConditionFalse, reason, msg)

eventMsg := fmt.Sprintf("Metrics are not available for Job %v", deployedJobName)
r.recorder.Eventf(instance, corev1.EventTypeWarning, JobMetricsUnavailableReason, eventMsg)
}
} else if jobStatus.Condition == trialutil.JobFailed {
} else if jobStatus.Condition == trialutil.JobFailed && !instance.IsFailed() {
msg := "Trial has failed"
reason := TrialFailedReason

Expand All @@ -102,12 +107,14 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria

r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg)
r.collector.IncreaseTrialsFailedCount(instance.Namespace)
} else if jobStatus.Condition == trialutil.JobRunning {
logger.Info("Trial status changed to Failed")
} else if jobStatus.Condition == trialutil.JobRunning && !instance.IsRunning() {
msg := "Trial is running"
instance.MarkTrialStatusRunning(TrialRunningReason, msg)

eventMsg := fmt.Sprintf("Job %v is running", deployedJobName)
r.recorder.Eventf(instance, corev1.EventTypeNormal, JobRunningReason, eventMsg)
logger.Info("Trial status changed to Running")
// TODO(gaocegege): Should we maintain a TrialsRunningCount?
}
// else nothing to do
Expand Down Expand Up @@ -168,7 +175,7 @@ func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1beta1.Tr
return err
}
metricStrategies := instance.Spec.Objective.MetricStrategies
if reply.ObservationLog != nil {
if len(reply.ObservationLog.MetricLogs) != 0 {
observation, err := getMetrics(reply.ObservationLog.MetricLogs, metricStrategies)
if err != nil {
log.Error(err, "Get metrics from logs error")
Expand Down Expand Up @@ -203,9 +210,6 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1beta1.Trial, finaliz
}

func isTrialObservationAvailable(instance *trialsv1beta1.Trial) bool {
if instance == nil {
return false
}
objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName
if instance.Status.Observation != nil && instance.Status.Observation.Metrics != nil {
for _, metric := range instance.Status.Observation.Metrics {
Expand Down
9 changes: 2 additions & 7 deletions pkg/controller.v1beta1/trial/util/job_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
Expand Down Expand Up @@ -47,7 +46,6 @@ var (

// GetDeployedJobStatus returns internal representation for deployed Job status.
func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.Unstructured) (*TrialJobStatus, error) {
logger := log.WithValues("Trial", types.NamespacedName{Name: trial.GetName(), Namespace: trial.GetNamespace()})

trialJobStatus := &TrialJobStatus{}

Expand Down Expand Up @@ -75,7 +73,6 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.

// Job condition is failed
trialJobStatus.Condition = JobFailed
logger.Info("Deployed Job status is failed", "Job", deployedJob.GetName())
return trialJobStatus, nil
}

Expand All @@ -97,15 +94,13 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.

// Job condition is succeeded
trialJobStatus.Condition = JobSucceeded
logger.Info("Deployed Job status is succeeded", "Job", deployedJob.GetName())
return trialJobStatus, nil
}

// Set default Job condition is running when Job name is generated.
// Check if Trial is not running and is not completed
if !trial.IsRunning() && deployedJob.GetName() != "" && !trial.IsCompleted() {
// Check if Trial is not running
if !trial.IsRunning() && deployedJob.GetName() != "" {
trialJobStatus.Condition = JobRunning
logger.Info("Deployed Job status is running", "Job", deployedJob.GetName())
return trialJobStatus, nil
}

Expand Down
10 changes: 1 addition & 9 deletions pkg/controller.v1beta1/util/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ limitations under the License.
package util

import (
"fmt"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"

suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1"
pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
)
Expand Down Expand Up @@ -102,13 +99,8 @@ func TrainingJobAnnotations(desiredJob *unstructured.Unstructured) error {
}
return nil
default:
// Annotation appending of custom job can be done in Provider.MutateJob.
if _, ok := jobv1beta1.SupportedJobList[kind]; ok {
return nil
}
return fmt.Errorf("Invalid Katib Training Job kind %v", kind)
return nil
}

}

func appendAnnotation(annotations map[string]string, newAnnotationName string, newAnnotationValue string) map[string]string {
Expand Down
38 changes: 20 additions & 18 deletions pkg/db/v1beta1/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ func NewDBInterface() (common.KatibDBInterface, error) {
}

func (d *dbConn) RegisterObservationLog(trialName string, observationLog *v1beta1.ObservationLog) error {
var mname, mvalue string
sqlQuery := "INSERT INTO observation_logs (trial_name, time, metric_name, value) VALUES "
values := []interface{}{}

for _, mlog := range observationLog.MetricLogs {
mname = mlog.Metric.Name
mvalue = mlog.Metric.Value
if mlog.TimeStamp == "" {
continue
}
Expand All @@ -104,22 +104,24 @@ func (d *dbConn) RegisterObservationLog(trialName string, observationLog *v1beta
return fmt.Errorf("Error parsing start time %s: %v", mlog.TimeStamp, err)
}
sqlTimeStr := t.UTC().Format(mysqlTimeFmt)
_, err = d.db.Exec(
`INSERT INTO observation_logs (
trial_name,
time,
metric_name,
value
) VALUES (?, ?, ?, ?)`,
trialName,
sqlTimeStr,
mname,
mvalue,
)
if err != nil {
return err
}

sqlQuery += "(?, ?, ?, ?),"
values = append(values, trialName, sqlTimeStr, mlog.Metric.Name, mlog.Metric.Value)
}
sqlQuery = sqlQuery[0 : len(sqlQuery)-1]

// Prepare the statement
stmt, err := d.db.Prepare(sqlQuery)
if err != nil {
return fmt.Errorf("Pepare SQL statement failed: %v", err)
}

// Execute INSERT
_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("Execute SQL INSERT failed: %v", err)
}

return nil
}

Expand Down
Loading