fix: Use UUID as a precondition when calling the eviction API (#998) (#…
jonathan-innis authored Feb 13, 2024
1 parent e0cf0f2 commit 94e7412
Showing 12 changed files with 188 additions and 61 deletions.
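
In summary: the eviction queue previously keyed pending evictions by namespace/name only and issued them through the typed CoreV1 eviction client, so a stale entry could evict a freshly re-created pod that reused the old pod's name. With this commit the queue keys entries by namespace/name plus UID and calls the "eviction" subresource on the controller-runtime client with a UID precondition; if the pod now occupying that name has a different UID, the API server refuses with 409 Conflict, which the queue treats like 404 (nothing left to evict). The sketch below shows that precondition call in isolation. It is only an illustration: the helper name and its return convention are invented here, while the real logic lives in Queue.Evict in pkg/controllers/node/termination/terminator/eviction.go (see the diff below).

```go
// Sketch only (not part of the commit): evicting a pod through the
// controller-runtime "eviction" subresource with a UID precondition.
package evictionsketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// evictWithUIDPrecondition is a hypothetical helper that mirrors what
// Queue.Evict does in this commit: it asks the API server to evict the pod
// at nn, but only if that pod still has the expected UID.
func evictWithUIDPrecondition(ctx context.Context, kubeClient client.Client, nn types.NamespacedName, uid types.UID) error {
	err := kubeClient.SubResource("eviction").Create(ctx,
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: nn.Namespace, Name: nn.Name}},
		&policyv1.Eviction{
			DeleteOptions: &metav1.DeleteOptions{
				Preconditions: &metav1.Preconditions{UID: &uid},
			},
		})
	switch {
	case err == nil:
		return nil // the exact pod we enqueued is being evicted
	case apierrors.IsNotFound(err), apierrors.IsConflict(err):
		return nil // 404: pod is gone; 409: a different pod now owns this name; either way, drop the entry
	default:
		return err // e.g. 429 when a PDB blocks the eviction; the caller should retry later
	}
}
```
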
3 changes: 1 addition & 2 deletions go.mod
@@ -5,10 +5,9 @@ go 1.21
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/deckarep/golang-set v1.8.0
github.com/go-logr/logr v1.2.4
github.com/go-logr/zapr v1.2.4
github.com/imdario/mergo v0.3.16
github.com/imdario/mergo v0.3.6
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.28.1
6 changes: 2 additions & 4 deletions go.sum
@@ -72,8 +72,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -196,8 +194,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
6 changes: 2 additions & 4 deletions pkg/controllers/controllers.go
@@ -15,7 +15,6 @@ limitations under the License.
package controllers

import (
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -45,14 +44,13 @@ import (
func NewControllers(
clock clock.Clock,
kubeClient client.Client,
kubernetesInterface kubernetes.Interface,
cluster *state.Cluster,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {

p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubeClient, recorder)

return []controller.Controller{
p, evictionQueue,
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/suite_test.go
@@ -85,7 +85,7 @@ var _ = BeforeSuite(func() {
machineStateController = informer.NewMachineController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster)
})

56 changes: 55 additions & 1 deletion pkg/controllers/node/termination/machine_test.go
@@ -24,6 +24,7 @@ import (
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/controllers/node/termination"
"github.com/aws/karpenter-core/pkg/controllers/node/termination/terminator"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/test"
nodeclaimutil "github.com/aws/karpenter-core/pkg/utils/nodeclaim"
@@ -279,7 +280,7 @@ var _ = Describe("Machine/Termination", func() {

// Expect podNoEvict to fail eviction due to PDB, and be retried
Eventually(func() int {
return queue.NumRequeues(client.ObjectKeyFromObject(podNoEvict))
return queue.NumRequeues(terminator.NewQueueKey(podNoEvict))
}).Should(BeNumerically(">=", 1))

// Delete pod to simulate successful eviction
@@ -539,6 +540,59 @@ var _ = Describe("Machine/Termination", func() {
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotFound(ctx, env.Client, node)
})
It("should not evict a new pod with the same name using the old pod's eviction queue key", func() {
pod := test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Don't trigger a call into the queue to make sure that we effectively aren't triggering eviction
// We'll use this to try to leave pods in the queue

// Expect node to exist and be draining
ExpectNodeWithMachineDraining(env.Client, node.Name)

// Delete the pod directly to act like something else is doing the pod termination
ExpectDeleted(ctx, env.Client, pod)

// Requeue the termination controller to completely delete the node
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(terminator.NewQueueKey(pod))).To(BeTrue())

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{v1beta1.TerminationFinalizer},
},
})
pod = test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger eviction queue with the pod key still in it
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

Consistently(func(g Gomega) {
g.Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed())
g.Expect(pod.DeletionTimestamp.IsZero()).To(BeTrue())
}, ReconcilerPropagationTime, RequestInterval).Should(Succeed())
})
})
Context("Metrics", func() {
It("should fire the terminationSummary metric when deleting nodes", func() {
56 changes: 55 additions & 1 deletion pkg/controllers/node/termination/nodeclaim_test.go
@@ -23,6 +23,7 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1beta1"
"github.com/aws/karpenter-core/pkg/controllers/node/termination"
"github.com/aws/karpenter-core/pkg/controllers/node/termination/terminator"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/test"

@@ -281,7 +282,7 @@ var _ = Describe("NodeClaim/Termination", func() {

// Expect podNoEvict to fail eviction due to PDB, and be retried
Eventually(func() int {
return queue.NumRequeues(client.ObjectKeyFromObject(podNoEvict))
return queue.NumRequeues(terminator.NewQueueKey(podNoEvict))
}).Should(BeNumerically(">=", 1))

// Delete pod to simulate successful eviction
@@ -541,6 +542,59 @@ var _ = Describe("NodeClaim/Termination", func() {
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotFound(ctx, env.Client, node)
})
It("should not evict a new pod with the same name using the old pod's eviction queue key", func() {
pod := test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Don't trigger a call into the queue to make sure that we effectively aren't triggering eviction
// We'll use this to try to leave pods in the queue

// Expect node to exist and be draining
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// Delete the pod directly to act like something else is doing the pod termination
ExpectDeleted(ctx, env.Client, pod)

// Requeue the termination controller to completely delete the node
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(terminator.NewQueueKey(pod))).To(BeTrue())

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{v1beta1.TerminationFinalizer},
},
})
pod = test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger eviction queue with the pod key still in it
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

Consistently(func(g Gomega) {
g.Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed())
g.Expect(pod.DeletionTimestamp.IsZero()).To(BeTrue())
}, ReconcilerPropagationTime, RequestInterval).Should(Succeed())
})
})
Context("Metrics", func() {
It("should fire the terminationSummary metric when deleting nodes", func() {
4 changes: 2 additions & 2 deletions pkg/controllers/node/termination/suite_test.go
@@ -64,7 +64,7 @@ var _ = BeforeSuite(func() {

cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
queue = terminator.NewQueue(env.Client, recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder)
})

@@ -75,7 +75,7 @@ var _ = AfterSuite(func() {
func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) {
GinkgoHelper()
for _, pod := range pods {
Expect(e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse())
Expect(e.Has(terminator.NewQueueKey(pod))).To(BeFalse())
}
}

81 changes: 52 additions & 29 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -20,13 +20,13 @@ import (
"fmt"
"time"

set "github.com/deckarep/golang-set"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -60,19 +60,31 @@ func IsNodeDrainError(err error) bool {
return errors.As(err, &nodeDrainErr)
}

type QueueKey struct {
types.NamespacedName
UID types.UID
}

func NewQueueKey(pod *v1.Pod) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
}
}

type Queue struct {
workqueue.RateLimitingInterface
set.Set
sets.Set[QueueKey]

coreV1Client corev1.CoreV1Interface
recorder events.Recorder
kubeClient client.Client
recorder events.Recorder
}

func NewQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *Queue {
func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
queue := &Queue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: set.NewSet(),
coreV1Client: coreV1Client,
Set: sets.New[QueueKey](),
kubeClient: kubeClient,
recorder: recorder,
}
return queue
@@ -89,9 +101,10 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder
// Add adds pods to the Queue
func (q *Queue) Add(pods ...*v1.Pod) {
for _, pod := range pods {
if nn := client.ObjectKeyFromObject(pod); !q.Set.Contains(nn) {
q.Set.Add(nn)
q.RateLimitingInterface.Add(nn)
qk := NewQueueKey(pod)
if !q.Set.Has(qk) {
q.Set.Insert(qk)
q.RateLimitingInterface.Add(qk)
}
}
}
@@ -100,53 +113,63 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
// Check if the queue is empty. client-go recommends not using this function to gate the subsequent
// get call, but since we're popping items off the queue synchronously, there should be no synchronization
// issues.
if q.Len() == 0 {
if q.RateLimitingInterface.Len() == 0 {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// Get pod from queue. This waits until queue is non-empty.
item, shutdown := q.RateLimitingInterface.Get()
if shutdown {
return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown")
}
nn := item.(types.NamespacedName)
defer q.RateLimitingInterface.Done(nn)
qk := item.(QueueKey)
defer q.RateLimitingInterface.Done(qk)
// Evict pod
if q.Evict(ctx, nn) {
q.RateLimitingInterface.Forget(nn)
q.Set.Remove(nn)
if q.Evict(ctx, qk) {
q.RateLimitingInterface.Forget(qk)
q.Set.Delete(qk)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
// Requeue pod if eviction failed
q.RateLimitingInterface.AddRateLimited(nn)
q.RateLimitingInterface.AddRateLimited(qk)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

// Evict returns true if successful eviction call, and false if not an eviction-related error
func (q *Queue) Evict(ctx context.Context, nn types.NamespacedName) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn))
if err := q.coreV1Client.Pods(nn.Namespace).EvictV1(ctx, &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace},
}); err != nil {
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", key.NamespacedName))
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
DeleteOptions: &metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
UID: lo.ToPtr(key.UID),
},
},
}); err != nil {
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
if apierrors.IsNotFound(err) { // 404
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
// 404 - The pod no longer exists
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160
// 409 - The pod exists, but it is not the same pod that we initiated the eviction on
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318
return true
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
q.recorder.Publish(terminatorevents.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", nn.Namespace, nn.Name)))
Name: key.Name,
Namespace: key.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name)))
return false
}
logging.FromContext(ctx).Errorf("evicting pod, %s", err)
return false
}
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}

func (q *Queue) Reset() {
q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
q.Set = set.NewSet()
q.Set = sets.New[QueueKey]()
}
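
Why the UID belongs in the queue key: a replacement pod that reuses a deleted pod's namespace and name still gets a fresh UID, so it compares as a different QueueKey, and the stale key's precondition can never match it. The snippet below is an illustration only; it assumes the module path and exported names shown in this commit (terminator.NewQueueKey, QueueKey), and the pod UIDs are made up.

```go
// Sketch only: two pods that share a namespace/name but differ in UID
// produce distinct queue keys.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"

	"github.com/aws/karpenter-core/pkg/controllers/node/termination/terminator"
)

func main() {
	oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-pod", UID: "1111-aaaa"}}
	newPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test-pod", UID: "2222-bbbb"}}

	// Key left over from the old pod, as if it were still sitting in the eviction queue.
	queued := sets.New(terminator.NewQueueKey(oldPod))

	fmt.Println(queued.Has(terminator.NewQueueKey(oldPod))) // true
	fmt.Println(queued.Has(terminator.NewQueueKey(newPod))) // false: same name, different UID
}
```

The new tests in machine_test.go and nodeclaim_test.go above exercise the end-to-end version of this behavior: the old pod's key is left in the queue, the queue reconciles, and the re-created pod is left untouched.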