diff --git a/pkg/controller/studyjob/studyjob_controller.go b/pkg/controller/studyjob/studyjob_controller.go index eb6bc8e5ecc..1f70f2ca772 100644 --- a/pkg/controller/studyjob/studyjob_controller.go +++ b/pkg/controller/studyjob/studyjob_controller.go @@ -348,28 +348,23 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins if ctime != nil && cjob.Status.LastScheduleTime != nil { if ctime.Before(cjob.Status.LastScheduleTime) && len(cjob.Status.Active) == 0 { saveModel(c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid) - instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted - instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now() update = true - _, err := c.UpdateWorkerState( - context.Background(), - &katibapi.UpdateWorkerStateRequest{ - WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID, - Status: katibapi.State_COMPLETED, - }) - if err != nil { - log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID) - return false, err - } susp := true cjob.Spec.Suspend = &susp if err := r.Update(context.TODO(), cjob); err != nil { return false, err } - - cwids = append(cwids, wid) } } + } else { + // for some reason, metricsCollector for this worker cannot be found (deleted by anyone accidentally or even failed to be created) + update = true + instance.Status.Condition = katibv1alpha1.ConditionFailed + } + if update { + instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted + instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now() + cwids = append(cwids, wid) } case katibapi.State_RUNNING: if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionRunning { @@ -377,28 +372,23 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins update = true } if errors.IsNotFound(cjoberr) { - r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec) - } - _, err := c.UpdateWorkerState( - context.Background(), - &katibapi.UpdateWorkerStateRequest{ - WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID, - Status: katibapi.State_RUNNING, - }) - if err != nil { - log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID) - return false, err + spawnErr := r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec) + if spawnErr != nil { + instance.Status.Condition = katibv1alpha1.ConditionFailed + } } case katibapi.State_ERROR: if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionFailed { instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionFailed update = true } + } + if update { _, err := c.UpdateWorkerState( context.Background(), &katibapi.UpdateWorkerStateRequest{ WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID, - Status: katibapi.State_ERROR, + Status: status.WorkerState, }) if err != nil { log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID) @@ -613,17 +603,14 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ BUFSIZE := 1024 job := createWorkerJobObj(wkind) if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("Yaml decode error %v", err) return "", err } if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("SetControllerReference error %v", err) return "", err } if err := r.Create(context.TODO(), job); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("Job Create error %v", err) return "", err } @@ -637,7 +624,6 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp wkind, err := getWorkerKind(instance.Spec.WorkerSpec) if err != nil { log.Printf("getWorkerKind error %v", err) - instance.Status.Condition = katibv1alpha1.ConditionFailed return err } mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs) @@ -647,19 +633,16 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp } if err := k8syaml.NewYAMLOrJSONDecoder(mcm, BUFSIZE).Decode(&mcjob); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("MetricsCollector Yaml decode error %v", err) return err } if err := controllerutil.SetControllerReference(instance, &mcjob, r.scheme); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("MetricsCollector SetControllerReference error %v", err) return err } if err := r.Create(context.TODO(), &mcjob); err != nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed log.Printf("MetricsCollector Job Create error %v", err) return err }