Skip to content

Commit

Permalink
fix waitForFirstConsumer support (volcano-sh#35)
Browse files Browse the repository at this point in the history
Cherry-picked commits from

- volcano-sh#1984
- volcano-sh#1796

Co-authored-by: Yunus Olgun <yunuso@spotify.com>
  • Loading branch information
yolgun and yolgun authored Sep 16, 2022
1 parent 07f7818 commit ef7ea7a
Show file tree
Hide file tree
Showing 26 changed files with 1,506 additions and 116 deletions.
25 changes: 25 additions & 0 deletions LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module volcano.sh/volcano
go 1.17

require (
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/fsnotify/fsnotify v1.4.9
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-multierror v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc=
github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
215 changes: 205 additions & 10 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ limitations under the License.
package allocate

import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"reflect"
"testing"
"time"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"

"github.com/agiledragon/gomonkey/v2"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"

schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/scheduler/api"
Expand All @@ -37,6 +43,13 @@ import (
)

func TestAllocate(t *testing.T) {
var tmp *cache.SchedulerCache
patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error {
scCache.Binder.Bind(nil, []*api.TaskInfo{task})
return nil
})
defer patches.Reset()

framework.RegisterPluginBuilder("drf", drf.New)
framework.RegisterPluginBuilder("proportion", proportion.New)

Expand Down Expand Up @@ -224,6 +237,7 @@ func TestAllocate(t *testing.T) {

Recorder: record.NewFakeRecorder(100),
}

for _, node := range test.nodes {
schedulerCache.AddNode(node)
}
Expand Down Expand Up @@ -261,17 +275,198 @@ func TestAllocate(t *testing.T) {

allocate.Execute(ssn)

for i := 0; i < len(test.expected); i++ {
select {
case <-binder.Channel:
case <-time.After(3 * time.Second):
t.Errorf("Failed to get binding request.")
}
}

if !reflect.DeepEqual(test.expected, binder.Binds) {
t.Errorf("expected: %v, got %v ", test.expected, binder.Binds)
}
})
}
}

// TestAllocateWithDynamicPVC verifies the allocate action against pods that use
// dynamically-provisioned PVCs (WaitForFirstConsumer-style volume binding),
// checking both which pods get bound and which volume-binder operations run.
func TestAllocateWithDynamicPVC(t *testing.T) {
// Monkey-patch SchedulerCache.AddBindTask so that binding is performed
// synchronously in-test: bind the task's volumes, then bind the task itself,
// instead of going through the cache's real async bind pipeline.
var tmp *cache.SchedulerCache
patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error {
scCache.VolumeBinder.BindVolumes(task, task.PodVolumes)
scCache.Binder.Bind(nil, []*api.TaskInfo{task})
return nil
})
defer patches.Reset()

// Register the plugins this test's session tiers reference below.
framework.RegisterPluginBuilder("gang", gang.New)
framework.RegisterPluginBuilder("priority", priority.New)

options.ServerOpts = &options.ServerOption{
MinNodesToFind: 100,
MinPercentageOfNodesToFind: 5,
PercentageOfNodesToFind: 100,
}

defer framework.CleanupPluginBuilders()

// Shared fixtures: one queue and one pod group requiring two members
// (MinMember: 2), so gang scheduling only admits pairs of pods together.
queue := &schedulingv1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "c1",
},
Spec: schedulingv1.QueueSpec{
Weight: 1,
},
}
pg := &schedulingv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "pg1",
Namespace: "c1",
},
Spec: schedulingv1.PodGroupSpec{
Queue: "c1",
MinMember: 2,
},
Status: schedulingv1.PodGroupStatus{
Phase: schedulingv1.PodGroupInqueue,
},
}
// Fake kube client seeded with a storage class and six PVCs (pvc1..pvc5 are
// deep copies of the first). BuildDynamicPVC presumably returns a PVC/PV/SC
// triple wired for dynamic provisioning — TODO confirm against util package.
kubeClient := fake.NewSimpleClientset()
pvc, pv, sc := util.BuildDynamicPVC("c1", "pvc1", v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1Gi"),
})
kubeClient.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
pvcs := []*v1.PersistentVolumeClaim{pvc}
for i := 1; i <= 5; i++ {
tmp := pvc.DeepCopy()
tmp.Name = fmt.Sprintf("pvc%d", i)
pvcs = append(pvcs, tmp)
kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), tmp, metav1.CreateOptions{})
}
// Shared across all sub-tests; its Actions map is compared and then cleared
// at the end of each sub-test.
fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient)

allocate := New()

