Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revoke site resource when vm creation failed #291

Closed
wants to merge 13 commits into from
Closed
5 changes: 1 addition & 4 deletions globalscheduler/controllers/dispatcher/dispatcher_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NewProcess(config *rest.Config, namespace string, name string, quit chan st
if err != nil {
klog.Fatal(err)
}

return Process{
namespace: namespace,
name: name,
Expand All @@ -97,12 +96,11 @@ func (p *Process) Run(quit chan struct{}) {

dispatcherSelector := fields.ParseSelectorOrDie("metadata.name=" + p.name)
dispatcherLW := cache.NewListWatchFromClient(p.dispatcherClientset.GlobalschedulerV1(), "dispatchers", p.namespace, dispatcherSelector)

dispatcherInformer := cache.NewSharedIndexInformer(dispatcherLW, &dispatcherv1.Dispatcher{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

dispatcherInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
klog.Infof("The dispatcher %s process is going to be killed...", p.name)
klog.V(3).Infof("The dispatcher %s process is going to be killed...", p.name)
os.Exit(0)
},
UpdateFunc: func(old, new interface{}) {
Expand Down Expand Up @@ -234,7 +232,6 @@ func (p *Process) SendPodToCluster(pod *v1.Pod) {
klog.Warningf("The pod %v failed to update its apiserver dtatbase status to failed with the error %v", pod.ObjectMeta.Name, err)
}
}
// util.CheckTime(pod.Name, "dispatcher", "CreatePod-End", 2)
}()
}
}
Expand Down
168 changes: 145 additions & 23 deletions globalscheduler/pkg/scheduler/eventhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
//"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
clusterv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/cluster/v1"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/common/constants"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/types"
"k8s.io/kubernetes/globalscheduler/pkg/scheduler/utils"
"k8s.io/kubernetes/pkg/controller"
statusutil "k8s.io/kubernetes/pkg/util/pod"
)
Expand Down Expand Up @@ -113,6 +115,31 @@ func AddAllEventHandlers(sched *Scheduler) {
},
},
)
// failed pod queue
sched.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return failedToSchedule(t) && responsibleForPod(t, sched.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return failedToSchedule(pod) && responsibleForPod(pod, sched.SchedulerName)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodWithdrawResource,
UpdateFunc: sched.updatePodWithdrawResource,
DeleteFunc: sched.deletePodWithdrawResource,
},
},
)
sched.ClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.addCluster,
UpdateFunc: sched.updateCluster,
Expand All @@ -135,10 +162,15 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Status.AssignedScheduler.Name
}

// failedToSchedule selects pods that scheduled but failed to create vm
func failedToSchedule(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}

