Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

预测查询自定义及prometheus-adapter替代metric-adapter的实现 #520

Merged
merged 15 commits into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,19 @@ func initScheme() {
}

func initMetricCollector(mgr ctrl.Manager) {
discoveryClientSet, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
klog.Exit(err, "Unable to create discover client")
}

scaleKindResolver := scale.NewDiscoveryScaleKindResolver(discoveryClientSet)
scaleClient := scale.New(
discoveryClientSet.RESTClient(), mgr.GetRESTMapper(),
dynamic.LegacyAPIPathResolverFunc,
scaleKindResolver,
)
// register as prometheus metric collector
metrics.CustomCollectorRegister(metrics.NewTspMetricCollector(mgr.GetClient()))
metrics.CustomCollectorRegister(metrics.NewCraneMetricCollector(mgr.GetClient(), scaleClient, mgr.GetRESTMapper()))
}

func initWebhooks(mgr ctrl.Manager, opts *options.Options) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/gocrane/api v0.7.1-0.20220819080332-e4c0d60e812d
github.com/gocrane/api v0.7.1-0.20220906050113-0f331eb419b0
github.com/google/cadvisor v0.39.2
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/prometheus/client_golang v1.11.0
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gocrane/api v0.6.1-0.20220809112454-68f0199a774e h1:pIocbZM7LchSMG7XBbfD9K+Im7zZtMZjVU7paVJOv6I=
github.com/gocrane/api v0.6.1-0.20220809112454-68f0199a774e/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.6.1-0.20220812033255-887f4b4e7d8b h1:ELyVltbne39izU2XaFrgJtqnhdeV+hBt+JBKooN7N4w=
github.com/gocrane/api v0.6.1-0.20220812033255-887f4b4e7d8b/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.7.0 h1:EIvr5KKHby1PXZI6wB+Ac+D2BCvc0qiK5VpHWQVwnxg=
github.com/gocrane/api v0.7.0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.7.1-0.20220819080332-e4c0d60e812d h1:qqPrNx1AETykgX80aWAmna/eQMDVWnUdSemWlfaZUNM=
github.com/gocrane/api v0.7.1-0.20220819080332-e4c0d60e812d/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.7.1-0.20220906050113-0f331eb419b0 h1:IIHNT4bDsuBJq9JHHoQhUOrtE5Ec2Ug/Om8s8WQD8ws=
github.com/gocrane/api v0.7.1-0.20220906050113-0f331eb419b0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
243 changes: 122 additions & 121 deletions pkg/controller/analytics/analytics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,159 +97,160 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

