diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 9668830820..7275fb2577 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -42,6 +42,7 @@ const ( defaultMinNodesToFind = 100 defaultPercentageOfNodesToFind = 0 defaultLockObjectNamespace = "volcano-system" + defaultNodeWorkers = 20 ) // ServerOption is the main context object for the controller manager. @@ -77,6 +78,7 @@ type ServerOption struct { NodeSelector []string EnableCacheDumper bool + NodeWorkerThreads uint32 } type DecryptFunc func(c *ServerOption) error @@ -131,6 +133,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default") fs.StringSliceVar(&s.NodeSelector, "node-selector", nil, "volcano only work with the labeled node, like: --node-selector=volcano.sh/role:train --node-selector=volcano.sh/role:serving") fs.BoolVar(&s.EnableCacheDumper, "cache-dumper", true, "Enable the cache dumper, it's true by default") + fs.Uint32Var(&s.NodeWorkerThreads, "node-worker-threads", defaultNodeWorkers, "The number of threads syncing node operations.") } // CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled. diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index e815e6ff3e..30999528d8 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -57,6 +57,7 @@ func TestAddFlags(t *testing.T) { PercentageOfNodesToFind: defaultPercentageOfNodesToFind, EnableLeaderElection: true, LockObjectNamespace: defaultLockObjectNamespace, + NodeWorkerThreads: defaultNodeWorkers, } if !reflect.DeepEqual(expected, s) { diff --git a/cmd/scheduler/app/server.go b/cmd/scheduler/app/server.go index 5c73e0a6c8..ebbfb5ba95 100644 --- a/cmd/scheduler/app/server.go +++ b/cmd/scheduler/app/server.go @@ -74,12 +74,7 @@ func Run(opt *options.ServerOption) error { } } - sched, err := scheduler.NewScheduler(config, - opt.SchedulerNames, - opt.SchedulerConf, - opt.SchedulePeriod, - opt.DefaultQueue, - opt.NodeSelector) + sched, err := scheduler.NewScheduler(config, opt) if err != nil { panic(err) } diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 47239682e9..0e154967f7 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -268,7 +268,7 @@ func TestAllocate(t *testing.T) { } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) @@ -454,7 +454,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) { schedulerCache.AddPod(pod) } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } trueValue := true diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go index 40c4b97a3e..9ecbd357e7 100644 --- a/pkg/scheduler/actions/preempt/preempt_test.go +++ b/pkg/scheduler/actions/preempt/preempt_test.go @@ -314,7 +314,7 @@ func TestPreempt(t *testing.T) { Value: 10, } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go index 13e7d312c0..4da92ab3b8 100644 --- a/pkg/scheduler/actions/reclaim/reclaim_test.go +++ b/pkg/scheduler/actions/reclaim/reclaim_test.go @@ -148,7 +148,7 @@ func TestReclaim(t *testing.T) { Value: 10, } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/actions/shuffle/shuffle_test.go b/pkg/scheduler/actions/shuffle/shuffle_test.go index f2905c2b72..56d5c0b68a 100644 --- a/pkg/scheduler/actions/shuffle/shuffle_test.go +++ b/pkg/scheduler/actions/shuffle/shuffle_test.go @@ -163,7 +163,7 @@ func TestShuffle(t *testing.T) { } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, q := range test.queues { schedulerCache.AddQueueV1beta1(q) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index be208b16bc..7f0eb5c5f8 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -83,8 +83,8 @@ func init() { } // New returns a Cache implementation. -func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) Cache { - return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors) +func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32) Cache { + return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors, nodeWorkers) } // SchedulerCache cache for the kube batch @@ -134,6 +134,7 @@ type SchedulerCache struct { NamespaceCollection map[string]*schedulingapi.NamespaceCollection errTasks workqueue.RateLimitingInterface + nodeQueue workqueue.RateLimitingInterface DeletedJobs workqueue.RateLimitingInterface informerFactory informers.SharedInformerFactory @@ -145,6 +146,8 @@ type SchedulerCache struct { // A map from image name to its imageState. imageStates map[string]*imageState + + nodeWorkers uint32 } type imageState struct { @@ -387,7 +390,7 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc return job, nil } -func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string) *SchedulerCache { +func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32) *SchedulerCache { kubeClient, err := kubernetes.NewForConfig(config) if err != nil { panic(fmt.Sprintf("failed init kubeClient, with err: %v", err)) @@ -435,6 +438,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo), PriorityClasses: make(map[string]*schedulingv1.PriorityClass), errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), kubeClient: kubeClient, vcClient: vcClient, @@ -446,7 +450,8 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo), imageStates: make(map[string]*imageState), - NodeList: []string{}, + NodeList: []string{}, + nodeWorkers: nodeWorkers, } if len(nodeSelectors) > 0 { for _, nodeSelectorLabel := range nodeSelectors { @@ -700,6 +705,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { sc.informerFactory.Start(stopCh) sc.vcInformerFactory.Start(stopCh) + sc.WaitForCacheSync(stopCh) + for i := 0; i < int(sc.nodeWorkers); i++ { + go wait.Until(sc.runNodeWorker, 0, stopCh) + } + // Re-sync error tasks. go wait.Until(sc.processResyncTask, 0, stopCh) @@ -990,6 +1000,36 @@ func (sc *SchedulerCache) processResyncTask() { } } +func (sc *SchedulerCache) runNodeWorker() { + for sc.processSyncNode() { + } +} + +func (sc *SchedulerCache) processSyncNode() bool { + obj, shutdown := sc.nodeQueue.Get() + if shutdown { + return false + } + defer sc.nodeQueue.Done(obj) + + nodeName, ok := obj.(string) + if !ok { + klog.Errorf("failed to convert %v to string", obj) + return true + } + + klog.V(5).Infof("started sync node %s", nodeName) + err := sc.SyncNode(nodeName) + if err == nil { + sc.nodeQueue.Forget(nodeName) + return true + } + + klog.Errorf("Failed to sync node <%s>, retry it.", nodeName) + sc.nodeQueue.AddRateLimited(nodeName) + return true +} + // AddBindTask add task to be bind to a cache which consumes by go runtime func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error { klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name) diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 1aa3b53465..b964921b58 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -32,7 +32,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/util" @@ -155,7 +154,7 @@ func TestSchedulerCache_Bind_NodeWithSufficientResources(t *testing.T) { cache.AddPod(pod) node := buildNode("n1", api.BuildResourceList("2000m", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...)) - cache.AddNode(node) + cache.AddOrUpdateNode(node) task := api.NewTaskInfo(pod) task.Job = "j1" @@ -187,7 +186,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) { cache.AddPod(pod) node := buildNode("n1", api.BuildResourceList("2000m", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...)) - cache.AddNode(node) + cache.AddOrUpdateNode(node) task := api.NewTaskInfo(pod) task.Job = "j1" @@ -299,7 +298,7 @@ func TestNodeOperation(t *testing.T) { } for _, n := range test.nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } if !reflect.DeepEqual(cache, test.expected) { @@ -308,7 +307,7 @@ func TestNodeOperation(t *testing.T) { } // delete node - cache.DeleteNode(test.deletedNode) + cache.RemoveNode(test.deletedNode.Name) if !reflect.DeepEqual(cache, test.delExpect) { t.Errorf("case %d: \n expected %v, \n got %v \n", i, test.delExpect, cache) @@ -336,6 +335,7 @@ func TestBindTasks(t *testing.T) { pvInformer: informerFactory.Core().V1().PersistentVolumes(), scInformer: informerFactory.Storage().V1().StorageClasses(), errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } sc.Binder = &DefaultBinder{} @@ -359,12 +359,6 @@ func TestBindTasks(t *testing.T) { DeleteFunc: sc.DeletePod, }, ) - sc.nodeInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddNode, - UpdateFunc: sc.UpdateNode, - }, - ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go wait.Until(sc.processBindTask, time.Millisecond*5, ctx.Done()) @@ -375,9 +369,10 @@ func TestBindTasks(t *testing.T) { // make sure pod exist when calling fake client binding fakeKube.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) - fakeKube.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) + // set node in cache directly + sc.AddOrUpdateNode(node) task := api.NewTaskInfo(pod) task.NodeName = "n1" diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 969acea430..133c80e854 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -19,6 +19,7 @@ package cache import ( "context" "fmt" + "math" "reflect" "strconv" @@ -26,13 +27,17 @@ import ( schedulingv1 "k8s.io/api/scheduling/v1" sv1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" + "k8s.io/component-helpers/storage/ephemeral" + storagehelpers "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/scheduler/framework" + volumeutil "k8s.io/kubernetes/pkg/volume/util" nodeinfov1alpha1 "volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling" @@ -44,6 +49,8 @@ import ( commonutil "volcano.sh/volcano/pkg/util" ) +var DefaultAttachableVolumeQuantity int64 = math.MaxInt32 + func isTerminated(status schedulingapi.TaskStatus) bool { return status == schedulingapi.Succeeded || status == schedulingapi.Failed } @@ -66,6 +73,131 @@ func (sc *SchedulerCache) getOrCreateJob(pi *schedulingapi.TaskInfo) *scheduling return sc.Jobs[pi.Job] } +// addPodCSIVolumesToTask counts the csi volumes used by task +// @Lily922 TODO: support counting shared volumes. Currently, if two different pods use the same attachable volume +// and scheduled on the same nodes the volume will be count twice, but actually only use one attachable limit resource +func (sc *SchedulerCache) addPodCSIVolumesToTask(pi *schedulingapi.TaskInfo) error { + volumes, err := sc.getPodCSIVolumes(pi.Pod) + if err != nil { + klog.Errorf("got pod csi attachment persistent volumes count error: %s", err.Error()) + return err + } + + for key, count := range volumes { + pi.Resreq.AddScalar(key, float64(count)) + } + return nil +} + +func (sc *SchedulerCache) getPodCSIVolumes(pod *v1.Pod) (map[v1.ResourceName]int64, error) { + volumes := make(map[v1.ResourceName]int64) + for _, vol := range pod.Spec.Volumes { + pvcName := "" + isEphemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + // Normal CSI volume can only be used through PVC + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil: + // Generic ephemeral inline volumes also use a PVC, + // just with a computed name and certain ownership. + // That is checked below once the pvc object is + // retrieved. + pvcName = ephemeral.VolumeClaimName(pod, &vol) + isEphemeral = true + default: + // @Lily922 TODO: support Inline volumes. + // Inline volume is not supported now + continue + } + if pvcName == "" { + return volumes, fmt.Errorf("PersistentVolumeClaim had no name") + } + + pvc, err := sc.pvcInformer.Lister().PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + // The PVC is required to proceed with + // scheduling of a new pod because it cannot + // run without it. Bail out immediately. + return volumes, fmt.Errorf("looking up PVC %s/%s: %v", pod.Namespace, pvcName, err) + } + // The PVC for an ephemeral volume must be owned by the pod. + if isEphemeral { + if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil { + return volumes, err + } + } + driverName := sc.getCSIDriverInfo(pvc) + if driverName == "" { + klog.V(5).InfoS("Could not find a CSI driver name for pvc(%s/%s), not counting volume", pvc.Namespace, pvc.Name) + continue + } + + // Count all csi volumes in cache, because the storage may change from unattachable volumes to attachable volumes after + // the cache set up, in this case it is very difficult to refresh all task caches. + // For unattachable volume, set the limits number to a very large value, in this way, scheduling will never + // be limited due to insufficient quantity of it. + k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(driverName)) + if _, ok := volumes[k]; !ok { + volumes[k] = 1 + } else { + volumes[k] += 1 + } + } + return volumes, nil +} + +func (sc *SchedulerCache) getCSIDriverInfo(pvc *v1.PersistentVolumeClaim) string { + pvName := pvc.Spec.VolumeName + + if pvName == "" { + klog.V(5).Infof("PV had no name for pvc <%s>", klog.KObj(pvc)) + return sc.getCSIDriverInfoFromSC(pvc) + } + + pv, err := sc.pvInformer.Lister().Get(pvName) + if err != nil { + klog.V(5).InfoS("Failed to get pv <%s> for pvc <%s>: %v", klog.KRef("", pvName), klog.KObj(pvc), err) + // If we can't fetch PV associated with PVC, may be it got deleted + // or PVC was prebound to a PVC that hasn't been created yet. + // fallback to using StorageClass for volume counting + return sc.getCSIDriverInfoFromSC(pvc) + } + + csiSource := pv.Spec.PersistentVolumeSource.CSI + if csiSource == nil { + // @Lily922 TODO: support non-CSI volumes and migrating pvc + // CSIMigration is not supported now. + klog.Warningf("Not support non-csi pvc.") + return "" + } + + return csiSource.Driver +} + +// getCSIDriverInfoFromSC get the name of the csi driver through the sc of the pvc. +func (sc *SchedulerCache) getCSIDriverInfoFromSC(pvc *v1.PersistentVolumeClaim) string { + scName := storagehelpers.GetPersistentVolumeClaimClass(pvc) + + // If StorageClass is not set or not found, then PVC must be using immediate binding mode + // and hence it must be bound before scheduling. So it is safe to not count it. + if scName == "" { + klog.V(3).Infof("PVC <%s> has no StorageClass", klog.KObj(pvc)) + return "" + } + + storageClass, err := sc.scInformer.Lister().Get(scName) + if err != nil { + klog.Errorf("Failed to get StorageClass for pvc <%s>: %v", klog.KObj(pvc), err) + return "" + } + + if driverName, ok := storageClass.Parameters["csi.storage.k8s.io/csi-driver-name"]; ok { + return driverName + } + return storageClass.Provisioner +} + func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error { if len(pi.NodeName) != 0 { if _, found := sc.Nodes[pi.NodeName]; !found { @@ -91,9 +223,21 @@ func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error { return nil } +func (sc *SchedulerCache) NewTaskInfo(pod *v1.Pod) (*schedulingapi.TaskInfo, error) { + taskInfo := schedulingapi.NewTaskInfo(pod) + if err := sc.addPodCSIVolumesToTask(taskInfo); err != nil { + return taskInfo, err + } + return taskInfo, nil +} + // Assumes that lock is already acquired. func (sc *SchedulerCache) addPod(pod *v1.Pod) error { - pi := schedulingapi.NewTaskInfo(pod) + pi, err := sc.NewTaskInfo(pod) + if err != nil { + klog.Errorf("generate taskInfo for pod(%s) failed: %v", pod.Name, err) + sc.resyncTask(pi) + } return sc.addTask(pi) } @@ -114,7 +258,10 @@ func (sc *SchedulerCache) syncTask(oldTask *schedulingapi.TaskInfo) error { return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err) } - newTask := schedulingapi.NewTaskInfo(newPod) + newTask, err := sc.NewTaskInfo(newPod) + if err != nil { + return fmt.Errorf("failed to generate taskInfo of pod(%s), error: %v", newPod.Name, err) + } sc.Mutex.Lock() defer sc.Mutex.Unlock() @@ -314,58 +461,62 @@ func (sc *SchedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *scheduling // removeNodeImageStates removes the given node record from image entries having the node // in imageStates cache. After the removal, if any image becomes free, i.e., the image // is no longer available on any node, the image entry will be removed from imageStates. -func (sc *SchedulerCache) removeNodeImageStates(node *v1.Node) { - if node == nil { - return - } - - for _, image := range node.Status.Images { - for _, name := range image.Names { - state, ok := sc.imageStates[name] - if ok { - state.nodes.Delete(node.Name) - if len(state.nodes) == 0 { - // Remove the unused image to make sure the length of - // imageStates represents the total number of different - // images on all nodes - delete(sc.imageStates, name) - } - } +func (sc *SchedulerCache) removeNodeImageStates(node string) { + for image, state := range sc.imageStates { + state.nodes.Delete(node) + if len(state.nodes) == 0 { + // Remove the unused image to make sure the length of + // imageStates represents the total number of different + // images on all nodes + delete(sc.imageStates, image) } } } -// Assumes that lock is already acquired. -func (sc *SchedulerCache) addNode(node *v1.Node) error { +// AddOrUpdateNode adds or updates node info in cache. +func (sc *SchedulerCache) AddOrUpdateNode(node *v1.Node) error { + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + if sc.Nodes[node.Name] != nil { sc.Nodes[node.Name].SetNode(node) - sc.removeNodeImageStates(node) + sc.removeNodeImageStates(node.Name) } else { sc.Nodes[node.Name] = schedulingapi.NewNodeInfo(node) } sc.addNodeImageStates(node, sc.Nodes[node.Name]) + + var nodeExisted bool + for _, name := range sc.NodeList { + if name == node.Name { + nodeExisted = true + break + } + } + if !nodeExisted { + sc.NodeList = append(sc.NodeList, node.Name) + } return nil } -// Assumes that lock is already acquired. -func (sc *SchedulerCache) updateNode(oldNode, newNode *v1.Node) error { - if sc.Nodes[newNode.Name] != nil { - sc.Nodes[newNode.Name].SetNode(newNode) - sc.removeNodeImageStates(newNode) - sc.addNodeImageStates(newNode, sc.Nodes[newNode.Name]) - return nil - } +// RemoveNode removes node info from cache +func (sc *SchedulerCache) RemoveNode(nodeName string) error { + sc.Mutex.Lock() + defer sc.Mutex.Unlock() - return fmt.Errorf("node <%s> does not exist", newNode.Name) -} + for i, name := range sc.NodeList { + if name == nodeName { + sc.NodeList = append(sc.NodeList[:i], sc.NodeList[i+1:]...) + break + } + } + sc.removeNodeImageStates(nodeName) -// Assumes that lock is already acquired. -func (sc *SchedulerCache) deleteNode(node *v1.Node) error { - if _, ok := sc.Nodes[node.Name]; !ok { - return fmt.Errorf("node <%s> does not exist", node.Name) + if _, ok := sc.Nodes[nodeName]; !ok { + return fmt.Errorf("node <%s> does not exist", nodeName) } - numaInfo := sc.Nodes[node.Name].NumaInfo + numaInfo := sc.Nodes[nodeName].NumaInfo if numaInfo != nil { klog.V(3).Infof("delete numatopo <%s/%s>", numaInfo.Namespace, numaInfo.Name) err := sc.vcClient.NodeinfoV1alpha1().Numatopologies().Delete(context.TODO(), numaInfo.Name, metav1.DeleteOptions{}) @@ -373,9 +524,7 @@ func (sc *SchedulerCache) deleteNode(node *v1.Node) error { klog.Errorf("delete numatopo <%s/%s> failed.", numaInfo.Namespace, numaInfo.Name) } } - sc.removeNodeImageStates(node) - delete(sc.Nodes, node.Name) - + delete(sc.Nodes, nodeName) return nil } @@ -386,21 +535,12 @@ func (sc *SchedulerCache) AddNode(obj interface{}) { klog.Errorf("Cannot convert to *v1.Node: %v", obj) return } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.addNode(node) - if err != nil { - klog.Errorf("Failed to add node %s into cache: %v", node.Name, err) - return - } - sc.NodeList = append(sc.NodeList, node.Name) + sc.nodeQueue.Add(node.Name) } // UpdateNode update node to scheduler cache func (sc *SchedulerCache) UpdateNode(oldObj, newObj interface{}) { - oldNode, ok := oldObj.(*v1.Node) + _, ok := oldObj.(*v1.Node) if !ok { klog.Errorf("Cannot convert oldObj to *v1.Node: %v", oldObj) return @@ -410,15 +550,7 @@ func (sc *SchedulerCache) UpdateNode(oldObj, newObj interface{}) { klog.Errorf("Cannot convert newObj to *v1.Node: %v", newObj) return } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.updateNode(oldNode, newNode) - if err != nil { - klog.Errorf("Failed to update node %v in cache: %v", oldNode.Name, err) - return - } + sc.nodeQueue.Add(newNode.Name) } // DeleteNode delete node from scheduler cache @@ -438,22 +570,33 @@ func (sc *SchedulerCache) DeleteNode(obj interface{}) { klog.Errorf("Cannot convert to *v1.Node: %v", t) return } + sc.nodeQueue.Add(node.Name) +} - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.deleteNode(node) +func (sc *SchedulerCache) SyncNode(nodeName string) error { + node, err := sc.nodeInformer.Lister().Get(nodeName) if err != nil { - klog.Errorf("Failed to delete node %s from cache: %v", node.Name, err) - return - } + if errors.IsNotFound(err) { + deleteErr := sc.RemoveNode(nodeName) + if deleteErr != nil { + klog.Errorf("Failed to delete node <%s> and remove from cache: %s", nodeName, deleteErr.Error()) + return deleteErr + } - for i, name := range sc.NodeList { - if name == node.Name { - sc.NodeList = append(sc.NodeList[:i], sc.NodeList[i+1:]...) - break + klog.V(3).Infof("Node <%s> was deleted, removed from cache.", nodeName) + return nil } + klog.Errorf("Failed to get node %s, error: %v", nodeName, err) + return err + } + + csiNode, err := sc.csiNodeInformer.Lister().Get(nodeName) + if err == nil { + sc.setCSIResourceOnNode(csiNode, node) + } else if !errors.IsNotFound(err) { + return err } + return sc.AddOrUpdateNode(node) } func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) { @@ -474,6 +617,7 @@ func (sc *SchedulerCache) AddOrUpdateCSINode(obj interface{}) { csiNodeStatus.DriverStatus[d.Name] = d.Allocatable != nil && d.Allocatable.Count != nil } sc.CSINodesStatus[csiNode.Name] = csiNodeStatus + sc.nodeQueue.Add(csiNode.Name) } func (sc *SchedulerCache) UpdateCSINode(oldObj, newObj interface{}) { @@ -511,6 +655,7 @@ func (sc *SchedulerCache) DeleteCSINode(obj interface{}) { sc.Mutex.Lock() delete(sc.CSINodesStatus, csiNode.Name) sc.Mutex.Unlock() + sc.nodeQueue.Add(csiNode.Name) } func getJobID(pg *schedulingapi.PodGroup) schedulingapi.JobID { @@ -1042,3 +1187,41 @@ func (sc *SchedulerCache) AddJob(obj interface{}) { defer sc.Mutex.Unlock() sc.Jobs[job.UID] = job } + +func (sc *SchedulerCache) setCSIResourceOnNode(csiNode *sv1.CSINode, node *v1.Node) { + if csiNode == nil || node == nil { + return + } + + csiResources := make(map[v1.ResourceName]resource.Quantity) + for i := range csiNode.Spec.Drivers { + d := csiNode.Spec.Drivers[i] + k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name)) + if d.Allocatable != nil && d.Allocatable.Count != nil { + csiResources[k] = *resource.NewScaledQuantity(int64(*d.Allocatable.Count), -3) + } else { + // Count all csi volumes in cache, because the storage may change from unattachable volumes to attachable volumes after + // the cache set up, in this case it is very difficult to refresh all task caches. + // For unattachable volume, set the limits number to a very large value, in this way, scheduling will never + // be limited due to insufficient quantity of it. + csiResources[k] = *resource.NewScaledQuantity(DefaultAttachableVolumeQuantity, -3) + } + } + + if len(csiResources) == 0 { + return + } + + if node.Status.Allocatable == nil { + node.Status.Allocatable = make(map[v1.ResourceName]resource.Quantity) + } + + if node.Status.Capacity == nil { + node.Status.Capacity = make(map[v1.ResourceName]resource.Quantity) + } + + for resourceName, quantity := range csiResources { + node.Status.Allocatable[resourceName] = quantity + node.Status.Capacity[resourceName] = quantity + } +} diff --git a/pkg/scheduler/cache/event_handlers_test.go b/pkg/scheduler/cache/event_handlers_test.go index d7b2913a5d..43e825906c 100644 --- a/pkg/scheduler/cache/event_handlers_test.go +++ b/pkg/scheduler/cache/event_handlers_test.go @@ -75,7 +75,7 @@ func TestSchedulerCache_updateTask(t *testing.T) { } for _, n := range test.Nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } cache.AddPod(test.OldPod) @@ -129,7 +129,7 @@ func TestSchedulerCache_UpdatePod(t *testing.T) { } for _, n := range test.Nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } cache.AddPod(test.OldPod) @@ -210,7 +210,7 @@ func TestSchedulerCache_AddPodGroupV1beta1(t *testing.T) { } for _, n := range test.Nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } test.Pod.Annotations = map[string]string{ "scheduling.k8s.io/group-name": "j1", @@ -336,7 +336,7 @@ func TestSchedulerCache_UpdatePodGroupV1beta1(t *testing.T) { } for _, n := range test.Nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } test.Pod.Annotations = map[string]string{ "scheduling.k8s.io/group-name": "j1", @@ -431,7 +431,7 @@ func TestSchedulerCache_DeletePodGroupV1beta1(t *testing.T) { cache.DeletedJobs = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) for _, n := range test.Nodes { - cache.AddNode(n) + cache.AddOrUpdateNode(n) } test.Pod.Annotations = map[string]string{ "scheduling.k8s.io/group-name": "j1", diff --git a/pkg/scheduler/plugins/binpack/binpack_test.go b/pkg/scheduler/plugins/binpack/binpack_test.go index 069a8b662b..7832805c99 100644 --- a/pkg/scheduler/plugins/binpack/binpack_test.go +++ b/pkg/scheduler/plugins/binpack/binpack_test.go @@ -265,7 +265,7 @@ func TestNode(t *testing.T) { Recorder: record.NewFakeRecorder(100), } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/plugins/drf/hdrf_test.go b/pkg/scheduler/plugins/drf/hdrf_test.go index 3b7761bc4d..cc398a1fae 100644 --- a/pkg/scheduler/plugins/drf/hdrf_test.go +++ b/pkg/scheduler/plugins/drf/hdrf_test.go @@ -264,7 +264,7 @@ func TestHDRF(t *testing.T) { Recorder: record.NewFakeRecorder(100), } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, q := range test.queueSpecs { schedulerCache.AddQueueV1beta1( diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index c7e0d7f1e7..25c2915279 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -415,7 +415,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) { - klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed, %d, %d", + klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed, allocatable <%d>, existed <%d>", task.Namespace, task.Name, node.Name, node.Allocatable.MaxTaskNum, len(nodeInfo.Pods)) podsNumStatus := &api.Status{ Code: api.Unschedulable, @@ -515,10 +515,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { if predicate.nodeVolumeLimitsEnable { status := nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) nodeVolumeStatus := framework.ConvertPredicateStatus(status) - if nodeVolumeStatus.Code != api.Success { - predicateStatus = append(predicateStatus, nodeVolumeStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) - } + predicateStatus = append(predicateStatus, nodeVolumeStatus) } // Check VolumeZone diff --git a/pkg/scheduler/plugins/predicates/predicates_test.go b/pkg/scheduler/plugins/predicates/predicates_test.go index d6b07077a9..02efd1bc71 100644 --- a/pkg/scheduler/plugins/predicates/predicates_test.go +++ b/pkg/scheduler/plugins/predicates/predicates_test.go @@ -70,7 +70,7 @@ func TestEventHandler(t *testing.T) { return } - sc := cache.New(config, option.SchedulerNames, option.DefaultQueue, option.NodeSelector) + sc := cache.New(config, option.SchedulerNames, option.DefaultQueue, option.NodeSelector, option.NodeWorkerThreads) schedulerCache := sc.(*cache.SchedulerCache) // pending pods @@ -157,7 +157,7 @@ func TestEventHandler(t *testing.T) { } }() for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/plugins/proportion/proportion_test.go b/pkg/scheduler/plugins/proportion/proportion_test.go index be725b3ae7..ea773179a5 100644 --- a/pkg/scheduler/plugins/proportion/proportion_test.go +++ b/pkg/scheduler/plugins/proportion/proportion_test.go @@ -217,7 +217,7 @@ func TestProportion(t *testing.T) { schedulerCache.DeletedJobs = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/plugins/tdm/tdm_test.go b/pkg/scheduler/plugins/tdm/tdm_test.go index aef6bc4963..3782c02602 100644 --- a/pkg/scheduler/plugins/tdm/tdm_test.go +++ b/pkg/scheduler/plugins/tdm/tdm_test.go @@ -251,7 +251,7 @@ func Test_TDM(t *testing.T) { Recorder: record.NewFakeRecorder(100), } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } schedulerCache.AddPod(test.pod) @@ -713,7 +713,7 @@ func Test_TDM_victimsFn(t *testing.T) { Recorder: record.NewFakeRecorder(100), } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { diff --git a/pkg/scheduler/plugins/usage/usage_test.go b/pkg/scheduler/plugins/usage/usage_test.go index 24317e51e3..075fa30572 100644 --- a/pkg/scheduler/plugins/usage/usage_test.go +++ b/pkg/scheduler/plugins/usage/usage_test.go @@ -314,7 +314,7 @@ func TestUsage_predicateFn(t *testing.T) { } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) @@ -513,7 +513,7 @@ func TestUsage_nodeOrderFn(t *testing.T) { } for _, node := range test.nodes { - schedulerCache.AddNode(node) + schedulerCache.AddOrUpdateNode(node) } for _, pod := range test.pods { schedulerCache.AddPod(pod) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b91a4f5666..ded7f22264 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -57,30 +57,23 @@ type Scheduler struct { } // NewScheduler returns a scheduler -func NewScheduler( - config *rest.Config, - schedulerNames []string, - schedulerConf string, - period time.Duration, - defaultQueue string, - nodeSelectors []string, -) (*Scheduler, error) { +func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) { var watcher filewatcher.FileWatcher - if schedulerConf != "" { + if opt.SchedulerConf != "" { var err error - path := filepath.Dir(schedulerConf) + path := filepath.Dir(opt.SchedulerConf) watcher, err = filewatcher.NewFileWatcher(path) if err != nil { - return nil, fmt.Errorf("failed creating filewatcher for %s: %v", schedulerConf, err) + return nil, fmt.Errorf("failed creating filewatcher for %s: %v", opt.SchedulerConf, err) } } - cache := schedcache.New(config, schedulerNames, defaultQueue, nodeSelectors) + cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads) scheduler := &Scheduler{ - schedulerConf: schedulerConf, + schedulerConf: opt.SchedulerConf, fileWatcher: watcher, cache: cache, - schedulePeriod: period, + schedulePeriod: opt.SchedulePeriod, dumper: schedcache.Dumper{Cache: cache}, }