diff --git a/pkg/scheduler/plugins/coscheduling/core/core_test.go b/pkg/scheduler/plugins/coscheduling/core/core_test.go index 463ee998f..30ec3cf74 100644 --- a/pkg/scheduler/plugins/coscheduling/core/core_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/core_test.go @@ -738,7 +738,7 @@ func TestPermit(t *testing.T) { gangId := util.GetId(pg.Namespace, pg.Name) gang := mgr.cache.getGangFromCacheByGangId(gangId, false) gang.lock.Lock() - gang.OnceResourceSatisfied = tt.onceSatisfy + gang.GangGroupInfo.OnceResourceSatisfied = tt.onceSatisfy gang.GangMatchPolicy = tt.matchPolicy gang.lock.Unlock() } diff --git a/pkg/scheduler/plugins/coscheduling/core/gang.go b/pkg/scheduler/plugins/coscheduling/core/gang.go index 4eeb731f9..e9d8d791c 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang.go @@ -57,10 +57,6 @@ type Gang struct { WaitingForBindChildren map[string]*v1.Pod // pods that have already bound BoundChildren map[string]*v1.Pod - // OnceResourceSatisfied indicates whether the gang has ever reached the ResourceSatisfied state,which means the - // children number has reached the minNum in the early step, - // once this variable is set true, it is irreversible. - OnceResourceSatisfied bool // only-waiting, only consider waiting pods // waiting-and-running, consider waiting and running pods @@ -344,7 +340,7 @@ func (gang *Gang) isGangOnceResourceSatisfied() bool { gang.lock.Lock() defer gang.lock.Unlock() - return gang.OnceResourceSatisfied + return gang.GangGroupInfo.isGangOnceResourceSatisfied() } func (gang *Gang) setChild(pod *v1.Pod) { @@ -472,10 +468,7 @@ func (gang *Gang) setResourceSatisfied() { gang.lock.Lock() defer gang.lock.Unlock() - if !gang.OnceResourceSatisfied { - gang.OnceResourceSatisfied = true - klog.Infof("Gang ResourceSatisfied, gangName: %v", gang.Name) - } + gang.GangGroupInfo.setResourceSatisfied() } func (gang *Gang) addBoundPod(pod *v1.Pod) { @@ -487,8 +480,8 @@ func (gang *Gang) addBoundPod(pod *v1.Pod) { gang.BoundChildren[podId] = pod klog.Infof("AddBoundPod, gangName: %v, podName: %v", gang.Name, podId) - if !gang.OnceResourceSatisfied && len(gang.BoundChildren) >= gang.MinRequiredNumber { - gang.OnceResourceSatisfied = true + if !gang.GangGroupInfo.isGangOnceResourceSatisfied() { + gang.GangGroupInfo.setResourceSatisfied() klog.Infof("Gang ResourceSatisfied due to addBoundPod, gangName: %v", gang.Name) } } @@ -507,6 +500,6 @@ func (gang *Gang) isGangValidForPermit() bool { case extension.GangMatchPolicyWaitingAndRunning: return len(gang.WaitingForBindChildren)+len(gang.BoundChildren) >= gang.MinRequiredNumber default: - return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true + return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.GangGroupInfo.isGangOnceResourceSatisfied() } } diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go index a1a269174..aeaebab99 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go @@ -113,6 +113,10 @@ func (gangCache *GangCache) deleteGangFromCacheByGangId(gangId string) { } func (gangCache *GangCache) onPodAdd(obj interface{}) { + gangCache.onPodAddInternal(obj, "create") +} + +func (gangCache *GangCache) onPodAddInternal(obj interface{}, action string) { pod, ok := obj.(*v1.Pod) if !ok { return @@ -152,7 +156,7 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) { gang.setResourceSatisfied() } - klog.Infof("watch pod created, Name:%v, pgLabel:%v", pod.Name, pod.Labels[v1alpha1.PodGroupLabel]) + klog.Infof("watch pod %v, Name:%v, pgLabel:%v", action, pod.Name, pod.Labels[v1alpha1.PodGroupLabel]) } func (gangCache *GangCache) onPodUpdate(oldObj, newObj interface{}) { @@ -170,7 +174,7 @@ func (gangCache *GangCache) onPodUpdate(oldObj, newObj interface{}) { return } - gangCache.onPodAdd(newObj) + gangCache.onPodAddInternal(newObj, "update") } func (gangCache *GangCache) onPodDelete(obj interface{}) { diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go b/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go index 2340fcf1e..3c669cfae 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go @@ -61,9 +61,10 @@ func TestGangCache_OnPodAdd(t *testing.T) { defaultArgs := getTestDefaultCoschedulingArgs(t) tests := []struct { - name string - pods []*corev1.Pod - wantCache map[string]*Gang + name string + pods []*corev1.Pod + wantCache map[string]*Gang + onceSatisfied bool }{ { name: "add invalid pod", @@ -216,9 +217,9 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - OnceResourceSatisfied: true, }, }, + onceSatisfied: true, }, { name: "add pod announcing Gang in lightweight-coscheduling way", @@ -314,9 +315,9 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - OnceResourceSatisfied: true, }, }, + onceSatisfied: true, }, { name: "add pods announcing Gang in Annotation way,but with illegal args", @@ -506,6 +507,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { continue } tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup) + tt.wantCache[k].GangGroupInfo.OnceResourceSatisfied = tt.onceSatisfied } for _, pod := range tt.pods { @@ -543,9 +545,10 @@ func TestGangCache_OnPodUpdate(t *testing.T) { defaultArgs := getTestDefaultCoschedulingArgs(t) tests := []struct { - name string - pods []*corev1.Pod - wantCache map[string]*Gang + name string + pods []*corev1.Pod + wantCache map[string]*Gang + onceSatisfied bool }{ { name: "add invalid pod", @@ -649,9 +652,9 @@ func TestGangCache_OnPodUpdate(t *testing.T) { }, }, }, - OnceResourceSatisfied: true, }, }, + onceSatisfied: true, }, } @@ -671,6 +674,7 @@ func TestGangCache_OnPodUpdate(t *testing.T) { continue } tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup) + tt.wantCache[k].GangGroupInfo.OnceResourceSatisfied = true } for _, pod := range tt.pods { @@ -1125,8 +1129,8 @@ func TestGangCache_OnGangDelete(t *testing.T) { }, }, }, - OnceResourceSatisfied: true, } + wantedGang.GangGroupInfo.OnceResourceSatisfied = true wantedGang.GangGroupInfo.GangTotalChildrenNumMap[wantedGang.Name] = wantedGang.TotalChildrenNum wantedGang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"] = wantedGang.GangGroupInfo.LastScheduleTime diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go index 412309409..bad77f2fe 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go @@ -45,7 +45,7 @@ func (gang *Gang) GetGangSummary() *GangSummary { gangSummary.GangMatchPolicy = gang.GangMatchPolicy gangSummary.MinRequiredNumber = gang.MinRequiredNumber gangSummary.TotalChildrenNum = gang.TotalChildrenNum - gangSummary.OnceResourceSatisfied = gang.OnceResourceSatisfied + gangSummary.OnceResourceSatisfied = gang.GangGroupInfo.isGangOnceResourceSatisfied() gangSummary.GangGroupInfo = gang.GangGroupInfo gangSummary.GangFrom = gang.GangFrom gangSummary.HasGangInit = gang.HasGangInit diff --git a/pkg/scheduler/plugins/coscheduling/core/ganggroup.go b/pkg/scheduler/plugins/coscheduling/core/ganggroup.go index 5a8b53976..931d3283d 100644 --- a/pkg/scheduler/plugins/coscheduling/core/ganggroup.go +++ b/pkg/scheduler/plugins/coscheduling/core/ganggroup.go @@ -28,6 +28,11 @@ type GangGroupInfo struct { GangTotalChildrenNumMap map[string]int ChildrenScheduleRoundMap map[string]int + // OnceResourceSatisfied indicates whether the gang has ever reached the ResourceSatisfied state,which means the + // children number has reached the minNum in the early step, + // once this variable is set true, it is irreversible. + OnceResourceSatisfied bool + LastScheduleTime time.Time ChildrenLastScheduleTime map[string]time.Time } @@ -224,3 +229,20 @@ func (gg *GangGroupInfo) resetPodLastScheduleTime(pod *corev1.Pod) { podId := util.GetId(pod.Namespace, pod.Name) gg.ChildrenLastScheduleTime[podId] = gg.LastScheduleTime } + +func (gg *GangGroupInfo) isGangOnceResourceSatisfied() bool { + gg.lock.Lock() + defer gg.lock.Unlock() + + return gg.OnceResourceSatisfied +} + +func (gg *GangGroupInfo) setResourceSatisfied() { + gg.lock.Lock() + defer gg.lock.Unlock() + + if !gg.OnceResourceSatisfied { + gg.OnceResourceSatisfied = true + klog.Infof("Gang ResourceSatisfied, gangName: %v", gg.GangGroupId) + } +} diff --git a/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go b/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go index 906b397a7..78560ea53 100644 --- a/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go @@ -102,4 +102,11 @@ func TestGangGroupInfo(t *testing.T) { gg.deletePodLastScheduleTime("test/pod1") assert.Equal(t, 1, len(gg.ChildrenLastScheduleTime)) } + { + gg := NewGangGroupInfo("aa", []string{"aa"}) + assert.Equal(t, false, gg.isGangOnceResourceSatisfied()) + + gg.setResourceSatisfied() + assert.Equal(t, true, gg.isGangOnceResourceSatisfied()) + } } diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go index 341fd577f..8d0d7086b 100644 --- a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go +++ b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go @@ -640,6 +640,7 @@ func TestPostFilter(t *testing.T) { if tt.pod.Name == "pod3" { wg.Add(2) } + for _, pod := range tt.pods { tmpPod := pod suit.Handle.(framework.Framework).RunPermitPlugins(context.Background(), cycleState, tmpPod, "") @@ -652,6 +653,7 @@ func TestPostFilter(t *testing.T) { defer wg.Done() }() } + if tt.pod.Name == "pod3" { totalWaitingPods := 0 suit.Handle.IterateOverWaitingPods(