diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index e07995dc7..80b3444a5 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -267,7 +267,11 @@ func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
 func (ctx *Context) filterPods(obj interface{}) bool {
 	switch obj := obj.(type) {
 	case *v1.Pod:
-		return utils.GeneralPodFilter(obj)
+		if utils.GeneralPodFilter(obj) {
+			_, err := utils.GetApplicationIDFromPod(obj)
+			return err == nil
+		}
+		return false
 	default:
 		return false
 	}
diff --git a/pkg/cache/context_recovery.go b/pkg/cache/context_recovery.go
index 79e99d1f6..73b9947aa 100644
--- a/pkg/cache/context_recovery.go
+++ b/pkg/cache/context_recovery.go
@@ -104,7 +104,10 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
 				continue
 			}
 			// yunikorn scheduled pods add to existing allocations
-			if utils.GeneralPodFilter(&pod) {
+			_, err = utils.GetApplicationIDFromPod(&pod)
+			ykPod := utils.GeneralPodFilter(&pod) && err == nil
+			switch {
+			case ykPod:
 				if existingAlloc := getExistingAllocation(mgr, &pod); existingAlloc != nil {
 					log.Logger().Debug("Adding resources for existing pod",
 						zap.String("appID", existingAlloc.ApplicationID),
@@ -123,7 +126,7 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
 						zap.String("nodeName", pod.Spec.NodeName),
 						zap.Stringer("resources", common.GetPodResource(&pod)))
 				}
-			} else if !utils.IsPodTerminated(&pod) {
+			case !utils.IsPodTerminated(&pod):
 				// pod is not terminated (succeed or failed) state,
 				// and it has a node assigned, that means the scheduler
 				// has already allocated the pod onto a node
@@ -141,7 +144,7 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
 				occupiedResource = common.Add(occupiedResource, podResource)
 				nodeOccupiedResources[pod.Spec.NodeName] = occupiedResource
 				ctx.nodes.cache.AddPod(&pod)
-			} else {
+			default:
 				log.Logger().Debug("Skipping terminated pod",
 					zap.String("podUID", string(pod.UID)),
 					zap.String("podName", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)))
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index f2a81cdc7..58b3908e9 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -263,9 +263,22 @@ func TestFilterPods(t *testing.T) {
 		},
 		Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
 	}
+	pod3 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name:   "yunikorn-test-00003",
+			UID:    "UID-00003",
+			Labels: map[string]string{"applicationId": "test-00003"},
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
 	assert.Check(t, !context.filterPods(nil), "nil object was allowed")
-	assert.Check(t, context.filterPods(pod1), "yunikorn-managed pod was filtered")
+	assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with no app id was allowed")
 	assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod was allowed")
+	assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was filtered")
 }
 
 func TestAddPodToCache(t *testing.T) {
diff --git a/pkg/cache/node_coordinator.go b/pkg/cache/node_coordinator.go
index 78dea3e3c..df0e04840 100644
--- a/pkg/cache/node_coordinator.go
+++ b/pkg/cache/node_coordinator.go
@@ -48,10 +48,13 @@ func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator
 
 // filter pods that not scheduled by us
 func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
-	switch obj.(type) {
+	switch obj := obj.(type) {
 	case *v1.Pod:
-		pod := obj.(*v1.Pod)
-		return !utils.GeneralPodFilter(pod)
+		if utils.GeneralPodFilter(obj) {
+			_, err := utils.GetApplicationIDFromPod(obj)
+			return err != nil
+		}
+		return true
 	default:
 		return false
 	}
diff --git a/pkg/cache/node_coordinator_test.go b/pkg/cache/node_coordinator_test.go
index fe9fee555..2f6f3b6c3 100644
--- a/pkg/cache/node_coordinator_test.go
+++ b/pkg/cache/node_coordinator_test.go
@@ -23,6 +23,7 @@ import (
 
 	"gotest.tools/assert"
 	v1 "k8s.io/api/core/v1"
+	apis "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
 	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
@@ -295,3 +296,50 @@ func TestDeleteTerminatedPod(t *testing.T) {
 	coordinator.deletePod(pod2)
 	assert.Equal(t, executed, false)
 }
+
+func TestNodeCoordinatorFilterPods(t *testing.T) {
+	mockedSchedulerAPI := newMockSchedulerAPI()
+	nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
+	host1 := utils.NodeForTest(Host1, "10G", "10")
+	nodes.addNode(host1)
+	coordinator := newNodeResourceCoordinator(nodes)
+
+	pod1 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name: "yunikorn-test-00001",
+			UID:  "UID-00001",
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
+	pod2 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name: "yunikorn-test-00002",
+			UID:  "UID-00002",
+		},
+		Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
+	}
+	pod3 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name:   "yunikorn-test-00003",
+			UID:    "UID-00003",
+			Labels: map[string]string{"applicationId": "test-00003"},
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
+	assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
+	assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered")
+	assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered")
+	assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed")
+}
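
Note: the "scheduled by yunikorn and has a resolvable applicationId" check now appears in both Context.filterPods and nodeResourceCoordinator.filterPods (asserted in one, negated in the other). The following is a minimal sketch, not part of this change, of how that predicate could be expressed once; the helper name isManagedPodWithAppID and its placement in package cache are assumptions for illustration only. It relies only on utils.GeneralPodFilter and utils.GetApplicationIDFromPod, which are already used above.

// Sketch only: hypothetical shared predicate, not part of this diff.
package cache

import (
	v1 "k8s.io/api/core/v1"

	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
)

// isManagedPodWithAppID reports whether the pod passes the yunikorn
// scheduler-name filter and carries a resolvable application ID.
func isManagedPodWithAppID(pod *v1.Pod) bool {
	if !utils.GeneralPodFilter(pod) {
		return false
	}
	_, err := utils.GetApplicationIDFromPod(pod)
	return err == nil
}

With such a helper, the *v1.Pod branch of Context.filterPods would return isManagedPodWithAppID(obj) and the same branch of nodeResourceCoordinator.filterPods would return its negation, which matches the assertions in TestFilterPods and TestNodeCoordinatorFilterPods above.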