Skip to content

Commit

Permalink
feat(suggestion): Add suggestion controller (kubeflow#7)
Browse files Browse the repository at this point in the history
* Suggestion-0.1: Add suggestion controller

* Suggestion-0.1: Add controller basic logic, w/o logger or recorder, w/o cluster test

* Suggestion-0.1: Add logger or recorder, w/o cluster test

* Suggestion-0.1: Test in cluster, with CRD creation and deployment deletion
  • Loading branch information
anchovYu authored and caicloud-bot committed May 15, 2019
1 parent faada3d commit 5e14927
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 67 deletions.
21 changes: 21 additions & 0 deletions manifests/v1alpha3/sample/suggestion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: kubeflow.org/v1alpha2
kind: Suggestion
metadata:
labels:
controller-tools.k8s.io: "1.0"
name: sample
spec:
replicas: 1
type: HyperParameter
needHistory: false
template:
metadata:
name: suggestion-random
spec:
containers:
- name: suggestion-random
image: katib/suggestion-random:latest




5 changes: 5 additions & 0 deletions pkg/controller/v1alpha3/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type Recorder interface {
record.EventRecorder
ReportChange(o runtime.Object, operator, typ string)
ReportError(o runtime.Object, reason, message string)
}

// GeneralRecorder is the general-purpose implementation of Recorder.
Expand All @@ -25,6 +26,10 @@ func (g *GeneralRecorder) ReportChange(o runtime.Object, operator, typ string) {
g.EventRecorder.Event(o, corev1.EventTypeNormal, "SuccessfulChange", msg)
}

func (g *GeneralRecorder) ReportError(o runtime.Object, reason, message string) {
g.EventRecorder.Event(o, corev1.EventTypeWarning, reason, message)
}

// New returns a new Recorder.
func New(r record.EventRecorder) Recorder {
return &GeneralRecorder{
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/v1alpha3/suggestion/clientset/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package clientset

import (
"context"

suggestionv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
"github.com/kubeflow/katib/pkg/controller/v1alpha3/recorder"
"github.com/kubeflow/katib/pkg/controller/v1alpha3/util"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)

const (
clientsetName = "suggestion-deployment-clientset"
)

var log = logf.Log.WithName(clientsetName)

type DeploymentClient struct {
client.Client
recorder.Recorder
}

func New(c client.Client, r recorder.Recorder) DeploymentClient {
return DeploymentClient{
Client: c,
Recorder: r,
}
}

func (dc *DeploymentClient) CreateOrUpdateDeployment(suggestion *suggestionv1alpha2.Suggestion, desired *appsv1.Deployment) error {
found := &appsv1.Deployment{}
err := dc.Get(context.TODO(), types.NamespacedName{
Name: desired.Name,
Namespace: desired.Namespace,
}, found)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating Deployment", "namespace", desired.Namespace, "name", desired.Name)
if err = dc.Create(context.TODO(), desired); err != nil {
return err
}
dc.ReportChange(suggestion, util.FlagCreate, util.TypeDeployment)
} else if err != nil {
return err
}

// TODO(anchovYu): check and update
// if !reflect.DeepEqual(desired.Spec, found.Spec) {
// // found.Spec = desired.Spec
// log.Info("Updating Deployment", "namespace", desired.Namespace, "name", desired.Name)
// if err = dc.Update(context.TODO(), desired); err != nil {
// return err
// }
// dc.ReportChange(suggestion, util.FlagUpdate, util.TypeDeployment)
// }
desired.Status = found.Status
return nil
}
55 changes: 55 additions & 0 deletions pkg/controller/v1alpha3/suggestion/condition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package suggestion

import (
suggestionv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func createOrUpdateCondition(suggestionStatus *suggestionv1alpha2.SuggestionStatus,
conditionType suggestionv1alpha2.SuggestionConditionType,
conditionStatus corev1.ConditionStatus) {
createOrUpdateConditionWithReason(suggestionStatus, conditionType, conditionStatus, "", "")
}

func createOrUpdateConditionWithReason(suggestionStatus *suggestionv1alpha2.SuggestionStatus,
conditionType suggestionv1alpha2.SuggestionConditionType,
conditionStatus corev1.ConditionStatus,
reason, message string) {
conditions := suggestionStatus.Conditions
for i, cond := range conditions {
if cond.Type == conditionType {
updateCondition(&suggestionStatus.Conditions[i], conditionStatus, reason, message)
return
}
}
c := createCondition(conditionType, conditionStatus, reason, message)
suggestionStatus.Conditions = append(suggestionStatus.Conditions, c)

}

func createCondition(conditionType suggestionv1alpha2.SuggestionConditionType,
status corev1.ConditionStatus,
reason string,
message string) suggestionv1alpha2.SuggestionCondition {
return suggestionv1alpha2.SuggestionCondition{
Type: conditionType,
Status: status,
Reason: reason,
Message: message,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
}
}

func updateCondition(condition *suggestionv1alpha2.SuggestionCondition,
status corev1.ConditionStatus, reason string, message string) {
if condition.Status != status {
condition.LastTransitionTime = metav1.Now()
}
condition.Status = status
condition.Reason = reason
condition.Message = message
condition.LastUpdateTime = metav1.Now()

}
20 changes: 20 additions & 0 deletions pkg/controller/v1alpha3/suggestion/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package suggestion

import (
"fmt"

suggestionv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
corev1 "k8s.io/api/core/v1"
)

func (r *ReconcileSuggestion) reportChange(s *suggestionv1alpha2.Suggestion, operator, typ string) {
msg := fmt.Sprintf("%s the %s", operator, typ)
log.Info(msg)
r.Recorder.ReportChange(s, operator, typ)
}

func (r *ReconcileSuggestion) reportError(s *suggestionv1alpha2.Suggestion, err error, reason, msg string) {
log.Error(err, msg)
r.Recorder.ReportError(s, reason, err.Error())
createOrUpdateConditionWithReason(&s.Status, suggestionv1alpha2.SuggestionDeploymentAvailable, corev1.ConditionFalse, reason, msg)
}
52 changes: 52 additions & 0 deletions pkg/controller/v1alpha3/suggestion/handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package suggestion

import (
suggestionsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
"github.com/kubeflow/katib/pkg/controller/v1alpha3/util"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func (r *ReconcileSuggestion) handle(suggestion *suggestionsv1alpha2.Suggestion) (result reconcile.Result, err error) {
oldStatus := suggestion.Status.DeepCopy()
result = reconcile.Result{}
logger := log.WithName(types.NamespacedName{
Namespace: suggestion.Namespace,
Name: suggestion.Name,
}.String())

defer func() {
err = r.updateStatusIfChanged(oldStatus, suggestion)
}()

desired, err := getDesiredDeployment(suggestion)
if err != nil {
r.reportError(suggestion, err, util.FailReason, "Failed to get desired deployment")
return result, err
}
logger.V(0).Info("OK: Get desired deployment of suggestion")

if err = controllerutil.SetControllerReference(suggestion, desired, r.scheme); err != nil {
r.reportError(suggestion, err, util.FailReason, "Failed to set controller reference")
return result, err
}
logger.V(0).Info("OK: Set controller reference between deployment and suggestion")

// if suggestion spec changes, create or update deployment
// desired deployment status is updated
if err = r.CreateOrUpdateDeployment(suggestion, desired); err != nil {
r.reportError(suggestion, err, util.FailReason, "Failed to create or update deployment of suggestion")
return result, err
}
logger.V(0).Info("OK: Create or update deployment")

// if deployment changes, sync status of suggestion
if err = r.syncStatus(&desired.Status, suggestion); err != nil {
r.reportError(suggestion, err, util.FailReason, "Failed to sync status of suggestion")
return result, err
}
logger.V(0).Info("OK: Sync status of suggestion")

return result, err
}
33 changes: 33 additions & 0 deletions pkg/controller/v1alpha3/suggestion/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package suggestion

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"

suggestionsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
suggestionv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/suggestion/v1alpha2"
)

func (r *ReconcileSuggestion) syncStatus(deployStatus *appsv1.DeploymentStatus, suggestion *suggestionv1alpha2.Suggestion) error {
for _, cond := range deployStatus.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
createOrUpdateCondition(&suggestion.Status, suggestionv1alpha2.SuggestionDeploymentAvailable, cond.Status)
} else if cond.Type == appsv1.DeploymentProgressing {
createOrUpdateCondition(&suggestion.Status, suggestionv1alpha2.SuggestionDeploymentProgressing, cond.Status)
} else if cond.Type == appsv1.DeploymentReplicaFailure {
createOrUpdateCondition(&suggestion.Status, suggestionv1alpha2.SuggestionDeploymentReplicaFailure, cond.Status)
}
}
return nil
}

func (r *ReconcileSuggestion) updateStatusIfChanged(oldStatus *suggestionv1alpha2.SuggestionStatus,
suggestion *suggestionsv1alpha2.Suggestion) error {
if !equality.Semantic.DeepEqual(oldStatus, &suggestion.Status) {
return r.Status().Update(context.TODO(), suggestion)
}
return nil

}
Loading

0 comments on commit 5e14927

Please sign in to comment.