diff --git a/pkg/controller/analytics/analytics_controller.go b/pkg/controller/analytics/analytics_controller.go index 53b1a8d55..8d27c57f2 100644 --- a/pkg/controller/analytics/analytics_controller.go +++ b/pkg/controller/analytics/analytics_controller.go @@ -563,10 +563,10 @@ func ConvertToRecommendationRule(analytics *analysisv1alph1.Analytics) *analysis recommendationRule.Spec.RunInterval = (time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second).String() } - recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{ + /*recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{ LastUpdateTime: analytics.Status.LastUpdateTime, Recommendations: analytics.Status.Recommendations, - } + }*/ return recommendationRule } diff --git a/pkg/controller/recommendation/recommendation_rule_controller.go b/pkg/controller/recommendation/recommendation_rule_controller.go index eab54e581..2f6d8d082 100644 --- a/pkg/controller/recommendation/recommendation_rule_controller.go +++ b/pkg/controller/recommendation/recommendation_rule_controller.go @@ -3,7 +3,7 @@ package recommendation import ( "context" "fmt" - "reflect" + "sort" "strconv" "strings" "sync" @@ -121,40 +121,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen return false } - timeNow := metav1.Now() - - // if the first mission start time is last round, reset currMissions here - currMissions := newStatus.Recommendations - if currMissions != nil && len(currMissions) > 0 { - firstMissionStartTime := currMissions[0].LastStartTime - if firstMissionStartTime.IsZero() { - currMissions = nil - } else { - planingTime := firstMissionStartTime.Add(interval) - if time.Now().After(planingTime) { - currMissions = nil // reset missions to trigger creation for missions - } - } - } - - if currMissions == nil { - // create recommendation rule missions for this round - // every recommendation rule have multi recommender for one identity - for _, id := range identities { - for _, recommender := range recommendationRule.Spec.Recommenders { - currMissions = append(currMissions, analysisv1alph1.RecommendationMission{ - TargetRef: id.GetObjectReference(), - RecommenderRef: analysisv1alph1.Recommender{ - Name: recommender.Name, - }, - }) - } - } - - // +1 for runNumber - newStatus.RunNumber = newStatus.RunNumber + 1 - } - var currRecommendations analysisv1alph1.RecommendationList opts := []client.ListOption{ client.MatchingLabels(map[string]string{known.RecommendationRuleUidLabel: string(recommendationRule.UID)}), @@ -174,20 +140,50 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen return false } - if klog.V(6).Enabled() { - // Print identities - for k, id := range identities { - klog.V(6).InfoS("identities", "RecommendationRule", klog.KObj(recommendationRule), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name) + var identitiesArray []ObjectIdentity + keys := make([]string, 0, len(identities)) + for k := range identities { + keys = append(keys, k) + } + sort.Strings(keys) // sort key to get a certain order + for _, key := range keys { + id := identities[key] + id.Recommendation = GetRecommendationFromIdentity(identities[key], currRecommendations) + identitiesArray = append(identitiesArray, id) + } + + timeNow := metav1.Now() + newRound := false + if len(identitiesArray) > 0 { + firstRecommendation := identitiesArray[0].Recommendation + firstMissionStartTime, err := utils.GetLastStartTime(firstRecommendation) + if err != nil { + newRound = true + } else { + planingTime := firstMissionStartTime.Add(interval) + now := utils.NowUTC() + if now.After(planingTime) { + newRound = true + } } } + if newRound { + // +1 for runNumber + newStatus.RunNumber = newStatus.RunNumber + 1 + } + maxConcurrency := 10 executionIndex := -1 var concurrency int - for index, mission := range currMissions { - if mission.LastStartTime != nil { - continue + for index, identity := range identitiesArray { + if identity.Recommendation != nil { + runNumber, _ := utils.GetRunNumber(identity.Recommendation) + if runNumber >= newStatus.RunNumber { + continue + } } + if executionIndex == -1 { executionIndex = index } @@ -198,22 +194,17 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen wg := sync.WaitGroup{} wg.Add(concurrency) - for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ { - var existingRecommendation *analysisv1alph1.Recommendation - for _, r := range currRecommendations.Items { - if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) && string(r.Spec.Type) == currMissions[index].RecommenderRef.Name { - existingRecommendation = &r - break - } + for index := executionIndex; index < len(identitiesArray) && index < concurrency+executionIndex; index++ { + if klog.V(6).Enabled() { + klog.V(6).InfoS("execute identities", "RecommendationRule", klog.KObj(recommendationRule), "target", identitiesArray[index].GetObjectReference()) } - - go executeMission(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identities, &currMissions[index], existingRecommendation, c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber) + go executeIdentity(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identitiesArray[index], c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber) } wg.Wait() finished := false - if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 { + if executionIndex+concurrency == len(identitiesArray) || len(identitiesArray) == 0 { finished = true } @@ -222,8 +213,8 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen // clean orphan recommendations for _, recommendation := range currRecommendations.Items { exist := false - for _, mission := range currMissions { - if recommendation.UID == mission.UID { + for _, id := range identitiesArray { + if recommendation.UID == id.Recommendation.UID { exist = true break } @@ -241,8 +232,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen } - newStatus.Recommendations = currMissions - updateRecommendationRuleStatus(ctx, c.Client, c.Recorder, recommendationRule, newStatus) return finished } @@ -303,15 +292,18 @@ func (c *RecommendationRuleController) getIdentities(ctx context.Context, recomm } for i := range filterdUnstructureds { - k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName()) - if _, exists := identities[k]; !exists { - identities[k] = ObjectIdentity{ - Namespace: filterdUnstructureds[i].GetNamespace(), - Name: filterdUnstructureds[i].GetName(), - Kind: rs.Kind, - APIVersion: rs.APIVersion, - Labels: filterdUnstructureds[i].GetLabels(), - Object: filterdUnstructureds[i], + for _, recommender := range recommendationRule.Spec.Recommenders { + k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName(), recommender.Name) + if _, exists := identities[k]; !exists { + identities[k] = ObjectIdentity{ + Namespace: filterdUnstructureds[i].GetNamespace(), + Name: filterdUnstructureds[i].GetName(), + Kind: rs.Kind, + APIVersion: rs.APIVersion, + Labels: filterdUnstructureds[i].GetLabels(), + Object: filterdUnstructureds[i], + Recommender: recommender.Name, + } } } } @@ -351,12 +343,14 @@ func updateRecommendationRuleStatus(ctx context.Context, c client.Client, record } type ObjectIdentity struct { - Namespace string - APIVersion string - Kind string - Name string - Labels map[string]string - Object unstructuredv1.Unstructured + Namespace string + APIVersion string + Kind string + Name string + Labels map[string]string + Recommender string + Object unstructuredv1.Unstructured + Recommendation *analysisv1alph1.Recommendation } func (id ObjectIdentity) GetObjectReference() corev1.ObjectReference { @@ -375,8 +369,22 @@ func newOwnerRef(a *analysisv1alph1.RecommendationRule) *metav1.OwnerReference { } } -func objRefKey(kind, apiVersion, namespace, name string) string { - return fmt.Sprintf("%s#%s#%s#%s", kind, apiVersion, namespace, name) +func objRefKey(kind, apiVersion, namespace, name, recommender string) string { + return fmt.Sprintf("%s#%s#%s#%s#%s", kind, apiVersion, namespace, name, recommender) +} + +func GetRecommendationFromIdentity(id ObjectIdentity, currRecommendations analysisv1alph1.RecommendationList) *analysisv1alph1.Recommendation { + for _, r := range currRecommendations.Items { + if id.Kind == r.Spec.TargetRef.Kind && + id.APIVersion == r.Spec.TargetRef.APIVersion && + id.Namespace == r.Spec.TargetRef.Namespace && + id.Name == r.Spec.TargetRef.Name && + id.Recommender == string(r.Spec.Type) { + return &r + } + } + + return nil } func CreateRecommendationObject(recommendationRule *analysisv1alph1.RecommendationRule, @@ -404,6 +412,11 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati }, } + recommendation.Labels = generateRecommendationLabels(recommendationRule, target, id, recommenderName) + return recommendation +} + +func generateRecommendationLabels(recommendationRule *analysisv1alph1.RecommendationRule, target corev1.ObjectReference, id ObjectIdentity, recommenderName string) map[string]string { labels := map[string]string{} labels[known.RecommendationRuleNameLabel] = recommendationRule.Name labels[known.RecommendationRuleUidLabel] = string(recommendationRule.UID) @@ -414,87 +427,82 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati labels[known.RecommendationRuleTargetKindLabel] = target.Kind labels[known.RecommendationRuleTargetVersionLabel] = target.GroupVersionKind().Version labels[known.RecommendationRuleTargetNameLabel] = target.Name + labels[known.RecommendationRuleTargetNamespaceLabel] = target.Namespace for k, v := range id.Labels { labels[k] = v } - recommendation.Labels = labels - return recommendation + return labels } -func executeMission(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager, - recommendationRule *analysisv1alph1.RecommendationRule, identities map[string]ObjectIdentity, mission *analysisv1alph1.RecommendationMission, - existingRecommendation *analysisv1alph1.Recommendation, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) { +func executeIdentity(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager, + recommendationRule *analysisv1alph1.RecommendationRule, id ObjectIdentity, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) { defer func() { - mission.LastStartTime = &timeNow - klog.Infof("Mission message: %s", mission.Message) if wg != nil { wg.Done() } }() + var message string - k := objRefKey(mission.TargetRef.Kind, mission.TargetRef.APIVersion, mission.TargetRef.Namespace, mission.TargetRef.Name) - if id, exist := identities[k]; !exist { - mission.Message = fmt.Sprintf("Failed to get identity, key %s. ", k) - return + recommendation := id.Recommendation + if recommendation == nil { + recommendation = CreateRecommendationObject(recommendationRule, id.GetObjectReference(), id, id.Recommender) } else { - recommendation := existingRecommendation - if recommendation == nil { - recommendation = CreateRecommendationObject(recommendationRule, mission.TargetRef, id, mission.RecommenderRef.Name) + // update existing recommendation's labels + for k, v := range generateRecommendationLabels(recommendationRule, id.GetObjectReference(), id, id.Recommender) { + recommendation.Labels[k] = v } + } - r, err := recommenderMgr.GetRecommenderWithRule(mission.RecommenderRef.Name, *recommendationRule) - if err != nil { - mission.Message = fmt.Sprintf("get recommender %s failed, %v", mission.RecommenderRef.Name, err) - return - } + r, err := recommenderMgr.GetRecommenderWithRule(id.Recommender, *recommendationRule) + if err != nil { + message = fmt.Sprintf("get recommender %s failed, %v", id.Recommender, err) + } else { p := make(map[providers.DataSourceType]providers.History) p[providers.PrometheusDataSource] = provider identity := framework.ObjectIdentity{ - Namespace: identities[k].Namespace, - Name: identities[k].Name, - Kind: identities[k].Kind, - APIVersion: identities[k].APIVersion, - Labels: identities[k].Labels, - Object: identities[k].Object, + Namespace: id.Namespace, + Name: id.Name, + Kind: id.Kind, + APIVersion: id.APIVersion, + Labels: id.Labels, + Object: id.Object, } recommendationContext := framework.NewRecommendationContext(ctx, identity, recommendationRule, predictorMgr, p, recommendation, client, scaleClient, oomRecorder) err = recommender.Run(&recommendationContext, r) if err != nil { - mission.Message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error()) - return - } - - recommendation.Status.LastUpdateTime = &timeNow - if recommendation.Annotations == nil { - recommendation.Annotations = map[string]string{} + message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error()) } - recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber)) + } - if existingRecommendation != nil { - klog.Infof("Update recommendation %s", klog.KObj(recommendation)) - if err := client.Update(ctx, recommendation); err != nil { - mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err) - return - } + if len(message) == 0 { + message = "Success" + } - klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation)) - } else { - klog.Infof("Create recommendation %s", klog.KObj(recommendation)) - if err := client.Create(ctx, recommendation); err != nil { - mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err) - return - } + recommendation.Status.LastUpdateTime = &timeNow + if recommendation.Annotations == nil { + recommendation.Annotations = map[string]string{} + } + recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber)) + recommendation.Annotations[known.MessageAnnotation] = message + utils.SetLastStartTime(recommendation) + + if id.Recommendation != nil { + klog.Infof("Update recommendation %s", klog.KObj(recommendation)) + if err := client.Update(ctx, recommendation); err != nil { + klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err) + return + } - klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation)) + klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation)) + } else { + klog.Infof("Create recommendation %s", klog.KObj(recommendation)) + if err := client.Create(ctx, recommendation); err != nil { + klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err) + return } - mission.Message = "Success" - mission.UID = recommendation.UID - mission.Name = recommendation.Name - mission.Namespace = recommendation.Namespace - mission.Kind = recommendation.Kind - mission.APIVersion = recommendation.APIVersion + klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation)) } } diff --git a/pkg/controller/recommendation/recommendation_trigger_controller.go b/pkg/controller/recommendation/recommendation_trigger_controller.go index 637053648..d6f33494a 100644 --- a/pkg/controller/recommendation/recommendation_trigger_controller.go +++ b/pkg/controller/recommendation/recommendation_trigger_controller.go @@ -81,15 +81,15 @@ func (c *RecommendationTriggerController) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, fmt.Errorf("get target object for recommendation %s failed: %v", klog.KObj(recommendation), err) } - identities := map[string]ObjectIdentity{} - key := objRefKey(recommendation.Spec.TargetRef.Kind, recommendation.Spec.TargetRef.APIVersion, recommendation.Spec.TargetRef.Namespace, recommendation.Spec.TargetRef.Name) - identities[key] = ObjectIdentity{ - Namespace: object.GetNamespace(), - Name: object.GetName(), - Kind: object.GetKind(), - APIVersion: object.GetAPIVersion(), - Labels: object.GetLabels(), - Object: *object, + id := ObjectIdentity{ + Namespace: object.GetNamespace(), + Name: object.GetName(), + Kind: object.GetKind(), + APIVersion: object.GetAPIVersion(), + Labels: object.GetLabels(), + Recommender: string(recommendation.Spec.Type), + Object: *object, + Recommendation: recommendation, } newStatus := recommendationRule.Status.DeepCopy() @@ -106,7 +106,7 @@ func (c *RecommendationTriggerController) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, nil } - executeMission(context.TODO(), nil, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identities, &newStatus.Recommendations[currentMissionIndex], recommendation, c.Client, c.ScaleClient, c.OOMRecorder, metav1.Now(), newStatus.RunNumber) + executeIdentity(context.TODO(), nil, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, id, c.Client, c.ScaleClient, c.OOMRecorder, metav1.Now(), newStatus.RunNumber) if newStatus.Recommendations[currentMissionIndex].Message != "Success" { err = c.Client.Delete(context.TODO(), recommendation) if err != nil { diff --git a/pkg/known/annotation.go b/pkg/known/annotation.go index 4775eae73..7dc0e30bb 100644 --- a/pkg/known/annotation.go +++ b/pkg/known/annotation.go @@ -6,6 +6,8 @@ const ( ResourceRecommendationValueAnnotation = "analysis.crane.io/resource-recommendation" RunNumberAnnotation = "analysis.crane.io/run-number" AnalyticsConversionAnnotation = "analysis.crane.io/analytics-conversion" + LastStartTimeAnnotation = "analysis.crane.io/last-start-time" + MessageAnnotation = "analysis.crane.io/message" ) const ( diff --git a/pkg/known/label.go b/pkg/known/label.go index 2a49ffe27..d2bc705bc 100644 --- a/pkg/known/label.go +++ b/pkg/known/label.go @@ -17,10 +17,11 @@ const ( ) const ( - RecommendationRuleNameLabel = "analysis.crane.io/recommendation-rule-name" - RecommendationRuleUidLabel = "analysis.crane.io/recommendation-rule-uid" - RecommendationRuleRecommenderLabel = "analysis.crane.io/recommendation-rule-recommender" - RecommendationRuleTargetKindLabel = "analysis.crane.io/recommendation-target-kind" - RecommendationRuleTargetVersionLabel = "analysis.crane.io/recommendation-target-version" - RecommendationRuleTargetNameLabel = "analysis.crane.io/recommendation-target-name" + RecommendationRuleNameLabel = "analysis.crane.io/recommendation-rule-name" + RecommendationRuleUidLabel = "analysis.crane.io/recommendation-rule-uid" + RecommendationRuleRecommenderLabel = "analysis.crane.io/recommendation-rule-recommender" + RecommendationRuleTargetKindLabel = "analysis.crane.io/recommendation-target-kind" + RecommendationRuleTargetVersionLabel = "analysis.crane.io/recommendation-target-version" + RecommendationRuleTargetNameLabel = "analysis.crane.io/recommendation-target-name" + RecommendationRuleTargetNamespaceLabel = "analysis.crane.io/recommendation-target-namespace" ) diff --git a/pkg/utils/recommend.go b/pkg/utils/recommend.go index 585298009..1e8887127 100644 --- a/pkg/utils/recommend.go +++ b/pkg/utils/recommend.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "strconv" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -66,9 +67,32 @@ func GetRunNumber(recommendation *analysisv1alpha1.Recommendation) (int32, error func GetRecommendationRuleOwnerReference(recommend *analysisv1alpha1.Recommendation) *metav1.OwnerReference { for _, ownerReference := range recommend.OwnerReferences { - if ownerReference.Kind == "RecommendationRule" { + if ownerReference.Kind == "RecommendationRule" || ownerReference.Kind == "Analytics" { return &ownerReference } } return nil } + +func GetLastStartTime(recommendation *analysisv1alpha1.Recommendation) (time.Time, error) { + if recommendation != nil && recommendation.Annotations != nil { + val, ok := recommendation.Annotations[known.LastStartTimeAnnotation] + if ok && len(val) != 0 { + return time.Parse("2006-01-02 15:04:05", val) + } + } + + return time.Now(), fmt.Errorf("get lastStartTime failed") +} + +func SetLastStartTime(recommendation *analysisv1alpha1.Recommendation) { + if recommendation != nil && recommendation.Annotations != nil { + if recommendation.Annotations == nil { + recommendation.Annotations = map[string]string{} + } + + now := time.Now() + loc, _ := time.LoadLocation("UTC") + recommendation.Annotations[known.LastStartTimeAnnotation] = now.In(loc).Format("2006-01-02 15:04:05") + } +} diff --git a/pkg/utils/time.go b/pkg/utils/time.go index d3bd6587f..48556efae 100644 --- a/pkg/utils/time.go +++ b/pkg/utils/time.go @@ -24,3 +24,9 @@ func ParseTimestamp(ts string) (time.Time, error) { } return time.Unix(i, 0), nil } + +func NowUTC() time.Time { + now := time.Now() + loc, _ := time.LoadLocation("UTC") + return now.In(loc) +}