tests := []struct {
name string
pods []*v1.Pod
nodes []*v1.Node
pvs []*v1.PersistentVolume // pre-created PVs (for the static-binding case)
expectedBind map[string]string // task key -> node name
expectedActions map[string][]string // task key -> volume-binder call sequence
}{
{
// Node has only 1 CPU for two 1-CPU pods: gang (MinMember=2) cannot be
// satisfied, so volumes allocated for p1 are reverted and nothing binds.
name: "resource not match",
pods: []*v1.Pod{
util.BuildPodWithPVC("c1", "p1", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvc, "pg1", make(map[string]string), make(map[string]string)),
util.BuildPodWithPVC("c1", "p2", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[2], "pg1", make(map[string]string), make(map[string]string)),
},
nodes: []*v1.Node{
util.BuildNode("n1", util.BuildResourceList("1", "4Gi"), make(map[string]string)),
},
expectedBind: map[string]string{},
expectedActions: map[string][]string{
"c1/p1": {"GetPodVolumes", "AllocateVolumes", "RevertVolumes"},
},
},
{
// Node has 2 CPUs: both pods fit, both bind, and both PVCs are
// dynamically provisioned.
name: "node changed with enough resource",
pods: []*v1.Pod{
util.BuildPodWithPVC("c1", "p1", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvc, "pg1", make(map[string]string), make(map[string]string)),
util.BuildPodWithPVC("c1", "p2", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[2], "pg1", make(map[string]string), make(map[string]string)),
},
nodes: []*v1.Node{
util.BuildNode("n2", util.BuildResourceList("2", "4Gi"), make(map[string]string)),
},
expectedBind: map[string]string{
"c1/p1": "n2",
"c1/p2": "n2",
},
expectedActions: map[string][]string{
"c1/p1": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"},
"c1/p2": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"},
},
},
{
// A matching PV exists for p3's PVC, so p3 takes the static-binding
// path while p4 still needs dynamic provisioning.
name: "pvc with matched pv",
pods: []*v1.Pod{
util.BuildPodWithPVC("c1", "p3", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[3], "pg1", make(map[string]string), make(map[string]string)),
util.BuildPodWithPVC("c1", "p4", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[4], "pg1", make(map[string]string), make(map[string]string)),
},
pvs: []*v1.PersistentVolume{
pv,
},
nodes: []*v1.Node{
util.BuildNode("n3", util.BuildResourceList("2", "4Gi"), make(map[string]string)),
},
expectedBind: map[string]string{
"c1/p3": "n3",
"c1/p4": "n3",
},
expectedActions: map[string][]string{
"c1/p3": {"GetPodVolumes", "AllocateVolumes", "StaticBindings"},
"c1/p4": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Fresh cache and fake binder per sub-test; the patched AddBindTask
// above routes binds into binder.Binds synchronously.
binder := &util.FakeBinder{
Binds: map[string]string{},
Channel: make(chan string),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: binder,
StatusUpdater: &util.FakeStatusUpdater{},
VolumeBinder: fakeVolumeBinder,
Recorder: record.NewFakeRecorder(100),
}

schedulerCache.AddQueueV1beta1(queue)
schedulerCache.AddPodGroupV1beta1(pg)
// Give pods descending priority (0, -1, ...) so the priority plugin
// yields a deterministic task order within the job.
for i, pod := range test.pods {
priority := int32(-i)
pod.Spec.Priority = &priority
schedulerCache.AddPod(pod)
}
for _, pv := range test.pvs {
kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
}
for _, node := range test.nodes {
schedulerCache.AddNode(node)
}

// Session with priority (task/job ordering) and gang (job readiness)
// enabled, matching the plugins registered above.
trueValue := true
ssn := framework.OpenSession(schedulerCache, []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: "priority",
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
EnabledJobPipelined: &trueValue,
EnabledTaskOrder: &trueValue,
},
{
Name: "gang",
EnabledJobReady: &trueValue,
EnabledPredicate: &trueValue,
EnabledJobPipelined: &trueValue,
EnabledTaskOrder: &trueValue,
},
},
},
}, nil)
defer framework.CloseSession(ssn)

allocate.Execute(ssn)
// Remove this sub-test's PVs so they don't leak into the next case
// (kubeClient is shared across sub-tests).
for _, pv := range test.pvs {
kubeClient.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{})
}
if !reflect.DeepEqual(test.expectedBind, binder.Binds) {
t.Errorf("expected: %v, got %v ", test.expectedBind, binder.Binds)
}
if !reflect.DeepEqual(test.expectedActions, fakeVolumeBinder.Actions) {
t.Errorf("expected: %v, got %v ", test.expectedActions, fakeVolumeBinder.Actions)
}
// Reset recorded actions for the next sub-test (binder.Binds is fresh
// each iteration, but fakeVolumeBinder is shared).
fakeVolumeBinder.Actions = make(map[string][]string)
})
}
}
Loading

0 comments on commit ef7ea7a

Please sign in to comment.