Merge pull request #938 from rphillips/backports/104847

Bug 1999133: kubelet: Handle UID reuse in pod worker

openshift-merge-robot authored Sep 17, 2021
2 parents f39a06c + 66e62bc commit a24cdad
Showing 4 changed files with 179 additions and 14 deletions.
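The underlying scenario: when a static pod's manifest is deleted and recreated quickly, the kubelet sees two distinct pods that share a UID (static pod UIDs are derived from the manifest rather than randomly assigned), and before this change the pod worker would never run the second pod, because the terminated worker's history for that UID was only purged once the UID disappeared from the desired set. The following is a minimal, self-contained sketch of the new flow — simplified names and types, not the kubelet's actual podWorkers implementation — showing how a create received during termination is recorded as a restart request, and how the next SyncKnownPods pass clears the worker so the pod can be dispatched again.

package main

import "fmt"

// Simplified stand-ins for the kubelet's PodWorkType values; the real names in
// this commit are SyncPodWork, TerminatingPodWork, TerminatedPodWork and the new
// TemporarilyTerminatedPodWork.
type workType int

const (
    syncWork workType = iota
    terminatingWork
    terminatedWork
    temporarilyTerminatedWork // terminated, but a create for the same UID arrived afterwards
)

type status struct {
    terminating      bool
    terminated       bool
    restartRequested bool
}

type workers struct {
    statuses map[string]*status
}

// update sketches the new UpdatePod branch: a create delivered while the UID is
// still terminating only flags restartRequested instead of being dropped.
func (w *workers) update(uid string, create bool) {
    s, ok := w.statuses[uid]
    if !ok {
        s = &status{}
        w.statuses[uid] = s
    }
    if s.terminating && create {
        s.restartRequested = true
        return
    }
    // ... normal dispatch of sync work would happen here ...
}

// syncKnownPods sketches the new SyncKnownPods behavior: a terminated worker with
// restartRequested is reported as temporarily terminated and its history removed,
// so a later housekeeping pass can dispatch the reused UID as a fresh pod.
func (w *workers) syncKnownPods() map[string]workType {
    out := map[string]workType{}
    for uid, s := range w.statuses {
        switch {
        case s.terminated && s.restartRequested:
            out[uid] = temporarilyTerminatedWork
            delete(w.statuses, uid)
        case s.terminated:
            out[uid] = terminatedWork
        case s.terminating:
            out[uid] = terminatingWork
        default:
            out[uid] = syncWork
        }
    }
    return out
}

func main() {
    w := &workers{statuses: map[string]*status{}}
    w.update("uid-1", true)               // first create
    w.statuses["uid-1"].terminating = true
    w.statuses["uid-1"].terminated = true // the first pod finishes terminating
    w.update("uid-1", true)               // second create reusing the same UID
    fmt.Println(w.syncKnownPods()["uid-1"] == temporarilyTerminatedWork) // true
}
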
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
@@ -2250,6 +2250,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// TODO: move inside syncPod and make reentrant
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.AddPod(pod)
}
}
@@ -2284,6 +2286,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
// TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
// once the pod kill is acknowledged and during eviction)
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.RemovePod(pod)
}
}
61 changes: 52 additions & 9 deletions pkg/kubelet/kubelet_pods.go
@@ -983,15 +983,7 @@ func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
}

// terminal pods are considered inactive UNLESS they are actively terminating
- isTerminal := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
- if !isTerminal {
- // a pod that has been marked terminal within the Kubelet is considered
- // inactive (may have been rejected by Kubelet admission)
- if status, ok := kl.statusManager.GetPodStatus(p.UID); ok {
- isTerminal = status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed
- }
- }
- if isTerminal && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
+ if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
continue
}

@@ -1000,6 +992,28 @@ func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {
return filteredPods
}

// isAdmittedPodTerminal returns true if the provided config source pod is in
// a terminal phase, or if the Kubelet has already indicated the pod has reached
// a terminal phase but the config source has not accepted it yet. This method
// should only be used within the pod configuration loops that notify the pod
// worker, other components should treat the pod worker as authoritative.
func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {
// pods are considered inactive if the config source has observed a
// terminal phase (if the Kubelet recorded that the pod reached a terminal
// phase the pod should never be restarted)
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
return true
}
// a pod that has been marked terminal within the Kubelet is considered
// inactive (may have been rejected by Kubelet admission)
if status, ok := kl.statusManager.GetPodStatus(pod.UID); ok {
if status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed {
return true
}
}
return false
}

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
@@ -1081,13 +1095,16 @@ func (kl *Kubelet) HandlePodCleanups() error {
// cleanup of pod cgroups.
runningPods := make(map[types.UID]sets.Empty)
possiblyRunningPods := make(map[types.UID]sets.Empty)
restartablePods := make(map[types.UID]sets.Empty)
for uid, sync := range workingPods {
switch sync {
case SyncPodWork:
runningPods[uid] = struct{}{}
possiblyRunningPods[uid] = struct{}{}
case TerminatingPodWork:
possiblyRunningPods[uid] = struct{}{}
case TemporarilyTerminatedPodWork:
restartablePods[uid] = struct{}{}
}
}

@@ -1171,6 +1188,32 @@ func (kl *Kubelet) HandlePodCleanups() error {
}

kl.backOff.GC()

// If two pods with the same UID are observed in rapid succession, we need to
// resynchronize the pod worker after the first pod completes and decide whether
// to restart the pod. This happens last to avoid confusing the desired state
// in other components and to increase the likelihood transient OS failures during
// container start are mitigated. In general only static pods will ever reuse UIDs
// since the apiserver uses randomly generated UUIDv4 UIDs with a very low
// probability of collision.
for uid := range restartablePods {
pod, ok := allPodsByUID[uid]
if !ok {
continue
}
if kl.isAdmittedPodTerminal(pod) {
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
start := kl.clock.Now()
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// TODO: move inside syncPod and make reentrant
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.AddPod(pod)
}

return nil
}

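The comment in the restart pass above notes that, in practice, only static pods reuse UIDs because the apiserver assigns random UUIDv4 values. A short illustrative sketch of why recreating an unchanged static pod manifest yields the same UID — the kubelet derives static pod UIDs deterministically from the manifest and node; the hash below is an illustration, not the kubelet's exact scheme:

package main

import (
    "crypto/sha256"
    "fmt"
)

// staticPodUID illustrates that a static pod's UID is a deterministic function of
// its manifest and node, so an identical manifest re-added later reuses the UID.
func staticPodUID(manifest, nodeName string) string {
    sum := sha256.Sum256([]byte(manifest + nodeName))
    return fmt.Sprintf("%x", sum[:8])
}

func main() {
    m := "name: static-web\nimage: nginx\n"
    first := staticPodUID(m, "node-a")
    second := staticPodUID(m, "node-a") // manifest deleted and recreated unchanged
    fmt.Println(first == second)        // true: the "new" pod arrives with the old UID
}
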
38 changes: 34 additions & 4 deletions pkg/kubelet/pod_workers.go
@@ -99,6 +99,10 @@ const (
// TerminatedPodWork indicates the pod is stopped, can have no more running
// containers, and any foreground cleanup can be executed.
TerminatedPodWork
// TemporarilyTerminatedPodWork is the same as TerminatedPodWork, but indicates
// that a create update was delivered AFTER the pod was terminated, indicating
// that the pod may have been recreated with the same UID.
TemporarilyTerminatedPodWork
)

// podWork is the internal changes
@@ -127,7 +131,7 @@ type PodWorkers interface {
// and have been terminated for a significant period of time. Once this method
// has been called once, the workers are assumed to be fully initialized and
// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
- // true.
+ // true. It returns a map describing the state of each known pod worker.
SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType

// IsPodKnownTerminated returns true if the provided pod UID is known by the pod
@@ -254,6 +258,11 @@ type podSyncStatus struct {
// to remove the pod. A terminal pod (Succeeded/Failed) will have
// termination status until the pod is deleted.
finished bool
// restartRequested is true if the pod worker was informed the pod is
// expected to exist (update type of create, update, or sync) after
// it has been killed. When known pods are synced, any pod that is
// terminated and has restartRequested will have its history cleared.
restartRequested bool
// notifyPostTerminating will be closed once the pod transitions to
// terminated. After the pod is in terminated state, nothing should be
// added to this list.
@@ -514,6 +523,19 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
p.podSyncStatuses[uid] = status
}

// if an update is received that implies the pod should be running, but we are already terminating a pod by
// that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
// pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
// to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
// to start the pod worker again
if status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
}

// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -977,12 +999,16 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType

p.podsSynced = true
for uid, status := range p.podSyncStatuses {
- if _, exists := known[uid]; !exists {
+ if _, exists := known[uid]; !exists || status.restartRequested {
p.removeTerminatedWorker(uid)
}
switch {
case !status.terminatedAt.IsZero():
- workers[uid] = TerminatedPodWork
+ if status.restartRequested {
+ workers[uid] = TemporarilyTerminatedPodWork
+ } else {
+ workers[uid] = TerminatedPodWork
+ }
case !status.terminatingAt.IsZero():
workers[uid] = TerminatingPodWork
default:
@@ -1009,7 +1035,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
return
}

- klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
+ if status.restartRequested {
+ klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
+ } else {
+ klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
+ }
delete(p.podSyncStatuses, uid)
delete(p.podUpdates, uid)
delete(p.lastUndeliveredWorkUpdate, uid)
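A hedged usage sketch of how a caller of SyncKnownPods can act on the new TemporarilyTerminatedPodWork value, in the spirit of the HandlePodCleanups change above — type and constant names are simplified stand-ins, not the kubelet's own:

package main

import "fmt"

// Simplified stand-ins for types.UID and PodWorkType.
type uid string

type podWorkType int

const (
    syncPodWork podWorkType = iota
    terminatingPodWork
    terminatedPodWork
    temporarilyTerminatedPodWork
)

// partitionWorkers mirrors the shape of the new cleanup logic: running pods,
// pods that may still have containers, and pods whose UID was reused and which
// may therefore need to be restarted once cleanup has finished.
func partitionWorkers(working map[uid]podWorkType) (running, possiblyRunning, restartable map[uid]struct{}) {
    running = map[uid]struct{}{}
    possiblyRunning = map[uid]struct{}{}
    restartable = map[uid]struct{}{}
    for id, work := range working {
        switch work {
        case syncPodWork:
            running[id] = struct{}{}
            possiblyRunning[id] = struct{}{}
        case terminatingPodWork:
            possiblyRunning[id] = struct{}{}
        case temporarilyTerminatedPodWork:
            restartable[id] = struct{}{}
        }
    }
    return running, possiblyRunning, restartable
}

func main() {
    working := map[uid]podWorkType{
        "a": syncPodWork,
        "b": terminatingPodWork,
        "c": temporarilyTerminatedPodWork,
    }
    _, _, restartable := partitionWorkers(working)
    fmt.Println(len(restartable)) // 1: only "c" is a candidate for restart
}
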
89 changes: 88 additions & 1 deletion test/e2e_node/mirror_pod_test.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/e2e/framework"
@@ -133,6 +134,60 @@ var _ = SIGDescribe("MirrorPod", func() {
err := deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)

ginkgo.By("wait for the mirror pod to disappear")
gomega.Eventually(func() error {
return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
})
})
ginkgo.Context("when create a mirror pod without changes ", func() {
var ns, podPath, staticPodName, mirrorPodName string
ginkgo.BeforeEach(func() {
})
/*
Release: v1.23
Testname: Mirror Pod, recreate
Description: When a static pod's manifest is removed and readded, the mirror pod MUST successfully recreate. Create the static pod, verify it is running, remove its manifest and then add it back, and verify the static pod runs again.
*/
ginkgo.It("should successfully recreate when file is removed and recreated [NodeConformance]", func() {
ns = f.Namespace.Name
staticPodName = "static-pod-" + string(uuid.NewUUID())
mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName

podPath = framework.TestContext.KubeletConfig.StaticPodPath
ginkgo.By("create the static pod")
err := createStaticPod(podPath, staticPodName, ns,
imageutils.GetE2EImage(imageutils.Nginx), v1.RestartPolicyAlways)
framework.ExpectNoError(err)

ginkgo.By("wait for the mirror pod to be running")
gomega.Eventually(func() error {
return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())

ginkgo.By("delete the pod manifest from disk")
err = deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)

ginkgo.By("recreate the file")
err = createStaticPod(podPath, staticPodName, ns,
imageutils.GetE2EImage(imageutils.Nginx), v1.RestartPolicyAlways)
framework.ExpectNoError(err)

ginkgo.By("mirror pod should restart with count 1")
gomega.Eventually(func() error {
return checkMirrorPodRunningWithRestartCount(2*time.Second, 2*time.Minute, f.ClientSet, mirrorPodName, ns, 1)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())

ginkgo.By("mirror pod should stay running")
gomega.Consistently(func() error {
return checkMirrorPodRunning(f.ClientSet, mirrorPodName, ns)
}, time.Second*30, time.Second*4).Should(gomega.BeNil())

ginkgo.By("delete the static pod")
err = deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)

ginkgo.By("wait for the mirror pod to disappear")
gomega.Eventually(func() error {
return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
@@ -194,8 +249,40 @@ func checkMirrorPodRunning(cl clientset.Interface, name, namespace string) error
}
for i := range pod.Status.ContainerStatuses {
if pod.Status.ContainerStatuses[i].State.Running == nil {
- return fmt.Errorf("expected the mirror pod %q with container %q to be running", name, pod.Status.ContainerStatuses[i].Name)
+ return fmt.Errorf("expected the mirror pod %q with container %q to be running (got containers=%v)", name, pod.Status.ContainerStatuses[i].Name, pod.Status.ContainerStatuses[i].State)
}
}
return validateMirrorPod(cl, pod)
}

func checkMirrorPodRunningWithRestartCount(interval time.Duration, timeout time.Duration, cl clientset.Interface, name, namespace string, count int32) error {
var pod *v1.Pod
var err error
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
pod, err = cl.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("expected the mirror pod %q to appear: %v", name, err)
}
if pod.Status.Phase != v1.PodRunning {
return false, fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase)
}
for i := range pod.Status.ContainerStatuses {
if pod.Status.ContainerStatuses[i].State.Waiting != nil {
// retry if pod is in waiting state
return false, nil
}
if pod.Status.ContainerStatuses[i].State.Running == nil {
return false, fmt.Errorf("expected the mirror pod %q with container %q to be running (got containers=%v)", name, pod.Status.ContainerStatuses[i].Name, pod.Status.ContainerStatuses[i].State)
}
if pod.Status.ContainerStatuses[i].RestartCount == count {
// found the restart count
return true, nil
}
}
return false, nil
})
if err != nil {
return err
}
return validateMirrorPod(cl, pod)
}
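For context on what the e2e helpers exercise: createStaticPod and deleteStaticPod (not shown in this diff) write and remove a manifest under the kubelet's static pod path, and the kubelet then creates or deletes the corresponding mirror pod named <staticPodName>-<nodeName>. A minimal sketch of that idea, with an assumed path and manifest rather than the framework helpers themselves:

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// An illustrative static pod manifest; the e2e test uses the framework's nginx image.
const manifest = `apiVersion: v1
kind: Pod
metadata:
  name: static-pod-example
  namespace: default
spec:
  containers:
  - name: web
    image: nginx
`

// writeStaticPod drops a manifest into the kubelet's static pod directory; the
// kubelet watches this directory and mirrors each static pod to the API server.
func writeStaticPod(staticPodPath, name string) (string, error) {
    file := filepath.Join(staticPodPath, name+".yaml")
    return file, os.WriteFile(file, []byte(manifest), 0644)
}

func main() {
    const dir = "/etc/kubernetes/manifests" // assumed path; the test reads it from KubeletConfig.StaticPodPath
    path, err := writeStaticPod(dir, "static-pod-example")
    if err != nil {
        fmt.Println("write failed:", err)
        return
    }
    // Deleting and immediately rewriting the identical manifest is what produces
    // two pods with the same UID in quick succession, the case this commit handles.
    _ = os.Remove(path)
    if _, err := writeStaticPod(dir, "static-pod-example"); err != nil {
        fmt.Println("rewrite failed:", err)
    }
}
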
