diff --git a/globalscheduler/controllers/dispatcher/dispatcher_process.go b/globalscheduler/controllers/dispatcher/dispatcher_process.go index 8f3cdeaab..6490444df 100644 --- a/globalscheduler/controllers/dispatcher/dispatcher_process.go +++ b/globalscheduler/controllers/dispatcher/dispatcher_process.go @@ -74,7 +74,6 @@ func NewProcess(config *rest.Config, namespace string, name string, quit chan st if err != nil { klog.Fatal(err) } - return Process{ namespace: namespace, name: name, @@ -97,12 +96,11 @@ func (p *Process) Run(quit chan struct{}) { dispatcherSelector := fields.ParseSelectorOrDie("metadata.name=" + p.name) dispatcherLW := cache.NewListWatchFromClient(p.dispatcherClientset.GlobalschedulerV1(), "dispatchers", p.namespace, dispatcherSelector) - dispatcherInformer := cache.NewSharedIndexInformer(dispatcherLW, &dispatcherv1.Dispatcher{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) dispatcherInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { - klog.Infof("The dispatcher %s process is going to be killed...", p.name) + klog.V(3).Infof("The dispatcher %s process is going to be killed...", p.name) os.Exit(0) }, UpdateFunc: func(old, new interface{}) { @@ -234,7 +232,6 @@ func (p *Process) SendPodToCluster(pod *v1.Pod) { klog.Warningf("The pod %v failed to update its apiserver dtatbase status to failed with the error %v", pod.ObjectMeta.Name, err) } } - // util.CheckTime(pod.Name, "dispatcher", "CreatePod-End", 2) }() } } diff --git a/globalscheduler/pkg/scheduler/eventhandlers.go b/globalscheduler/pkg/scheduler/eventhandlers.go index 1ed0fae2b..ebd4ce68a 100644 --- a/globalscheduler/pkg/scheduler/eventhandlers.go +++ b/globalscheduler/pkg/scheduler/eventhandlers.go @@ -26,11 +26,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apitypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + //"k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog" clusterv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/cluster/v1" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/common/constants" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/types" + "k8s.io/kubernetes/globalscheduler/pkg/scheduler/utils" "k8s.io/kubernetes/pkg/controller" statusutil "k8s.io/kubernetes/pkg/util/pod" ) @@ -113,6 +115,31 @@ func AddAllEventHandlers(sched *Scheduler) { }, }, ) + // failed pod queue + sched.PodInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return failedToSchedule(t) && responsibleForPod(t, sched.SchedulerName) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return failedToSchedule(pod) && responsibleForPod(pod, sched.SchedulerName) + } + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) + return false + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: sched.addPodWithdrawResource, + UpdateFunc: sched.updatePodWithdrawResource, + DeleteFunc: sched.deletePodWithdrawResource, + }, + }, + ) sched.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sched.addCluster, UpdateFunc: sched.updateCluster, @@ -135,10 +162,15 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool { return schedulerName == 
pod.Status.AssignedScheduler.Name } +// failedToSchedule selects pods that scheduled but failed to create vm +func failedToSchedule(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodFailed +} + // addPodToCache add pod to the stack cache of the scheduler func (sched *Scheduler) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) - klog.Infof("Add a pod: %v", pod) + klog.V(4).Infof("Add a pod: %v", pod.Name) if !ok { klog.Errorf("cannot convert to *v1.Pod: %v", obj) return @@ -160,7 +192,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { return } newPod, ok := newObj.(*v1.Pod) - klog.Infof("Update a pod: %v", newPod) + klog.V(4).Infof("Update a pod: %v", newPod) if !ok { klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj) return @@ -178,7 +210,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { switch t := obj.(type) { case *v1.Pod: pod = t - klog.Infof("Delete a pod: %v", pod) + klog.V(4).Infof("Delete a pod: %v", pod.Name) case cache.DeletedFinalStateUnknown: var ok bool pod, ok = t.Obj.(*v1.Pod) @@ -301,15 +333,13 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { return } newPod, ok := newObj.(*v1.Pod) - klog.Infof("updatePodToSchedulingQueue : %v", newPod) + klog.V(4).Infof("updatePodToSchedulingQueue : %v", newPod) if !ok { klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj) return } - oldStack := getStackFromPod(oldPod) newStack := getStackFromPod(newPod) - if sched.skipStackUpdate(newStack) { return } @@ -323,7 +353,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { switch t := obj.(type) { case *v1.Pod: pod = obj.(*v1.Pod) - klog.Infof("deletePodToSchedulingQueue : %v", pod) + klog.V(4).Infof("deletePodToSchedulingQueue : %v", pod.Name) case cache.DeletedFinalStateUnknown: var ok bool pod, ok = t.Obj.(*v1.Pod) @@ -372,14 +402,14 @@ func (sched *Scheduler) skipStackUpdate(stack *types.Stack) bool { if !reflect.DeepEqual(assumedStackCopy, stackCopy) { return false } - klog.V(3).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName) + klog.V(4).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName) return true } func (sched *Scheduler) bindStacks(assumedStacks []types.Stack) { - klog.Infof("assumedStacks: %v", assumedStacks) + klog.V(4).Infof("assumedStacks: %v", assumedStacks) for _, newStack := range assumedStacks { - klog.Infof("newStack: %v", newStack) + klog.V(4).Infof("newStack: %v", newStack) clusterName := newStack.Selected.ClusterName sched.bindToSite(clusterName, &newStack) } @@ -398,7 +428,7 @@ func (sched *Scheduler) setPodScheduleErr(reqStack *types.Stack) error { newStatus := v1.PodStatus{ Phase: v1.PodNoSchedule, } - klog.Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus) + klog.V(4).Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus) _, _, err = statusutil.PatchPodStatus(sched.Client, reqStack.Tenant, reqStack.PodNamespace, reqStack.PodName, pod.Status, newStatus) if err != nil { klog.Warningf("PatchPodStatus for pod %q: %v", reqStack.PodName+"/"+reqStack.PodNamespace+"/"+ @@ -406,7 +436,7 @@ func (sched *Scheduler) setPodScheduleErr(reqStack *types.Stack) error { return err } - klog.Infof("Update pod status from %v to %v success", pod.Status, newStatus) + klog.V(4).Infof("Update pod status from %v to %v success", pod.Status, newStatus) return nil } @@ -423,10 +453,9 @@ func (sched *Scheduler) bindToSite(clusterName string, 
assumedStack *types.Stack Name: clusterName, }, } - - klog.V(3).Infof("binding: %v", binding) + klog.V(4).Infof("binding: %v", binding) // do api server update here - klog.Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) + klog.V(4).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) err := sched.Client.CoreV1().PodsWithMultiTenancy(binding.Namespace, binding.Tenant).Bind(binding) if err != nil { klog.Errorf("Failed to bind stack: %v/%v/%v", assumedStack.Tenant, assumedStack.PodNamespace, @@ -434,9 +463,9 @@ func (sched *Scheduler) bindToSite(clusterName string, assumedStack *types.Stack if err := sched.SchedulerCache.ForgetStack(assumedStack); err != nil { klog.Errorf("scheduler cache ForgetStack failed: %v", err) } - return err } + // return nil } @@ -444,7 +473,7 @@ func (sched *Scheduler) addCluster(object interface{}) { resource := object.(*clusterv1.Cluster) clusterCopy := resource.DeepCopy() if sched.verifyClusterInfo(clusterCopy) == false { - klog.Infof(" Cluster data is not correct: %v", clusterCopy) + klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy) } key, err := controller.KeyFunc(object) if err != nil { @@ -452,7 +481,7 @@ func (sched *Scheduler) addCluster(object interface{}) { return } sched.Enqueue(key, EventType_Create) - klog.Infof("Enqueue Create cluster: %v", key) + klog.V(4).Infof("Enqueue Create cluster: %v", key) } func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) { @@ -461,7 +490,7 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) { oldClusterCopy := oldResource.DeepCopy() newClusterCopy := newResource.DeepCopy() if sched.verifyClusterInfo(newClusterCopy) { - klog.Infof(" Cluster data is not correct: %v", newResource) + klog.V(4).Infof(" Cluster data is not correct: %v", newResource) } key1, err1 := controller.KeyFunc(oldObject) key2, err2 := controller.KeyFunc(newObject) @@ -478,13 +507,13 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) { switch eventType { case ClusterUpdateNo: { - klog.Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name) + klog.V(4).Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name) break } case ClusterUpdateYes: { sched.Enqueue(key2, EventType_Update) - klog.Infof("Enqueue Update Cluster: %v", key2) + klog.V(4).Infof("Enqueue Update Cluster: %v", key2) break } default: @@ -499,7 +528,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) { resource := object.(*clusterv1.Cluster) clusterCopy := resource.DeepCopy() if sched.verifyClusterInfo(clusterCopy) == false { - klog.Infof(" Cluster data is not correct: %v", clusterCopy) + klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy) return } key, err := controller.KeyFunc(object) @@ -510,7 +539,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) { sched.Enqueue(key, EventType_Delete) siteID := clusterCopy.Spec.Region.Region + constants.SiteDelimiter + clusterCopy.Spec.Region.AvailabilityZone sched.deletedClusters[key] = siteID - klog.Infof("Enqueue Delete Cluster: %v", key) + klog.V(4).Infof("Enqueue Delete Cluster: %v", key) } // Enqueue puts key of the cluster object in the work queue @@ -532,3 +561,96 @@ func (sched *Scheduler) verifyClusterInfo(cluster *clusterv1.Cluster) (verified verified = true return verified } + +func (sched *Scheduler) verifyPodInfo(pod *v1.Pod) (verified bool) { + verified = false + name := pod.Name + if pod.Name == "" { + klog.Errorf("pod name:%s is null", name) + 
return verified + } + verified = true + return verified +} + +func (sched *Scheduler) addPodWithdrawResource(object interface{}) { + pod, ok := object.(*v1.Pod) + klog.V(4).Infof("Add a pod to withdraw resource: %v", pod.Name) + if !ok { + klog.Errorf("cannot convert to *v1.Pod: %v", object) + return + } + podCopy := pod.DeepCopy() + if sched.verifyPodInfo(podCopy) == false { + klog.V(4).Infof(" Pod data is not correct: %v", podCopy) + } + err := sched.withdrawResource(pod.Name) + if err != nil { + klog.Errorf("withdraw resource of pod %s failed", pod.Name) + } +} + +func (sched *Scheduler) updatePodWithdrawResource(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj) + return + } + newPod, ok := newObj.(*v1.Pod) + klog.V(4).Infof("Update a pod: %v", newPod) + if !ok { + klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj) + return + } + if oldPod.Name != newPod.Name { + klog.Errorf("old pod name and new pod name should be equal: %s, %s", oldPod.Name, newPod.Name) + return + } + err := sched.withdrawResource(newPod.Name) + if err != nil { + klog.Errorf("withdraw resource of pod %s failed", oldPod.Name) + } +} + +func (sched *Scheduler) deletePodWithdrawResource(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = t + klog.V(4).Infof("Delete a pod: %v", pod.Name) + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj) + return + } + default: + klog.Errorf("cannot convert to *v1.Pod: %v", t) + return + } + err := sched.withdrawResource(pod.Name) + if err != nil { + klog.Errorf("withdraw resource of pod %s failed", pod.Name) + } +} + +//withdraw reserved resources to a pod & add it to cash to other pods +func (sched *Scheduler) withdrawResource(podName string) error { + resource := sched.PodSiteResourceMap[podName] + if resource == nil { + klog.V(4).Infof("there is no preserved resource for pod: %s", podName) + return nil + } + allResInfo := resource.Resource + regionName := utils.GetRegionName(resource.SiteID) + regionFlavor, err := sched.siteCacheInfoSnapshot.GetRegionFlavors(regionName) + if err != nil { + klog.Errorf("there is no valid flavor for region: %s", regionName) + return err + } + siteCacheInfo := sched.siteCacheInfoSnapshot.SiteCacheInfoMap[resource.SiteID] + siteCacheInfo.UpdateSiteResInfo(allResInfo, regionFlavor, false) + delete(sched.PodSiteResourceMap, podName) + return nil +} diff --git a/globalscheduler/pkg/scheduler/factory/factory.go b/globalscheduler/pkg/scheduler/factory/factory.go index a817abff6..0992ecec3 100644 --- a/globalscheduler/pkg/scheduler/factory/factory.go +++ b/globalscheduler/pkg/scheduler/factory/factory.go @@ -44,9 +44,11 @@ func (i *podInformer) Lister() corelisters.PodLister { // NewPodInformer creates a shared index informer that returns only non-terminal pods. 
func NewPodInformer(schedulerName string, client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer { + /*selector := fields.ParseSelectorOrDie( + "status.phase=" + string(v1.PodAssigned) + + ",status.assignedScheduler.name=" + schedulerName)*/ selector := fields.ParseSelectorOrDie( - "status.phase=" + string(v1.PodAssigned) + - ",status.assignedScheduler.name=" + schedulerName) + "status.assignedScheduler.name=" + schedulerName) lw := cache.NewListWatchFromClient(client.CoreV1(), string(v1.ResourcePods), metav1.NamespaceAll, selector) return &podInformer{ informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, diff --git a/globalscheduler/pkg/scheduler/framework/interfaces/framework.go b/globalscheduler/pkg/scheduler/framework/interfaces/framework.go index c782d910f..01203fd49 100644 --- a/globalscheduler/pkg/scheduler/framework/interfaces/framework.go +++ b/globalscheduler/pkg/scheduler/framework/interfaces/framework.go @@ -425,11 +425,40 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, stack return status } +//resource func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, stack *types.Stack, siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) *Status { return bp.Bind(ctx, state, stack, siteCacheInfo) } +/// RunBindResourcePlugins runs the set of configured bind plugins until one returns a non `Skip` status. +func (f *framework) RunBindResourcePlugins(ctx context.Context, state *CycleState, stack *types.Stack, + siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) (status *Status, siteId string, flavor string, resInfo *types.AllResInfo) { + if len(f.bindPlugins) == 0 { + return NewStatus(Skip, ""), "", "", nil + } + for _, bp := range f.bindPlugins { + status, siteId, flavor, resInfo = f.runBindResourcePlugin(ctx, bp, state, stack, siteCacheInfo) + if status != nil && status.Code() == Skip { + continue + } + if !status.IsSuccess() { + msg := fmt.Sprintf("plugin %q failed to bind pod \"%v\": %v", bp.Name(), stack.PodName, status.Message()) + klog.Errorf("%s", msg) + status = NewStatus(Error, msg) + return status, siteId, flavor, resInfo + } + return status, siteId, flavor, resInfo + } + return status, siteId, flavor, resInfo +} + +///added for resource bind & revoke +func (f *framework) runBindResourcePlugin(ctx context.Context, bp BindPlugin, state *CycleState, stack *types.Stack, + siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) (*Status, string, string, *types.AllResInfo) { + return bp.BindResource(ctx, state, stack, siteCacheInfo) +} + // RunPostBindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, stack *types.Stack, siteID string) { for _, pl := range f.postBindPlugins { diff --git a/globalscheduler/pkg/scheduler/framework/interfaces/interface.go b/globalscheduler/pkg/scheduler/framework/interfaces/interface.go index d83daf7bc..b330dacf3 100644 --- a/globalscheduler/pkg/scheduler/framework/interfaces/interface.go +++ b/globalscheduler/pkg/scheduler/framework/interfaces/interface.go @@ -386,6 +386,7 @@ type BindPlugin interface { // it must return Skip in its Status code. If a bind plugin returns an Error, the // pod is rejected and will not be bound. 
Bind(ctx context.Context, state *CycleState, p *types.Stack, siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) *Status + BindResource(ctx context.Context, state *CycleState, p *types.Stack, siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) (*Status, string, string, *types.AllResInfo) } // StrategyPlugin is an interface that must be implemented by "strategy" plugins. strategy @@ -462,6 +463,9 @@ type Framework interface { RunBindPlugins(ctx context.Context, state *CycleState, stack *types.Stack, siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) *Status + RunBindResourcePlugins(ctx context.Context, state *CycleState, stack *types.Stack, + siteCacheInfo *schedulersitecacheinfo.SiteCacheInfo) (*Status, string, string, *types.AllResInfo) + //RunStrategyPlugins runs the set of configured strategy plugins. RunStrategyPlugins(ctx context.Context, state *CycleState, allocations *types.Allocation, siteScoreList SiteScoreList) (SiteScoreList, *Status) diff --git a/globalscheduler/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go b/globalscheduler/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go index 89366bd1a..049cd1a83 100644 --- a/globalscheduler/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go +++ b/globalscheduler/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go @@ -23,7 +23,7 @@ import ( "k8s.io/kubernetes/globalscheduler/pkg/scheduler/sitecacheinfo" "strconv" - "k8s.io/kubernetes/globalscheduler/pkg/scheduler/client/typed" + _ "k8s.io/kubernetes/globalscheduler/pkg/scheduler/client/typed" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/framework/interfaces" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/internal/cache" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/types" @@ -67,10 +67,10 @@ func (b DefaultBinder) Bind(ctx context.Context, state *interfaces.CycleState, s //siteSelectedInfo is type of SiteSelectorInfo at cycle_state.go siteSelectedInfo, err := interfaces.GetSiteSelectorState(state, siteID) if err != nil { - klog.Errorf("Gettng site selector state failed! err: %s", err) + klog.Errorf("Getting site selector state failed! err: %s", err) return interfaces.NewStatus(interfaces.Error, fmt.Sprintf("getting site %q info failed: %v", siteID, err)) } - klog.Errorf("GetSiteSelectorState: %v", siteSelectedInfo) + klog.V(4).Infof("site selector info: %v", siteSelectedInfo) if len(stack.Resources) != len(siteSelectedInfo.Flavors) { klog.Errorf("flavor count not equal to server count! err: %s", err) return interfaces.NewStatus(interfaces.Error, fmt.Sprintf("siteID(%s) flavor count not equal to "+ @@ -85,7 +85,7 @@ func (b DefaultBinder) Bind(ctx context.Context, state *interfaces.CycleState, s klog.Warningf("flavor %s not found in region(%s)", flavorID, region) continue } - klog.Infof("flavor %s : %v", flavorID, flv) + klog.V(4).Infof("flavor %s : %v", flavorID, flv) vCPUInt, err := strconv.ParseInt(flv.Vcpus, 10, 64) if err != nil || vCPUInt <= 0 { klog.Warningf("flavor %s is invalid in region(%s)", flavorID, region) @@ -102,15 +102,64 @@ func (b DefaultBinder) Bind(ctx context.Context, state *interfaces.CycleState, s resInfo.CpuAndMem[flv.OsExtraSpecs.ResourceType] = reqRes } b.handle.Cache().UpdateSiteWithResInfo(siteID, resInfo) - regionFlavors, err := b.handle.SnapshotSharedLister().SiteCacheInfos().GetFlavors() + return nil +} + +// Bind binds pods to site using the k8s client. 
+// Same function with Bind except return bound resource info +func (b DefaultBinder) BindResource(ctx context.Context, state *interfaces.CycleState, stack *types.Stack, + siteCacheInfo *sitecacheinfo.SiteCacheInfo) (*interfaces.Status, string, string, *types.AllResInfo) { + region := siteCacheInfo.GetSite().RegionAzMap.Region + + //eipNum : private data + resInfo := types.AllResInfo{CpuAndMem: map[string]types.CPUAndMemory{}, Storage: map[string]float64{}} + siteID := siteCacheInfo.Site.SiteID + + stack.Selected.SiteID = siteID + stack.Selected.Region = region + stack.Selected.AvailabilityZone = siteCacheInfo.GetSite().RegionAzMap.AvailabilityZone + stack.Selected.ClusterName = siteCacheInfo.Site.ClusterName + stack.Selected.ClusterNamespace = siteCacheInfo.Site.ClusterNamespace + flavorID := "" + //siteSelectedInfo is type of SiteSelectorInfo at cycle_state.go + siteSelectedInfo, err := interfaces.GetSiteSelectorState(state, siteID) if err != nil { - klog.Errorf("Getting region's flavor failed: %s", err) - return interfaces.NewStatus(interfaces.Error, fmt.Sprintf("getting site %q info failed: %v", siteID, err)) + klog.Errorf("Gettng site selector state failed! err: %v", err) + status := interfaces.NewStatus(interfaces.Error, fmt.Sprintf("getting site %q info failed: %v", siteID, err)) + return status, siteID, flavorID, &resInfo } - if regionFlavors == nil || err != nil { - regionFlavors = map[string]*typed.RegionFlavor{} + if len(stack.Resources) != len(siteSelectedInfo.Flavors) { + klog.Errorf("flavor count not equal to server count! err: %v", err) + return interfaces.NewStatus(interfaces.Error, fmt.Sprintf("siteID(%s) flavor count not equal to "+ + "server count!", siteID)), siteID, flavorID, nil } - siteCacheInfo.DeductSiteResInfo(resInfo, regionFlavors) - klog.Infof("Resource state after deduction: %v", siteCacheInfo) - return nil + for i := 0; i < len(stack.Resources); i++ { + flavorID = siteSelectedInfo.Flavors[i].FlavorID + stack.Resources[i].FlavorIDSelected = flavorID + klog.V(4).Infof("GetFlavor - flavorID: %s, region: %s", flavorID, region) + flv, ok := cache.FlavorCache.GetFlavor(flavorID, region) + if !ok { + klog.Warningf("flavor %s not found in region(%s)", flavorID, region) + continue + } + klog.V(4).Infof("flavor %s : %v", flavorID, flv) + vCPUInt, err := strconv.ParseInt(flv.Vcpus, 10, 64) + if err != nil || vCPUInt <= 0 { + klog.Warningf("flavor %s is invalid in region(%s)", flavorID, region) + continue + } + reqRes, ok := resInfo.CpuAndMem[flv.OsExtraSpecs.ResourceType] + if !ok { + reqRes = types.CPUAndMemory{VCPU: 0, Memory: 0} + } + reqRes.VCPU += vCPUInt * int64(stack.Resources[i].Count) + reqRes.Memory += flv.Ram * int64(stack.Resources[i].Count) + + //put them all to resInfo + resInfo.CpuAndMem[flv.OsExtraSpecs.ResourceType] = reqRes + break + } + klog.V(4).Infof("UpdateSiteWithResInfo - siteID: %s, resInfo: %#v", siteID, resInfo) + b.handle.Cache().UpdateSiteWithResInfo(siteID, resInfo) + return nil, siteID, flavorID, &resInfo } diff --git a/globalscheduler/pkg/scheduler/framework/plugins/flavor/flavor.go b/globalscheduler/pkg/scheduler/framework/plugins/flavor/flavor.go index 87bfcb38a..e7f0572a4 100644 --- a/globalscheduler/pkg/scheduler/framework/plugins/flavor/flavor.go +++ b/globalscheduler/pkg/scheduler/framework/plugins/flavor/flavor.go @@ -310,7 +310,7 @@ func (f *Flavor) Filter(ctx context.Context, cycleState *interfaces.CycleState, var isCommonMatch, _ = isComFlavorMatch(flavorMap, siteCacheInfo) var isSpotMatch, _ = isSpotFlavorMatch(spotFlavorMap, 
siteCacheInfo) if isCommonMatch && isSpotMatch { - klog.Infof("*** isCommonMatch:%v, isSpotMatch:%v ", isCommonMatch, isSpotMatch) + klog.Infof("isCommonMatch:%v, isSpotMatch:%v ", isCommonMatch, isSpotMatch) return nil } } diff --git a/globalscheduler/pkg/scheduler/framework/plugins/siteavailability/siteavailability.go b/globalscheduler/pkg/scheduler/framework/plugins/siteavailability/siteavailability.go index e85ddddda..3a323f4fd 100644 --- a/globalscheduler/pkg/scheduler/framework/plugins/siteavailability/siteavailability.go +++ b/globalscheduler/pkg/scheduler/framework/plugins/siteavailability/siteavailability.go @@ -44,7 +44,7 @@ func (pl *SiteAvailability) Name() string { // Filter invoked at the filter extension point. func (pl *SiteAvailability) Filter(ctx context.Context, cycleState *interfaces.CycleState, stack *types.Stack, siteCacheInfo *sitecacheinfo.SiteCacheInfo) *interfaces.Status { - klog.Infof("Filter- siteCacheInfo: %v", siteCacheInfo) + klog.V(4).Infof("Filter- siteCacheInfo: %v", siteCacheInfo) if siteCacheInfo.GetSite().Status == constants.SiteStatusOffline || siteCacheInfo.GetSite().Status == constants.SiteStatusSellout { msg := fmt.Sprintf("Site(%s) status is %s, not available!", siteCacheInfo.GetSite().SiteID, siteCacheInfo.GetSite().Status) klog.Info(msg) diff --git a/globalscheduler/pkg/scheduler/internal/cache/snapshot.go b/globalscheduler/pkg/scheduler/internal/cache/snapshot.go index c6e9d6b0c..4015a678c 100644 --- a/globalscheduler/pkg/scheduler/internal/cache/snapshot.go +++ b/globalscheduler/pkg/scheduler/internal/cache/snapshot.go @@ -20,6 +20,7 @@ package cache import ( "fmt" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/client/typed" + "k8s.io/kubernetes/globalscheduler/pkg/scheduler/common/constants" schedulerlisters "k8s.io/kubernetes/globalscheduler/pkg/scheduler/listers" schedulersitecacheinfo "k8s.io/kubernetes/globalscheduler/pkg/scheduler/sitecacheinfo" "k8s.io/kubernetes/globalscheduler/pkg/scheduler/types" @@ -135,3 +136,15 @@ func (s *Snapshot) Get(siteID string) (*schedulersitecacheinfo.SiteCacheInfo, er func (s *Snapshot) GetFlavors() (map[string]*typed.RegionFlavor, error) { return s.RegionFlavorMap, nil } + +func (s *Snapshot) GetRegionFlavors(region string) (map[string]*typed.RegionFlavor, error) { + regionFlavorMap := make(map[string]*typed.RegionFlavor) + for flavorId := range s.FlavorMap { + key := region + constants.FlavorDelimiter + flavorId + regionFlavor := s.RegionFlavorMap[key] + if regionFlavor != nil { + regionFlavorMap[key] = regionFlavor + } + } + return regionFlavorMap, nil +} diff --git a/globalscheduler/pkg/scheduler/scheduler.go b/globalscheduler/pkg/scheduler/scheduler.go index 50ed49300..0a4ae2175 100644 --- a/globalscheduler/pkg/scheduler/scheduler.go +++ b/globalscheduler/pkg/scheduler/scheduler.go @@ -80,6 +80,14 @@ type ScheduleResult struct { FeasibleSites int // Number of feasible site on one stack scheduled } +//perserved site resource for pod +type PodSiteResource struct { + PodName string + SiteID string + Flavor string + Resource types.AllResInfo +} + // Scheduler watches for new unscheduled pods. It attempts to find // site that they fit on and writes bindings back to the api server. 
type Scheduler struct { @@ -110,7 +118,8 @@ type Scheduler struct { mu sync.RWMutex //Cluster - KubeClientset clientset.Interface //kubernetes.Interface + //KubeClientset clientset.Interface //kubernetes.Interface + KubeClientset *clientset.Clientset ApiextensionsClientset apiextensionsclientset.Interface allocationClientset allocclientset.Interface allocationInformer cache.SharedIndexInformer @@ -124,6 +133,9 @@ type Scheduler struct { schedulerClientset schedulerclientset.Interface schedulerInformer cache.SharedIndexInformer workerNumber int + + // table to withdraw site resource + PodSiteResourceMap map[string]*PodSiteResource } // single scheduler instance @@ -132,7 +144,7 @@ var once sync.Once func NewScheduler(gsconfig *types.GSSchedulerConfiguration, stopCh <-chan struct{}) (*Scheduler, error) { stopEverything := stopCh - klog.Infof("stopEverything to check : %v", stopEverything) + klog.V(4).Infof("stopEverything to check : %v", stopEverything) if stopEverything == nil { stopEverything = wait.NeverStop } @@ -145,8 +157,8 @@ func NewScheduler(gsconfig *types.GSSchedulerConfiguration, stopCh <-chan struct ConfigFilePath: gsconfig.ConfigFilePath, deletedClusters: make(map[string]string), workerNumber: 1, + PodSiteResourceMap: make(map[string]*PodSiteResource), } - err := sched.buildFramework() if err != nil { return nil, fmt.Errorf("buildFramework by %s failed! err: %v", types.SchedulerDefaultProviderName, err) @@ -194,14 +206,14 @@ func (sched *Scheduler) StartInformersAndRun(stopCh <-chan struct{}) { } // start pod informers if sched.PodInformer != nil && sched.InformerFactory != nil { - klog.Infof("Starting scheduler %s informer", sched.SchedulerName) + klog.V(4).Infof("Starting scheduler %s informer", sched.SchedulerName) sched.InformerFactory.Start(stopCh) // Wait for all caches to sync before scheduling. sched.InformerFactory.WaitForCacheSync(stopCh) } // start scheduler informer if sched.schedulerInformer != nil { - klog.Infof("Starting scheduler informer for scheduler %s", sched.SchedulerName) + klog.V(4).Infof("Starting scheduler informer for scheduler %s", sched.SchedulerName) go sched.schedulerInformer.Run(stopCh) } // start allocation informer @@ -216,13 +228,13 @@ func (sched *Scheduler) StartInformersAndRun(stopCh <-chan struct{}) { // Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling // and blocked until the context is done. 
func (sched *Scheduler) Run(clusterWorkers int, podWorkers int, stopCh <-chan struct{}) { - klog.Infof("Starting scheduler %s", sched.SchedulerName) + klog.V(4).Infof("Starting scheduler %s", sched.SchedulerName) defer utilruntime.HandleCrash() //cluster if clusterWorkers > 0 { defer sched.ClusterQueue.ShutDown() - klog.Infof("Waiting informer caches to sync") + klog.V(4).Infof("Waiting informer caches to sync") if ok := cache.WaitForCacheSync(sched.StopEverything, sched.ClusterSynced); !ok { klog.Errorf("failed to wait for caches to sync") } @@ -234,7 +246,7 @@ func (sched *Scheduler) Run(clusterWorkers int, podWorkers int, stopCh <-chan st } defer sched.StackQueue.Close() - klog.Infof("Waiting informer caches to sync") + klog.V(4).Infof("Waiting informer caches to sync") if ok := cache.WaitForCacheSync(sched.StopEverything, sched.PodSynced); !ok { klog.Errorf("failed to wait for caches to sync") } @@ -245,7 +257,7 @@ func (sched *Scheduler) Run(clusterWorkers int, podWorkers int, stopCh <-chan st } klog.Info("Started cluster & pod workers") <-stopCh - klog.Infof("Shutting down scheduler %s", sched.SchedulerName) + klog.V(4).Infof("Shutting down scheduler %s", sched.SchedulerName) } // Cache returns the cache in scheduler for test to check the data in scheduler. @@ -270,15 +282,15 @@ func (sched *Scheduler) scheduleOne() bool { if shutdown != nil { return false } - klog.Infof("1. Stack: %v, stack selector: %v", stack, stack.Selector) + klog.V(4).Infof("1. Stack: %v, stack selector: %v", stack, stack.Selector) allocation, err := sched.generateAllocationFromStack(stack) - klog.Infof("2. Allocation: %v, allocation selector: %v", allocation, allocation.Selector) + klog.V(4).Infof("2. Allocation: %v, allocation selector: %v", allocation, allocation.Selector) if err != nil { return false } start := stack.CreateTime end := time.Now().UnixNano() - klog.Infof("=== done pop queue, time consumption: %v ms ===", (end-start)/int64(time.Millisecond)) + klog.V(4).Infof("=== done pop queue, time consumption: %v ms ===", (end-start)/int64(time.Millisecond)) // 2.do scheduling process start = end @@ -290,21 +302,21 @@ func (sched *Scheduler) scheduleOne() bool { return true } end = time.Now().UnixNano() - klog.Infof("=== done Scheduling pipline, time consumption: %vms ===", (end-start)/int64(time.Millisecond)) - klog.Infof("Schedule result: %v", result) //result is assumed stacks - klog.Infof("3. Assumed Stacks: %v", result) + klog.V(4).Infof("=== done Scheduling pipline, time consumption: %vms ===", (end-start)/int64(time.Millisecond)) + klog.V(4).Infof("Schedule result: %v", result) //result is assumed stacks + klog.V(4).Infof("3. 
Assumed Stacks: %v", result) // 3.bind scheduler result to pod start = end - klog.Infof("Try to bind to site, stacks:%v", result.Stacks) + klog.V(4).Infof("Try to bind to site, stacks:%v", result.Stacks) sched.bindStacks(result.Stacks) end = time.Now().UnixNano() - klog.Infof("=== done bind pod to cluster, time consumption: %vms ===", (end-start)/int64(time.Millisecond)) + klog.V(4).Infof("=== done bind pod to cluster, time consumption: %vms ===", (end-start)/int64(time.Millisecond)) // log the elapsed time for the entire schedule if stack.CreateTime != 0 { spendTime := time.Now().UnixNano() - stack.CreateTime - klog.Infof("@@@ Finished Schedule, time consumption: %vms @@@", spendTime/int64(time.Millisecond)) + klog.V(4).Infof("@@@ Finished Schedule, time consumption: %vms @@@", spendTime/int64(time.Millisecond)) } return true } @@ -344,13 +356,11 @@ func (sched *Scheduler) stackPassesFiltersOnSite( info *schedulersitecacheinfo.SiteCacheInfo, ) (bool, *interfaces.Status, error) { var status *interfaces.Status - statusMap := sched.SchedFrame.RunFilterPlugins(ctx, state, stack, info) status = statusMap.Merge() if !status.IsSuccess() && !status.IsUnschedulable() { return false, status, status.AsError() } - return status.IsSuccess(), status, nil } @@ -377,7 +387,7 @@ func (sched *Scheduler) findSitesThatPassFilters(ctx context.Context, state *int err = fmt.Errorf("SiteCacheInfoMap of %v is null", siteID) return nil, err } - klog.Infof("allSiteCacheInfos: %v", len(allSiteCacheInfos)) + klog.V(4).Infof("allSiteCacheInfos: %v", len(allSiteCacheInfos)) // Create filtered list with enough space to avoid growing it // and allow assigning. filtered := make([]*types.Site, len(allSiteCacheInfos)) @@ -463,7 +473,7 @@ func (sched *Scheduler) prioritizeSites( // sort by score. sort.Sort(sort.Reverse(result)) - klog.Infof("score sites: %v", result) + klog.V(4).Infof("score sites: %v", result) return result, nil } @@ -497,9 +507,19 @@ func (sched *Scheduler) selectHost(siteScoreList interfaces.SiteScoreList) (stri // We expect this to run asynchronously, so we handle binding metrics internally. func (sched *Scheduler) bind(ctx context.Context, stack *types.Stack, targetSiteID string, state *interfaces.CycleState) (err error) { - bindStatus := sched.SchedFrame.RunBindPlugins(ctx, state, stack, + bindStatus, siteId, flavorId, resInfo := sched.SchedFrame.RunBindResourcePlugins(ctx, state, stack, sched.siteCacheInfoSnapshot.SiteCacheInfoMap[targetSiteID]) if bindStatus.IsSuccess() { + podResource := PodSiteResource{stack.PodName, siteId, flavorId, *resInfo} + sched.PodSiteResourceMap[stack.PodName] = &podResource + region := utils.GetRegionName(siteId) + regionFlavors, err := sched.siteCacheInfoSnapshot.GetRegionFlavors(region) + if err != nil { + klog.Errorf("There is no valid flavors in region: %s", region) + return err + } + siteCacheInfo := sched.siteCacheInfoSnapshot.SiteCacheInfoMap[targetSiteID] + siteCacheInfo.UpdateSiteResInfo(*resInfo, regionFlavors, true) return nil } if bindStatus.Code() == interfaces.Error { @@ -511,14 +531,14 @@ func (sched *Scheduler) bind(ctx context.Context, stack *types.Stack, targetSite // Schedule Run begins watching and scheduling. It waits for cache to be synced , // then starts scheduling and blocked until the context is done. 
func (sched *Scheduler) Schedule(ctx context.Context, allocation *types.Allocation) (result ScheduleResult, err error) { - klog.Infof("Attempting to schedule allocation: %v", allocation.ID) + klog.V(4).Infof("Attempting to schedule allocation: %v", allocation.ID) state := interfaces.NewCycleState() schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel() // 1. Snapshot site resource cache start := time.Now() - klog.Infof("[START] snapshot site...") + klog.V(4).Infof("[START] snapshot site...") ///UpdateFlavorMap updates FlavorCache.RegionFlavorMap, FlavorCache.FlavorMap) ///FlavorMap is updated when scheduler starts, RegionFlavorMap is updated @@ -528,16 +548,16 @@ func (sched *Scheduler) Schedule(ctx context.Context, allocation *types.Allocati // 2. Run "prefilter" plugins. start = time.Now() - klog.Infof("[START] Running prefilter plugins...") + klog.V(4).Infof("[START] Running prefilter plugins...") preFilterStatus := sched.SchedFrame.RunPreFilterPlugins(schedulingCycleCtx, state, &allocation.Stack) if !preFilterStatus.IsSuccess() { return result, preFilterStatus.AsError() } - klog.Infof("[DONE] Running prefilter plugins, use_time: %s", time.Since(start).String()) + klog.V(4).Infof("[DONE] Running prefilter plugins, use_time: %s", time.Since(start).String()) // 3. Run "filter" plugins. start = time.Now() - klog.Infof("[START] Running filter plugins...") + klog.V(4).Infof("[START] Running filter plugins...") filteredSitesStatuses := make(interfaces.SiteToStatusMap) allocation.Stack.Selector = allocation.Selector filteredSites, err := sched.findSitesThatPassFilters(ctx, state, &allocation.Stack, filteredSitesStatuses) @@ -545,9 +565,9 @@ func (sched *Scheduler) Schedule(ctx context.Context, allocation *types.Allocati klog.Errorf("findSitesThatPassFilters failed! err: %s", err) return result, err } - klog.Infof("[DONE] Running filter plugins, use_time: %s", time.Since(start).String()) + klog.V(4).Infof("[DONE] Running filter plugins, use_time: %s", time.Since(start).String()) - klog.Infof("filteredSitesStatuses = %v", filteredSitesStatuses.ToString()) + klog.V(4).Infof("filteredSitesStatuses = %v", filteredSitesStatuses.ToString()) if len(filteredSites) <= 0 { err := fmt.Errorf("filter none site. resultStatus: %s", filteredSitesStatuses.ToString()) klog.Error(err) @@ -556,33 +576,33 @@ func (sched *Scheduler) Schedule(ctx context.Context, allocation *types.Allocati // 4. Run "prescore" plugins. start = time.Now() - klog.Infof("[START] Running preScore plugins...") + klog.V(4).Infof("[START] Running preScore plugins...") prescoreStatus := sched.SchedFrame.RunPreScorePlugins(ctx, state, &allocation.Stack, filteredSites) if !prescoreStatus.IsSuccess() { return result, prescoreStatus.AsError() } - klog.Infof("[DONE] Running preScore plugins, use_time: %s", time.Since(start).String()) + klog.V(4).Infof("[DONE] Running preScore plugins, use_time: %s", time.Since(start).String()) // 5. Run "prioritizeSites" plugins. start = time.Now() - klog.Infof("[START] Running prioritizeSites plugins...") + klog.V(4).Infof("[START] Running prioritizeSites plugins...") priorityList, err := sched.prioritizeSites(ctx, state, &allocation.Stack, filteredSites) if err != nil { klog.Errorf("prioritizeSites failed! err: %s", err) return result, err } - klog.Infof("[DONE] Running prioritizeSites plugins, use_time: %s", time.Since(start).String()) + klog.V(4).Infof("[DONE] Running prioritizeSites plugins, use_time: %s", time.Since(start).String()) // 6. Run "strategy" plugins. 
start = time.Now() - klog.Infof("[START] Running strategy plugins...") + klog.V(4).Infof("[START] Running strategy plugins...") siteCount, strategyStatus := sched.SchedFrame.RunStrategyPlugins(ctx, state, allocation, priorityList) if !strategyStatus.IsSuccess() { klog.Errorf("RunStrategyPlugins failed! err: %s", err) return result, err } - klog.Infof("[DONE] Running StrategyPlugins plugins, use_time: %s", time.Since(start).String()) - klog.Infof("selected Hosts : %#v", siteCount) + klog.V(4).Infof("[DONE] Running StrategyPlugins plugins, use_time: %s", time.Since(start).String()) + klog.V(4).Infof("selected Hosts : %#v", siteCount) // 7. reserve resource start = time.Now() @@ -614,7 +634,7 @@ func (sched *Scheduler) Schedule(ctx context.Context, allocation *types.Allocati klog.Errorf("not find suit host") return result, fmt.Errorf("not find suit host") } - klog.Infof("reserve resource(%s) success, use_time: %s", allocation.ID, time.Since(start).String()) + klog.V(4).Infof("reserve resource(%s) success, use_time: %s", allocation.ID, time.Since(start).String()) return } @@ -717,7 +737,7 @@ func (sched *Scheduler) initPodClusterSchedulerAllocationInformers(gsconfig *typ sched.schedulerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { if sched, ok := obj.(*schedulerv1.Scheduler); ok { - klog.Infof("The scheduler %s process is going to be killed...", sched.Name) + klog.V(4).Infof("The scheduler %s process is going to be killed...", sched.Name) os.Exit(0) } else { klog.Fatalf("The deleted object %v failed to convert to scheduler", obj) @@ -761,7 +781,7 @@ func (sched *Scheduler) processNextClusterItem() bool { if shutdown { return false } - klog.Infof("Process an item in work queue %v ", workItem) + klog.V(4).Infof("Process an item in work queue %v ", workItem) eventKey := workItem.(KeyWithEventType) key := eventKey.Key defer sched.ClusterQueue.Done(key) @@ -770,7 +790,7 @@ func (sched *Scheduler) processNextClusterItem() bool { utilruntime.HandleError(fmt.Errorf("Handle %v of key %v failed with %v", "serivce", key, err)) } sched.ClusterQueue.Forget(key) - klog.Infof("Successfully processed & synced %s", key) + klog.V(4).Infof("Successfully processed & synced %s", key) return true } @@ -780,7 +800,7 @@ func (sched *Scheduler) clusterSyncHandler(keyWithEventType KeyWithEventType) er return err } key := keyWithEventType.Key - klog.Infof("sync cache for key %v", key) + klog.V(4).Infof("sync cache for key %v", key) startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %q (%v)", key, time.Since(startTime)) @@ -788,16 +808,16 @@ func (sched *Scheduler) clusterSyncHandler(keyWithEventType KeyWithEventType) er nameSpace, clusterName, err := cache.SplitMetaNamespaceKey(key) //This performs controller logic - create site's static info - klog.Infof("cluster processing - event: %v, cluster name: %v", keyWithEventType.EventType, clusterName) + klog.V(4).Infof("cluster processing - event: %v, cluster name: %v", keyWithEventType.EventType, clusterName) result, err := sched.updateStaticSiteResourceInfo(key, keyWithEventType.EventType, nameSpace, clusterName) if !result { klog.Errorf("Failed a cluster processing - event: %v, key: %v, error: %v", keyWithEventType, key, err) sched.ClusterQueue.AddRateLimited(keyWithEventType) } else { - klog.Infof(" Processed a cluster: %v", key) + klog.V(4).Infof(" Processed a cluster: %v", key) sched.ClusterQueue.Forget(key) } - klog.Infof("Cluster was handled by ClusterController - event: %v, cluster name: %v", 
keyWithEventType.EventType, clusterName) + klog.V(4).Infof("Cluster was handled by ClusterController - event: %v, cluster name: %v", keyWithEventType.EventType, clusterName) if keyWithEventType.EventType != EventType_Delete { cluster, err := sched.ClusterLister.Clusters(nameSpace).Get(clusterName) clusterCopy := cluster.DeepCopy() @@ -852,7 +872,7 @@ func (sched *Scheduler) updateStaticSiteResourceInfo(key string, event EventType klog.Errorf("Failed to retrieve cluster in local cache by cluster name: %s", clusterName) return false, err } - klog.Infof("create a site static info, cluster profile: %v", clusterCopy) + klog.V(4).Infof("create a site static info, cluster profile: %v", clusterCopy) clusterCopy.Status = ClusterStatusCreated site := convertClusterToSite(clusterCopy) siteCacheInfo := schedulersitecacheinfo.NewSiteCacheInfo() @@ -870,7 +890,7 @@ func (sched *Scheduler) updateStaticSiteResourceInfo(key string, event EventType klog.Errorf("Failed to retrieve cluster in local cache by cluster name - %s", clusterName) return false, err } - klog.Infof("update a site static info, cluster profile: %v", clusterCopy) + klog.V(4).Infof("update a site static info, cluster profile: %v", clusterCopy) clusterCopy.Status = ClusterStatusUpdated site := convertClusterToSite(clusterCopy) siteCacheInfo := schedulersitecacheinfo.NewSiteCacheInfo() @@ -902,7 +922,7 @@ func (sched *Scheduler) updateStaticSiteResourceInfo(key string, event EventType //This function updates sites' dynamic resource informaton func (sched *Scheduler) UpdateSiteDynamicResource(region string, resource *types.SiteResource) (err error) { //reset total(available) resource - klog.Infof("UpdateSiteDynamicResource region: %s, resource:%v", region, resource) + klog.V(4).Infof("UpdateSiteDynamicResource region: %s, resource:%v", region, resource) var siteID string for _, siteresource := range resource.CPUMemResources { siteID = region + constants.SiteDelimiter + siteresource.AvailabilityZone diff --git a/globalscheduler/pkg/scheduler/sitecacheinfo/sitecache_info.go b/globalscheduler/pkg/scheduler/sitecacheinfo/sitecache_info.go index ddf4627fc..b97b1ef0b 100644 --- a/globalscheduler/pkg/scheduler/sitecacheinfo/sitecache_info.go +++ b/globalscheduler/pkg/scheduler/sitecacheinfo/sitecache_info.go @@ -487,17 +487,14 @@ func (n *SiteCacheInfo) UpdateSiteWithRatio(ratios []types.AllocationRatio) erro var usedMem = int64(float64(totalRes.Memory) * memRatio) n.updateRequestResourceByResType(resType, &types.CPUAndMemory{VCPU: usedCpu, Memory: usedMem}) } - n.updateFlavor() n.generation = nextGeneration() - return nil } //UpdateSpotResources update spot resources func (n *SiteCacheInfo) UpdateSpotResources(spotRes map[string]types.SpotResource) error { n.AllocatableSpotFlavor = spotRes - n.generation = nextGeneration() return nil } @@ -552,11 +549,11 @@ func GetStackKey(stack *types.Stack) (string, error) { return uid, nil } -// DeductSiteResInfo deduct site's resource info -func (n *SiteCacheInfo) DeductSiteResInfo(resInfo types.AllResInfo, regionFlavorMap map[string]*typed.RegionFlavor) error { +//deduct or add +func (n *SiteCacheInfo) UpdateSiteResInfo(resInfo types.AllResInfo, regionFlavorMap map[string]*typed.RegionFlavor, deduct bool) error { var resourceTypes []string for resType, res := range resInfo.CpuAndMem { - //binding a pod for the first + //resource type is null, assign default resource type (e.g. 
when binding a pod for the first time) if resType == "" { resType = string(DefaultResourceType) resourceTypes = append(resourceTypes, resType) @@ -564,17 +561,17 @@ func (n *SiteCacheInfo) DeductSiteResInfo(resInfo types.AllResInfo, regionFlavor if len(n.RequestedResources) == 0 { reqRes := types.CPUAndMemory{VCPU: res.VCPU, Memory: res.Memory} n.RequestedResources[resType] = &reqRes - } else { - for reqType, reqRes := range n.RequestedResources { - resTypes := strings.Split(reqType, constants.FlavorDelimiter) - if !utils.IsContain(resTypes, resType) { - klog.Infof("!utils.IsContain: %v", !utils.IsContain(resTypes, resType)) - continue - } - reqRes.VCPU += res.VCPU - reqRes.Memory += res.Memory - n.RequestedResources[resType] = reqRes + continue + } + for reqType, reqRes := range n.RequestedResources { + resTypes := strings.Split(reqType, constants.FlavorDelimiter) + if !utils.IsContain(resTypes, resType) { + klog.V(4).Infof("!utils.IsContain: %v", !utils.IsContain(resTypes, resType)) + continue } + reqRes.VCPU += res.VCPU + reqRes.Memory += res.Memory + n.RequestedResources[resType] = reqRes } } for volType, used := range resInfo.Storage { @@ -585,7 +582,7 @@ func (n *SiteCacheInfo) DeductSiteResInfo(resInfo types.AllResInfo, regionFlavor reqVol += used n.RequestedStorage[volType] = reqVol } - n.updateSiteFlavor(resourceTypes, regionFlavorMap) + n.updateSiteFlavor(resourceTypes, regionFlavorMap, deduct) n.generation = nextGeneration() return nil } @@ -598,10 +595,14 @@ updateFlavor(): /home/ubuntu/go/src/k8s.io/arktos/conf/flavors.json global scheduler flavor config file: /home/ubuntu/go/src/k8s.io/arktos/conf/flavor_config.yaml */ -func (n *SiteCacheInfo) updateSiteFlavor(resourceTypes []string, regionFlavors map[string]*typed.RegionFlavor) { +func (n *SiteCacheInfo) updateSiteFlavor(resourceTypes []string, regionFlavors map[string]*typed.RegionFlavor, deduct bool) { n.mu.Lock() defer n.mu.Unlock() + var count, memCount int64 + for k, v := range regionFlavors { + klog.V(4).Infof("updateSiteFlavor Before - key: %#v, regionFlavor:%#v", k, v) + } if n.AllocatableFlavor == nil { n.AllocatableFlavor = map[string]int64{} } @@ -611,25 +612,31 @@ func (n *SiteCacheInfo) updateSiteFlavor(resourceTypes []string, regionFlavors m regionFalvorKey := regionName + constants.FlavorDelimiter + flavorid flv := regionFlavors[regionFalvorKey] if flv == nil { - n.deductFlavor() + n.updateFlavorCount(deduct) return } vCPUInt, err := strconv.ParseInt(flv.Vcpus, 10, 64) if err != nil { - n.deductFlavor() + n.updateFlavorCount(deduct) return } for _, resourceType := range resourceTypes { totalRes := n.TotalResources[resourceType] requestRes := n.RequestedResources[resourceType] if totalRes == nil { - n.deductFlavor() + n.updateFlavorCount(deduct) return - } else if requestRes == nil { + } + if requestRes == nil { requestRes = &types.CPUAndMemory{VCPU: 0, Memory: 0} } - count := (totalRes.VCPU - requestRes.VCPU) / vCPUInt - memCount := (totalRes.Memory - requestRes.Memory) / flv.Ram + if deduct == true { + count = (totalRes.VCPU - requestRes.VCPU) / vCPUInt + memCount = (totalRes.Memory - requestRes.Memory) / flv.Ram + } else { + count = (totalRes.VCPU + requestRes.VCPU) / vCPUInt + memCount = (totalRes.Memory + requestRes.Memory) / flv.Ram + } if count > memCount { count = memCount } @@ -643,20 +650,28 @@ func (n *SiteCacheInfo) updateSiteFlavor(resourceTypes []string, regionFlavors m } } -func (n *SiteCacheInfo) deductFlavor() { +func (n *SiteCacheInfo) updateFlavorCount(deduct bool) { + var m int64 + m = 1 
//add + if deduct == true { + m = -1 //deduct + } if n.AllocatableFlavor == nil { n.AllocatableFlavor = map[string]int64{} } for key, value := range n.AllocatableFlavor { - n.AllocatableFlavor[key] = value - 1 + n.AllocatableFlavor[key] = value + m if n.RequestedFlavor == nil { n.RequestedFlavor = make(map[string]int64) } requested, ok := n.RequestedFlavor[key] if !ok { - n.RequestedFlavor[key] = 1 + n.RequestedFlavor[key] = 0 + if deduct == true { + n.RequestedFlavor[key] = 1 + } } else { - n.RequestedFlavor[key] = requested + 1 + n.RequestedFlavor[key] = requested - m } } }
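
The net effect of this change is a reserve-and-withdraw cycle for site resources: a successful RunBindResourcePlugins returns the site, flavor and resource a pod consumed, bind() records that in PodSiteResourceMap and deducts it from the site cache, and the new failed-pod event handlers call withdrawResource to hand the capacity back when VM creation fails. The following is a minimal, self-contained sketch of that intended flow; the types and names used here (resInfo, siteCache, reserve, withdraw) are simplified stand-ins for illustration only, not the scheduler's actual API, which goes through types.AllResInfo, SiteCacheInfo.UpdateSiteResInfo and Snapshot.GetRegionFlavors.

package main

import "fmt"

// Simplified stand-ins for the scheduler's real types (types.AllResInfo,
// SiteCacheInfo, Scheduler.PodSiteResourceMap); names are illustrative only.
type resInfo struct {
	vcpu, memory int64
}

type siteCache struct {
	totalVCPU, totalMem         int64
	requestedVCPU, requestedMem int64
}

// allocatable mirrors updateSiteFlavor's count/memCount calculation: how many
// instances of a flavor (vcpus, ram) still fit on the site.
func (s *siteCache) allocatable(vcpus, ram int64) int64 {
	count := (s.totalVCPU - s.requestedVCPU) / vcpus
	memCount := (s.totalMem - s.requestedMem) / ram
	if memCount < count {
		count = memCount
	}
	return count
}

type scheduler struct {
	site        *siteCache
	podResource map[string]resInfo // analogue of PodSiteResourceMap
}

// reserve plays the role of bind() + UpdateSiteResInfo(..., deduct=true):
// record what the pod consumed and deduct it from the site cache.
func (sch *scheduler) reserve(pod string, r resInfo) {
	sch.podResource[pod] = r
	sch.site.requestedVCPU += r.vcpu
	sch.site.requestedMem += r.memory
}

// withdraw plays the role of withdrawResource() + UpdateSiteResInfo(..., deduct=false):
// give the reserved capacity back once the pod is reported failed.
func (sch *scheduler) withdraw(pod string) {
	r, ok := sch.podResource[pod]
	if !ok {
		return // nothing was reserved for this pod
	}
	sch.site.requestedVCPU -= r.vcpu
	sch.site.requestedMem -= r.memory
	delete(sch.podResource, pod)
}

func main() {
	sch := &scheduler{
		site:        &siteCache{totalVCPU: 16, totalMem: 32768},
		podResource: map[string]resInfo{},
	}
	fmt.Println(sch.site.allocatable(2, 4096)) // 8 before binding

	sch.reserve("pod-a", resInfo{vcpu: 2, memory: 4096})
	fmt.Println(sch.site.allocatable(2, 4096)) // 7 after the bind deducts

	sch.withdraw("pod-a") // VM creation failed; failed-pod handler fires
	fmt.Println(sch.site.allocatable(2, 4096)) // 8 again after withdrawal
}

In the real code the withdraw path also needs the region's flavors, which is why withdrawResource resolves utils.GetRegionName(resource.SiteID) and calls GetRegionFlavors before touching the site cache: per-flavor allocatable counts are recomputed from the restored CPU and memory totals rather than adjusted in isolation.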