/*
func (c *Controller) analyze(ctx context.Context, analytics *analysisv1alph1.Analytics) bool {
newStatus := analytics.Status.DeepCopy()
func (c *Controller) analyze(ctx context.Context, analytics *analysisv1alph1.Analytics) bool {
newStatus := analytics.Status.DeepCopy()

identities, err := c.getIdentities(ctx, analytics)
if err != nil {
c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
msg := fmt.Sprintf("Failed to get idenitities, Analytics %s error %v", klog.KObj(analytics), err)
klog.Errorf(msg)
setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
c.UpdateStatus(ctx, analytics, newStatus)
return false
}
identities, err := c.getIdentities(ctx, analytics)
if err != nil {
c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
msg := fmt.Sprintf("Failed to get idenitities, Analytics %s error %v", klog.KObj(analytics), err)
klog.Errorf(msg)
setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
c.UpdateStatus(ctx, analytics, newStatus)
return false
}

timeNow := metav1.Now()
timeNow := metav1.Now()

// if the first mission start time is last round, reset currMissions here
currMissions := newStatus.Recommendations
if currMissions != nil && len(currMissions) > 0 {
firstMissionStartTime := currMissions[0].LastStartTime
if firstMissionStartTime.IsZero() {
currMissions = nil
} else {
planingTime := firstMissionStartTime.Add(time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second)
if time.Now().After(planingTime) {
currMissions = nil // reset missions to trigger creation for missions
// if the first mission start time is last round, reset currMissions here
currMissions := newStatus.Recommendations
if currMissions != nil && len(currMissions) > 0 {
firstMissionStartTime := currMissions[0].LastStartTime
if firstMissionStartTime.IsZero() {
currMissions = nil
} else {
planingTime := firstMissionStartTime.Add(time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second)
if time.Now().After(planingTime) {
currMissions = nil // reset missions to trigger creation for missions
}
}
}
}

if currMissions == nil {
// create recommendation missions for this round
for _, id := range identities {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: corev1.ObjectReference{Kind: id.Kind, APIVersion: id.APIVersion, Namespace: id.Namespace, Name: id.Name},
})
if currMissions == nil {
// create recommendation missions for this round
for _, id := range identities {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: corev1.ObjectReference{Kind: id.Kind, APIVersion: id.APIVersion, Namespace: id.Namespace, Name: id.Name},
})
}
}
}

var currRecommendations []*analysisv1alph1.Recommendation
labelSet := labels.Set{}
labelSet[known.AnalyticsUidLabel] = string(analytics.UID)
currRecommendations, err = c.recommLister.Recommendations(analytics.Namespace).List(labels.SelectorFromSet(labelSet))
if err != nil {
c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
msg := fmt.Sprintf("Failed to get recomendations, Analytics %s error %v", klog.KObj(analytics), err)
klog.Errorf(msg)
setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
c.UpdateStatus(ctx, analytics, newStatus)
return false
}

if klog.V(6).Enabled() {
// Print identities
for k, id := range identities {
klog.V(6).InfoS("identities", "analytics", klog.KObj(analytics), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name)
var currRecommendations []*analysisv1alph1.Recommendation
labelSet := labels.Set{}
labelSet[known.AnalyticsUidLabel] = string(analytics.UID)
currRecommendations, err = c.recommLister.Recommendations(analytics.Namespace).List(labels.SelectorFromSet(labelSet))
if err != nil {
c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
msg := fmt.Sprintf("Failed to get recomendations, Analytics %s error %v", klog.KObj(analytics), err)
klog.Errorf(msg)
setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
c.UpdateStatus(ctx, analytics, newStatus)
return false
}
}

maxConcurrency := 10
executionIndex := -1
var concurrency int
for index, mission := range currMissions {
if mission.LastStartTime != nil {
continue
}
if executionIndex == -1 {
executionIndex = index
}
if concurrency < maxConcurrency {
concurrency++
if klog.V(6).Enabled() {
// Print identities
for k, id := range identities {
klog.V(6).InfoS("identities", "analytics", klog.KObj(analytics), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name)
}
}
}

wg := sync.WaitGroup{}
wg.Add(concurrency)
for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ {
var existingRecommendation *analysisv1alph1.Recommendation
for _, r := range currRecommendations {
if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) {
existingRecommendation = r
break
maxConcurrency := 10
executionIndex := -1
var concurrency int
for index, mission := range currMissions {
if mission.LastStartTime != nil {
continue
}
if executionIndex == -1 {
executionIndex = index
}
if concurrency < maxConcurrency {
concurrency++
}
}

go c.executeMission(ctx, &wg, analytics, identities, &currMissions[index], existingRecommendation, timeNow)
}
wg := sync.WaitGroup{}
wg.Add(concurrency)
for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ {
var existingRecommendation *analysisv1alph1.Recommendation
for _, r := range currRecommendations {
if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) {
existingRecommendation = r
break
}
}

wg.Wait()
go c.executeMission(ctx, &wg, analytics, identities, &currMissions[index], existingRecommendation, timeNow)
}

finished := false
if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 {
finished = true
}
wg.Wait()

if finished {
newStatus.LastUpdateTime = &timeNow
finished := false
if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 {
finished = true
}

// clean orphan recommendations
for _, recommendation := range currRecommendations {
exist := false
for _, mission := range currMissions {
if recommendation.UID == mission.UID {
exist = true
break
if finished {
newStatus.LastUpdateTime = &timeNow

// clean orphan recommendations
for _, recommendation := range currRecommendations {
exist := false
for _, mission := range currMissions {
if recommendation.UID == mission.UID {
exist = true
break
}
}
}

if !exist {
err = c.Client.Delete(ctx, recommendation)
if err != nil {
klog.ErrorS(err, "Failed to delete recommendation.", "recommendation", klog.KObj(recommendation))
} else {
klog.Infof("Deleted orphan recommendation %v.", klog.KObj(recommendation))
if !exist {
err = c.Client.Delete(ctx, recommendation)
if err != nil {
klog.ErrorS(err, "Failed to delete recommendation.", "recommendation", klog.KObj(recommendation))
} else {
klog.Infof("Deleted orphan recommendation %v.", klog.KObj(recommendation))
}
}
}

}

newStatus.Recommendations = currMissions
setReadyCondition(newStatus, metav1.ConditionTrue, "AnalyticsReady", "Analytics is ready")

c.UpdateStatus(ctx, analytics, newStatus)
return finished
}

newStatus.Recommendations = currMissions
setReadyCondition(newStatus, metav1.ConditionTrue, "AnalyticsReady", "Analytics is ready")
func (c *Controller) CreateRecommendationObject(ctx context.Context, analytics *analysisv1alph1.Analytics,

c.UpdateStatus(ctx, analytics, newStatus)
return finished
}
target corev1.ObjectReference, id ObjectIdentity) *analysisv1alph1.Recommendation {

func (c *Controller) CreateRecommendationObject(ctx context.Context, analytics *analysisv1alph1.Analytics,
target corev1.ObjectReference, id ObjectIdentity) *analysisv1alph1.Recommendation {

recommendation := &analysisv1alph1.Recommendation{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-%s-", analytics.Name, strings.ToLower(string(analytics.Spec.Type))),
Namespace: analytics.Namespace,
OwnerReferences: []metav1.OwnerReference{
*newOwnerRef(analytics),
recommendation := &analysisv1alph1.Recommendation{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-%s-", analytics.Name, strings.ToLower(string(analytics.Spec.Type))),
Namespace: analytics.Namespace,
OwnerReferences: []metav1.OwnerReference{
*newOwnerRef(analytics),
},
Labels: id.Labels,
},
Labels: id.Labels,
},
Spec: analysisv1alph1.RecommendationSpec{
TargetRef: target,
Type: analytics.Spec.Type,
},
}
Spec: analysisv1alph1.RecommendationSpec{
TargetRef: target,
Type: analytics.Spec.Type,
},
}

if recommendation.Labels == nil {
recommendation.Labels = map[string]string{}
}
recommendation.Labels[known.AnalyticsNameLabel] = analytics.Name
recommendation.Labels[known.AnalyticsUidLabel] = string(analytics.UID)
recommendation.Labels[known.AnalyticsTypeLabel] = string(analytics.Spec.Type)
if recommendation.Labels == nil {
recommendation.Labels = map[string]string{}
}
recommendation.Labels[known.AnalyticsNameLabel] = analytics.Name
recommendation.Labels[known.AnalyticsUidLabel] = string(analytics.UID)
recommendation.Labels[known.AnalyticsTypeLabel] = string(analytics.Spec.Type)

return recommendation
}
return recommendation
}
*/
func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
/*c.kubeClient = kubernetes.NewForConfigOrDie(mgr.GetConfig())
Expand Down
Loading