// addPodToCache add pod to the stack cache of the scheduler
func (sched *Scheduler) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
klog.Infof("Add a pod: %v", pod)
klog.V(4).Infof("Add a pod: %v", pod.Name)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", obj)
return
Expand All @@ -160,7 +192,7 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
return
}
newPod, ok := newObj.(*v1.Pod)
klog.Infof("Update a pod: %v", newPod)
klog.V(4).Infof("Update a pod: %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
Expand All @@ -178,7 +210,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
switch t := obj.(type) {
case *v1.Pod:
pod = t
klog.Infof("Delete a pod: %v", pod)
klog.V(4).Infof("Delete a pod: %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down Expand Up @@ -301,15 +333,13 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
return
}
newPod, ok := newObj.(*v1.Pod)
klog.Infof("updatePodToSchedulingQueue : %v", newPod)
klog.V(4).Infof("updatePodToSchedulingQueue : %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}

oldStack := getStackFromPod(oldPod)
newStack := getStackFromPod(newPod)

if sched.skipStackUpdate(newStack) {
return
}
Expand All @@ -323,7 +353,7 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
switch t := obj.(type) {
case *v1.Pod:
pod = obj.(*v1.Pod)
klog.Infof("deletePodToSchedulingQueue : %v", pod)
klog.V(4).Infof("deletePodToSchedulingQueue : %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
Expand Down Expand Up @@ -372,14 +402,14 @@ func (sched *Scheduler) skipStackUpdate(stack *types.Stack) bool {
if !reflect.DeepEqual(assumedStackCopy, stackCopy) {
return false
}
klog.V(3).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
klog.V(4).Infof("Skipping stack %s/%s/%s update", stack.Tenant, stack.PodNamespace, stack.PodName)
return true
}

func (sched *Scheduler) bindStacks(assumedStacks []types.Stack) {
klog.Infof("assumedStacks: %v", assumedStacks)
klog.V(4).Infof("assumedStacks: %v", assumedStacks)
for _, newStack := range assumedStacks {
klog.Infof("newStack: %v", newStack)
klog.V(4).Infof("newStack: %v", newStack)
clusterName := newStack.Selected.ClusterName
sched.bindToSite(clusterName, &newStack)
}
Expand All @@ -398,15 +428,15 @@ func (sched *Scheduler) setPodScheduleErr(reqStack *types.Stack) error {
newStatus := v1.PodStatus{
Phase: v1.PodNoSchedule,
}
klog.Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
klog.V(4).Infof("Attempting to update pod status from %v to %v", pod.Status, newStatus)
_, _, err = statusutil.PatchPodStatus(sched.Client, reqStack.Tenant, reqStack.PodNamespace, reqStack.PodName, pod.Status, newStatus)
if err != nil {
klog.Warningf("PatchPodStatus for pod %q: %v", reqStack.PodName+"/"+reqStack.PodNamespace+"/"+
reqStack.Tenant+"/"+reqStack.UID, err)
return err
}

klog.Infof("Update pod status from %v to %v success", pod.Status, newStatus)
klog.V(4).Infof("Update pod status from %v to %v success", pod.Status, newStatus)
return nil
}

Expand All @@ -423,36 +453,35 @@ func (sched *Scheduler) bindToSite(clusterName string, assumedStack *types.Stack
Name: clusterName,
},
}

klog.V(3).Infof("binding: %v", binding)
klog.V(4).Infof("binding: %v", binding)
// do api server update here
klog.Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
klog.V(4).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
err := sched.Client.CoreV1().PodsWithMultiTenancy(binding.Namespace, binding.Tenant).Bind(binding)
if err != nil {
klog.Errorf("Failed to bind stack: %v/%v/%v", assumedStack.Tenant, assumedStack.PodNamespace,
assumedStack.PodName)
if err := sched.SchedulerCache.ForgetStack(assumedStack); err != nil {
klog.Errorf("scheduler cache ForgetStack failed: %v", err)
}

return err
}
//
return nil
}

func (sched *Scheduler) addCluster(object interface{}) {
resource := object.(*clusterv1.Cluster)
clusterCopy := resource.DeepCopy()
if sched.verifyClusterInfo(clusterCopy) == false {
klog.Infof(" Cluster data is not correct: %v", clusterCopy)
klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
}
key, err := controller.KeyFunc(object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object: %v, error: %v", object, err))
return
}
sched.Enqueue(key, EventType_Create)
klog.Infof("Enqueue Create cluster: %v", key)
klog.V(4).Infof("Enqueue Create cluster: %v", key)
}

func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
Expand All @@ -461,7 +490,7 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
oldClusterCopy := oldResource.DeepCopy()
newClusterCopy := newResource.DeepCopy()
if sched.verifyClusterInfo(newClusterCopy) {
klog.Infof(" Cluster data is not correct: %v", newResource)
klog.V(4).Infof(" Cluster data is not correct: %v", newResource)
}
key1, err1 := controller.KeyFunc(oldObject)
key2, err2 := controller.KeyFunc(newObject)
Expand All @@ -478,13 +507,13 @@ func (sched *Scheduler) updateCluster(oldObject, newObject interface{}) {
switch eventType {
case ClusterUpdateNo:
{
klog.Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
klog.V(4).Infof("No actual change in clusters, discarding: %v", newClusterCopy.Name)
break
}
case ClusterUpdateYes:
{
sched.Enqueue(key2, EventType_Update)
klog.Infof("Enqueue Update Cluster: %v", key2)
klog.V(4).Infof("Enqueue Update Cluster: %v", key2)
break
}
default:
Expand All @@ -499,7 +528,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
resource := object.(*clusterv1.Cluster)
clusterCopy := resource.DeepCopy()
if sched.verifyClusterInfo(clusterCopy) == false {
klog.Infof(" Cluster data is not correct: %v", clusterCopy)
klog.V(4).Infof(" Cluster data is not correct: %v", clusterCopy)
return
}
key, err := controller.KeyFunc(object)
Expand All @@ -510,7 +539,7 @@ func (sched *Scheduler) deleteCluster(object interface{}) {
sched.Enqueue(key, EventType_Delete)
siteID := clusterCopy.Spec.Region.Region + constants.SiteDelimiter + clusterCopy.Spec.Region.AvailabilityZone
sched.deletedClusters[key] = siteID
klog.Infof("Enqueue Delete Cluster: %v", key)
klog.V(4).Infof("Enqueue Delete Cluster: %v", key)
}

// Enqueue puts key of the cluster object in the work queue
Expand All @@ -532,3 +561,96 @@ func (sched *Scheduler) verifyClusterInfo(cluster *clusterv1.Cluster) (verified
verified = true
return verified
}

func (sched *Scheduler) verifyPodInfo(pod *v1.Pod) (verified bool) {
verified = false
name := pod.Name
if pod.Name == "" {
klog.Errorf("pod name:%s is null", name)
return verified
}
verified = true
return verified
}

func (sched *Scheduler) addPodWithdrawResource(object interface{}) {
pod, ok := object.(*v1.Pod)
klog.V(4).Infof("Add a pod to withdraw resource: %v", pod.Name)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", object)
return
}
podCopy := pod.DeepCopy()
if sched.verifyPodInfo(podCopy) == false {
klog.V(4).Infof(" Pod data is not correct: %v", podCopy)
}
err := sched.withdrawResource(pod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", pod.Name)
}
}

func (sched *Scheduler) updatePodWithdrawResource(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
klog.V(4).Infof("Update a pod: %v", newPod)
if !ok {
klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
return
}
if oldPod.Name != newPod.Name {
klog.Errorf("old pod name and new pod name should be equal: %s, %s", oldPod.Name, newPod.Name)
return
}
err := sched.withdrawResource(newPod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", oldPod.Name)
}
}

func (sched *Scheduler) deletePodWithdrawResource(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
pod = t
klog.V(4).Infof("Delete a pod: %v", pod.Name)
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*v1.Pod)
if !ok {
klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
err := sched.withdrawResource(pod.Name)
if err != nil {
klog.Errorf("withdraw resource of pod %s failed", pod.Name)
}
}

//withdraw reserved resources to a pod & add it to cash to other pods
func (sched *Scheduler) withdrawResource(podName string) error {
resource := sched.PodSiteResourceMap[podName]
if resource == nil {
klog.V(4).Infof("there is no preserved resource for pod: %s", podName)
return nil
}
allResInfo := resource.Resource
regionName := utils.GetRegionName(resource.SiteID)
regionFlavor, err := sched.siteCacheInfoSnapshot.GetRegionFlavors(regionName)
if err != nil {
klog.Errorf("there is no valid flavor for region: %s", regionName)
return err
}
siteCacheInfo := sched.siteCacheInfoSnapshot.SiteCacheInfoMap[resource.SiteID]
siteCacheInfo.UpdateSiteResInfo(allResInfo, regionFlavor, false)
delete(sched.PodSiteResourceMap, podName)
return nil
}
6 changes: 4 additions & 2 deletions globalscheduler/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ func (i *podInformer) Lister() corelisters.PodLister {
// NewPodInformer creates a shared index informer that returns only non-terminal pods.
func NewPodInformer(schedulerName string, client clientset.Interface,
resyncPeriod time.Duration) coreinformers.PodInformer {
/*selector := fields.ParseSelectorOrDie(
"status.phase=" + string(v1.PodAssigned) +
",status.assignedScheduler.name=" + schedulerName)*/
selector := fields.ParseSelectorOrDie(
"status.phase=" + string(v1.PodAssigned) +
",status.assignedScheduler.name=" + schedulerName)
"status.assignedScheduler.name=" + schedulerName)
lw := cache.NewListWatchFromClient(client.CoreV1(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
return &podInformer{
informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod,
Expand Down
Loading