diff --git a/pkg/resourceapply/core.go b/pkg/resourceapply/core.go index 930b7713822..ce13a43b759 100644 --- a/pkg/resourceapply/core.go +++ b/pkg/resourceapply/core.go @@ -211,3 +211,35 @@ func ApplyEndpoints( options, ) } + +func ApplyPodWithControl( + ctx context.Context, + control ApplyControlInterface[*corev1.Pod], + recorder record.EventRecorder, + required *corev1.Pod, + options ApplyOptions, +) (*corev1.Pod, bool, error) { + return ApplyGeneric[*corev1.Pod](ctx, control, recorder, required, options) +} + +func ApplyPod( + ctx context.Context, + client corev1client.PodsGetter, + lister corev1listers.PodLister, + recorder record.EventRecorder, + required *corev1.Pod, + options ApplyOptions, +) (*corev1.Pod, bool, error) { + return ApplyPodWithControl( + ctx, + ApplyControlFuncs[*corev1.Pod]{ + GetCachedFunc: lister.Pods(required.Namespace).Get, + CreateFunc: client.Pods(required.Namespace).Create, + UpdateFunc: client.Pods(required.Namespace).Update, + DeleteFunc: client.Pods(required.Namespace).Delete, + }, + recorder, + required, + options, + ) +} diff --git a/pkg/resourceapply/core_test.go b/pkg/resourceapply/core_test.go index afe4df8f90e..e57c789ff3a 100644 --- a/pkg/resourceapply/core_test.go +++ b/pkg/resourceapply/core_test.go @@ -12,6 +12,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -3065,3 +3066,552 @@ func TestApplyEndpoints(t *testing.T) { }) } } + +func TestApplyPod(t *testing.T) { + // Using a generating function prevents unwanted mutations. + newPod := func() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test", + Labels: map[string]string{}, + OwnerReferences: []metav1.OwnerReference{ + { + Controller: pointer.Ptr(true), + UID: "abcdefgh", + APIVersion: "scylla.scylladb.com/v1", + Kind: "ScyllaCluster", + Name: "basic", + BlockOwnerDeletion: pointer.Ptr(true), + }, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "docker.io/scylladb/scylla-operator:latest", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("50Mi"), + }, + }, + }, + }, + NodeName: "test", + }, + } + } + + newPodWithHash := func() *corev1.Pod { + pod := newPod() + utilruntime.Must(SetHashAnnotation(pod)) + return pod + } + + tt := []struct { + name string + existing []runtime.Object + cache []runtime.Object // nil cache means autofill from the client + required *corev1.Pod + forceOwnership bool + expectedPod *corev1.Pod + expectedChanged bool + expectedErr error + expectedEvents []string + }{ + { + name: "creates a new pod when there is none", + existing: nil, + required: newPod(), + expectedPod: newPodWithHash(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodCreated Pod default/test created"}, + }, + { + name: "does nothing if the same pod already exists", + existing: []runtime.Object{ + newPodWithHash(), + }, + required: newPod(), + expectedPod: newPodWithHash(), + expectedChanged: false, + expectedErr: nil, + expectedEvents: nil, + }, + { + name: "does nothing if the same pod already exists and required one has the hash", + existing: []runtime.Object{ + newPodWithHash(), + }, + required: newPodWithHash(), + expectedPod: newPodWithHash(), + expectedChanged: false, + expectedErr: nil, + expectedEvents: nil, + }, + { + name: "updates the pod if it exists without the hash", + existing: []runtime.Object{ + newPod(), + }, + required: newPod(), + expectedPod: newPodWithHash(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "fails to create the pod without a controllerRef", + existing: nil, + required: func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences = nil + return pod + }(), + expectedPod: nil, + expectedChanged: false, + expectedErr: fmt.Errorf(`/v1, Kind=Pod "default/test" is missing controllerRef`), + expectedEvents: nil, + }, + { + name: "updates the pod if resources differ", + existing: []runtime.Object{ + newPod(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU] = resource.MustParse("20m") + return pod + }(), + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU] = resource.MustParse("20m") + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "updates the pod if labels differ", + existing: []runtime.Object{ + newPodWithHash(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "won't update the pod if an admission changes it", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPodWithHash() + // Simulate admission by changing a value after the hash is computed. + pod.Finalizers = append(pod.Finalizers, "admissionfinalizer") + return pod + }(), + }, + required: newPod(), + expectedPod: func() *corev1.Pod { + pod := newPodWithHash() + // Simulate admission by changing a value after the hash is computed. + pod.Finalizers = append(pod.Finalizers, "admissionfinalizer") + return pod + }(), + expectedChanged: false, + expectedErr: nil, + expectedEvents: nil, + }, + { + // We test propagating the RV from required in all the other tests. + name: "specifying no RV will use the one from the existing object", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPodWithHash() + pod.ResourceVersion = "21" + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.ResourceVersion = "" + pod.Labels["foo"] = "bar" + return pod + }(), + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.ResourceVersion = "21" + pod.Labels["foo"] = "bar" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "update fails if the pod is missing but we still see it in the cache", + existing: nil, + cache: []runtime.Object{ + newPodWithHash(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + expectedPod: nil, + expectedChanged: false, + expectedErr: fmt.Errorf(`can't update /v1, Kind=Pod "default/test": %w`, apierrors.NewNotFound(corev1.Resource("pods"), "test")), + expectedEvents: []string{`Warning UpdatePodFailed Failed to update Pod default/test: pods "test" not found`}, + }, + { + name: "update fails if the existing object has no ownerRef", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences = nil + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + expectedPod: nil, + expectedChanged: false, + expectedErr: fmt.Errorf(`/v1, Kind=Pod "default/test" isn't controlled by us`), + expectedEvents: []string{`Warning UpdatePodFailed Failed to update Pod default/test: /v1, Kind=Pod "default/test" isn't controlled by us`}, + }, + { + name: "forced update succeeds if the existing object has no ownerRef", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences = nil + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + forceOwnership: true, + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "update succeeds to replace ownerRef kind", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences[0].Kind = "WrongKind" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + return pod + }(), + expectedPod: func() *corev1.Pod { + pod := newPod() + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + { + name: "update fails if the existing object is owned by someone else", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences[0].UID = "42" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + expectedPod: nil, + expectedChanged: false, + expectedErr: fmt.Errorf(`/v1, Kind=Pod "default/test" isn't controlled by us`), + expectedEvents: []string{`Warning UpdatePodFailed Failed to update Pod default/test: /v1, Kind=Pod "default/test" isn't controlled by us`}, + }, + { + name: "forced update fails if the existing object is owned by someone else", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.OwnerReferences[0].UID = "42" + utilruntime.Must(SetHashAnnotation(pod)) + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Labels["foo"] = "bar" + return pod + }(), + forceOwnership: true, + expectedPod: nil, + expectedChanged: false, + expectedErr: fmt.Errorf(`/v1, Kind=Pod "default/test" isn't controlled by us`), + expectedEvents: []string{`Warning UpdatePodFailed Failed to update Pod default/test: /v1, Kind=Pod "default/test" isn't controlled by us`}, + }, + { + name: "all label and annotation keys are kept when the hash matches", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha", + "a-2": "a-beta", + "a-3-": "", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha", + "l-2": "l-beta", + "l-3-": "", + } + utilruntime.Must(SetHashAnnotation(pod)) + pod.Annotations["a-1"] = "a-alpha-changed" + pod.Annotations["a-3"] = "a-resurrected" + pod.Annotations["a-custom"] = "custom-value" + pod.Labels["l-1"] = "l-alpha-changed" + pod.Labels["l-3"] = "l-resurrected" + pod.Labels["l-custom"] = "custom-value" + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha", + "a-2": "a-beta", + "a-3-": "", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha", + "l-2": "l-beta", + "l-3-": "", + } + return pod + }(), + forceOwnership: false, + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha", + "a-2": "a-beta", + "a-3-": "", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha", + "l-2": "l-beta", + "l-3-": "", + } + utilruntime.Must(SetHashAnnotation(pod)) + pod.Annotations["a-1"] = "a-alpha-changed" + pod.Annotations["a-3"] = "a-resurrected" + pod.Annotations["a-custom"] = "custom-value" + pod.Labels["l-1"] = "l-alpha-changed" + pod.Labels["l-3"] = "l-resurrected" + pod.Labels["l-custom"] = "custom-value" + return pod + }(), + expectedChanged: false, + expectedErr: nil, + expectedEvents: nil, + }, + { + name: "only managed label and annotation keys are updated when the hash changes", + existing: []runtime.Object{ + func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha", + "a-2": "a-beta", + "a-3-": "a-resurrected", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha", + "l-2": "l-beta", + "l-3-": "l-resurrected", + } + utilruntime.Must(SetHashAnnotation(pod)) + pod.Annotations["a-1"] = "a-alpha-changed" + pod.Annotations["a-custom"] = "a-custom-value" + pod.Labels["l-1"] = "l-alpha-changed" + pod.Labels["l-custom"] = "l-custom-value" + return pod + }(), + }, + required: func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha-x", + "a-2": "a-beta-x", + "a-3-": "", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha-x", + "l-2": "l-beta-x", + "l-3-": "", + } + return pod + }(), + forceOwnership: true, + expectedPod: func() *corev1.Pod { + pod := newPod() + pod.Annotations = map[string]string{ + "a-1": "a-alpha-x", + "a-2": "a-beta-x", + "a-3-": "", + } + pod.Labels = map[string]string{ + "l-1": "l-alpha-x", + "l-2": "l-beta-x", + "l-3-": "", + } + utilruntime.Must(SetHashAnnotation(pod)) + delete(pod.Annotations, "a-3-") + pod.Annotations["a-custom"] = "a-custom-value" + delete(pod.Labels, "l-3-") + pod.Labels["l-custom"] = "l-custom-value" + return pod + }(), + expectedChanged: true, + expectedErr: nil, + expectedEvents: []string{"Normal PodUpdated Pod default/test updated"}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Client holds the state so it has to persists the iterations. + client := fake.NewSimpleClientset(tc.existing...) + + // ApplyPod needs to be reentrant so running it the second time should give the same results. + // (One of the common mistakes is editing the object after computing the hash so it differs the second time.) + iterations := 2 + if tc.expectedErr != nil { + iterations = 1 + } + for i := range iterations { + t.Run("", func(t *testing.T) { + ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer ctxCancel() + + recorder := record.NewFakeRecorder(10) + + podCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + podLister := corev1listers.NewPodLister(podCache) + + if tc.cache != nil { + for _, obj := range tc.cache { + err := podCache.Add(obj) + if err != nil { + t.Fatal(err) + } + } + } else { + podList, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + LabelSelector: labels.Everything().String(), + }) + if err != nil { + t.Fatal(err) + } + + for i := range podList.Items { + err := podCache.Add(&podList.Items[i]) + if err != nil { + t.Fatal(err) + } + } + } + + gotPod, gotChanged, gotErr := ApplyPod(ctx, client.CoreV1(), podLister, recorder, tc.required, ApplyOptions{ + ForceOwnership: tc.forceOwnership, + }) + if !reflect.DeepEqual(gotErr, tc.expectedErr) { + t.Fatalf("expected %v, got %v", tc.expectedErr, gotErr) + } + + if !equality.Semantic.DeepEqual(gotPod, tc.expectedPod) { + t.Errorf("expected %#v, got %#v, diff:\n%s", tc.expectedPod, gotPod, cmp.Diff(tc.expectedPod, gotPod)) + } + + // Make sure such object was actually created. + if gotPod != nil { + createdPod, err := client.CoreV1().Pods(gotPod.Namespace).Get(ctx, gotPod.Name, metav1.GetOptions{}) + if err != nil { + t.Error(err) + } + if !equality.Semantic.DeepEqual(createdPod, gotPod) { + t.Errorf("created and returned pods differ:\n%s", cmp.Diff(createdPod, gotPod)) + } + } + + if i == 0 { + if gotChanged != tc.expectedChanged { + t.Errorf("expected %t, got %t", tc.expectedChanged, gotChanged) + } + } else { + if gotChanged { + t.Errorf("object changed in iteration %d", i) + } + } + + close(recorder.Events) + var gotEvents []string + for e := range recorder.Events { + gotEvents = append(gotEvents, e) + } + if i == 0 { + if !reflect.DeepEqual(gotEvents, tc.expectedEvents) { + t.Errorf("expected %v, got %v, diff:\n%s", tc.expectedEvents, gotEvents, cmp.Diff(tc.expectedEvents, gotEvents)) + } + } else { + if len(gotEvents) > 0 { + t.Errorf("unexpected events: %v", gotEvents) + } + } + }) + } + }) + } +}