diff --git a/pkg/checkpoint/apiserver.go b/pkg/checkpoint/apiserver.go index 9f7f6f40d..2bc1ecc97 100644 --- a/pkg/checkpoint/apiserver.go +++ b/pkg/checkpoint/apiserver.go @@ -9,7 +9,8 @@ import ( ) // getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed -func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod { +// Returns false if we could not contact the apiserver. +func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) { opts := metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(), } @@ -17,7 +18,7 @@ func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod { podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts) if err != nil { glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err) - return nil + return false, nil } - return podListToParentPods(podList) + return true, podListToParentPods(podList) } diff --git a/pkg/checkpoint/checkpoint.go b/pkg/checkpoint/checkpoint.go index 2b84be66f..dbb0ce595 100644 --- a/pkg/checkpoint/checkpoint.go +++ b/pkg/checkpoint/checkpoint.go @@ -4,8 +4,10 @@ package checkpoint import ( "fmt" + "os" "time" + "github.com/golang/glog" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" ) @@ -23,12 +25,14 @@ const ( shouldCheckpoint = "true" podSourceFile = "file" - defaultPollingFrequency = 3 * time.Second - defaultCheckpointTimeout = 1 * time.Minute + defaultPollingFrequency = 5 * time.Second + defaultCheckpointTimeout = 1 * time.Minute + defaultCheckpointGracePeriod = 1 * time.Minute ) var ( - lastCheckpoint time.Time + lastCheckpoint time.Time + checkpointGracePeriod = defaultCheckpointGracePeriod ) // Options defines the parameters that are required to start the checkpointer. @@ -90,6 +94,12 @@ func Run(opts Options) error { // run is the main checkpointing loop. func (c *checkpointer) run() { + // Make sure the inactive checkpoint path exists. + if err := os.MkdirAll(inactiveCheckpointPath, 0600); err != nil { + glog.Fatalf("Could not create inactive checkpoint path: %v", err) + } + + cs := checkpoints{} for { time.Sleep(defaultPollingFrequency) @@ -101,20 +111,21 @@ func (c *checkpointer) run() { localParentPods := c.kubelet.localParentPods() localRunningPods := c.cri.localRunningPods() - c.createCheckpointsForValidParents(localParentPods) - // Try to get scheduled pods from the apiserver. // These will be used to GC checkpoints for parents no longer scheduled to this node. - // A return value of nil is assumed to be "could not contact apiserver" // TODO(aaron): only check this every 30 seconds or so - apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName) + apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName) // Get on disk copies of (in)active checkpoints //TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync. activeCheckpoints := getFileCheckpoints(activeCheckpointPath) inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath) - start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod) + // Update checkpoints using the latest information from the APIs. + cs.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod) + + // Update checkpoint states and determine which checkpoints to start, stop, or remove. + start, stop, remove := cs.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods) // Handle remove at last because we may still have some work to do // before removing the checkpointer itself. diff --git a/pkg/checkpoint/manifest.go b/pkg/checkpoint/manifest.go index b4ecb664d..f774b6576 100644 --- a/pkg/checkpoint/manifest.go +++ b/pkg/checkpoint/manifest.go @@ -52,10 +52,6 @@ func writeCheckpointManifest(pod *v1.Pod) (bool, error) { return false, err } path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json") - // Make sure the inactive checkpoint path exists. - if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil { - return false, err - } return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes()) } diff --git a/pkg/checkpoint/process.go b/pkg/checkpoint/process.go index 8af77cb26..97d4ea693 100644 --- a/pkg/checkpoint/process.go +++ b/pkg/checkpoint/process.go @@ -1,6 +1,7 @@ package checkpoint import ( + "fmt" "io/ioutil" "os" "time" @@ -9,127 +10,522 @@ import ( "k8s.io/client-go/pkg/api/v1" ) -// process() makes decisions on which checkpoints need to be started, stopped, or removed. -// It makes this decision based on inspecting the states from kubelet, apiserver, active/inactive checkpoints. -// -// - localRunningPods: running pods retrieved from CRI shim. Minimal amount of info (no podStatus) as it is extracted from container runtime. -// - localParentPods: pod state from kubelet api for all "to be checkpointed" pods - podStatus may be stale (only as recent as last apiserver contact) -// - apiParentPods: pod state from the api server for all "to be checkpointed" pods -// - activeCheckpoints: checkpoint pod manifests which are currently active & in the static pod manifest -// - inactiveCheckpoints: checkpoint pod manifest which are stored in an inactive directory, but are ready to be activated -// -// The return values are checkpoints which should be started or stopped, and checkpoints which need to be removed altogether. -// The removal of a checkpoint means its parent is no longer scheduled to this node, and we need to GC active / inactive -// checkpoints as well as any secrets / configMaps which are no longer necessary. -func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints map[string]*v1.Pod, checkpointerPod CheckpointerPod) (start, stop, remove []string) { - // If this variable is filled, then it means we need to remove the pod-checkpointer's checkpoint. - // We treat the pod-checkpointer's checkpoint specially because we want to put it at the end of - // the remove queue. - var podCheckpointerID string - - // We can only make some GC decisions if we've successfully contacted an apiserver. - // When apiParentPods == nil, that means we were not able to get an updated list of pods. - removeMap := make(map[string]struct{}) - if len(apiParentPods) != 0 { - - // Scan for inacive checkpoints we should GC - for id := range inactiveCheckpoints { - // If the inactive checkpoint still has a parent pod, do nothing. - // This means the kubelet thinks it should still be running, which has the same scheduling info that we do -- - // so we won't make any decisions about its checkpoint. - // TODO(aaron): This is a safety check, and may not be necessary -- question is do we trust that the api state we received - // is accurate -- and that we should ignore our local state (or assume it could be inaccurate). For example, - // local kubelet pod state will be innacurate in the case that we can't contact apiserver (kubelet only keeps - // cached responses from api) -- however, we're assuming we've been able to contact api, so this likely is moot. - if _, ok := localParentPods[id]; ok { - glog.V(4).Infof("API GC: skipping inactive checkpoint %s", id) - continue +// apiState represents information returned from the various api endpoints for a given pod. +type apiState struct { + // apiAvailable is true if the apiserver was reachable. + apiAvailable bool + // apiParent is true if the api parent pod exists. + apiParent bool + // localRunning is true if the CRI shim reports that the pod is running locally. + localRunning bool + // localParent is true if the kubelet parent pod exists. + localParent bool +} + +// action represents the action to be taken based on the state of a checkpoint. +type action int + +const ( + // none is the default action of "do nothing". + none = iota + // start means that the checkpoint should be started. + start + // stop means that the checkpoint should be stopped. + stop + // remove means that the checkpoint should be garbage collected. + remove +) + +// String() implements fmt.Stringer.String(). +func (a action) String() string { + switch a { + case none: + return "none" + case start: + return "start" + case stop: + return "stop" + case remove: + return "remove" + default: + return "[unknown action]" + } +} + +// state represents the current state of a checkpoint. +type state interface { + // transition computes the new state for the current time and information from various apis. + transition(time.Time, apiState) state + // action returns the action that should be taken for this state. + action() action +} + +// checkpoint holds the state of a single checkpointed pod. A checkpoint can move between states +// based on the apiState that the checkpointer sees. +type checkpoint struct { + // name is the name of the checkpointed pod. + name string + // pod is the most up-to-date v1.Pod data. + pod *v1.Pod + // state is the current state of the checkpoint. + state state +} + +// String() implements fmt.Stringer.String(). +func (c checkpoint) String() string { + return fmt.Sprintf("%s (%s)", c.name, c.state) +} + +// checkpoints holds the state of the checkpoints. +type checkpoints struct { + checkpoints map[string]*checkpoint + selfCheckpoint *checkpoint +} + +// update updates the checkpoints using the information retrieved from the various API endpoints. +func (cs *checkpoints) update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints map[string]*v1.Pod, checkpointerPod CheckpointerPod) { + if cs.checkpoints == nil { + cs.checkpoints = make(map[string]*checkpoint) + } + + // Temporarily add the self-checkpointer into the map so it is updated as well. + if cs.selfCheckpoint != nil { + cs.checkpoints[cs.selfCheckpoint.name] = cs.selfCheckpoint + } + + // Make sure all on-disk checkpoints are represented in memory, i.e. if we are restarting from a crash. + for name, pod := range activeCheckpoints { + if _, ok := cs.checkpoints[name]; !ok { + cs.checkpoints[name] = &checkpoint{ + name: name, + state: stateActive{}, + pod: pod, } + // Override the state for the self-checkpointer. + if isPodCheckpointer(pod, checkpointerPod) { + cs.checkpoints[name].state = stateSelfCheckpoint{} + } + } + } - // If the inactive checkpoint does not have a parent in the api-server, we must assume it should no longer be running on this node. - // NOTE: It's possible that a replacement for this pod has not been rescheduled elsewhere, but that's not something we can base our decision on. - // For example, if a single scheduler is running, and the node is drained, the scheduler pod will be deleted and there will be no replacement. - // However, we don't know this, and as far as the checkpointer is concerned - that pod is no longer scheduled to this node. - if _, ok := apiParentPods[id]; !ok { - glog.V(4).Infof("API GC: should remove inactive checkpoint %s", id) - - removeMap[id] = struct{}{} - if isPodCheckpointer(inactiveCheckpoints[id], checkpointerPod) { - podCheckpointerID = id - break - } + for name, pod := range inactiveCheckpoints { + if _, ok := cs.checkpoints[name]; !ok { + cs.checkpoints[name] = &checkpoint{ + name: name, + state: stateInactive{}, + pod: pod, + } + // Override the state for the self-checkpointer. + if isPodCheckpointer(pod, checkpointerPod) { + cs.checkpoints[name].state = stateSelfCheckpoint{} + } + } + } - delete(inactiveCheckpoints, id) + // Add union of parent pods from other sources. + for name, pod := range localParentPods { + if _, ok := cs.checkpoints[name]; !ok { + cs.checkpoints[name] = &checkpoint{ + name: name, + state: stateNone{}, } } + // Always overwrite with the localParentPod since it is a better source of truth. + cs.checkpoints[name].pod = pod + } - // Scan active checkpoints we should GC - for id := range activeCheckpoints { - // If the active checkpoint does not have a parent in the api-server, we must assume it should no longer be running on this node. - if _, ok := apiParentPods[id]; !ok { - glog.V(4).Infof("API GC: should remove active checkpoint %s", id) + for name, pod := range apiParentPods { + if _, ok := cs.checkpoints[name]; !ok { + cs.checkpoints[name] = &checkpoint{ + name: name, + state: stateNone{}, + } + } + // Always overwrite with the apiParentPod since it is a better source of truth. + cs.checkpoints[name].pod = pod + } - removeMap[id] = struct{}{} - if isPodCheckpointer(activeCheckpoints[id], checkpointerPod) { - podCheckpointerID = id - break + // Find the self-checkpointer pod if it exists and remove it from the map. + for _, cp := range cs.checkpoints { + if isPodCheckpointer(cp.pod, checkpointerPod) { + // Separate the self-checkpoint from the map, as it is handled separately. + cs.selfCheckpoint = cp + delete(cs.checkpoints, cp.name) + + // If this is a new self-checkpoint make sure the state is set correctly. + if cp.state.action() == none { + cp.state = stateSelfCheckpoint{} + } + } + } +} + +// process uses the apiserver inputs and curren time to determine which checkpoints to start, stop, +// or remove. +func (cs *checkpoints) process(now time.Time, apiAvailable bool, localRunningPods, localParentPods, apiParentPods map[string]*v1.Pod) (starts, stops, removes []string) { + // The checkpointer must be handled specially: the checkpoint always needs to remain active, and + // if it is removed from the apiserver then all other checkpoints need to be removed first. + if cs.selfCheckpoint != nil { + state := cs.selfCheckpoint.state.transition(now, apiState{ + apiAvailable: apiAvailable, + apiParent: apiParentPods[cs.selfCheckpoint.name] != nil, + localRunning: localRunningPods[cs.selfCheckpoint.name] != nil, + localParent: localParentPods[cs.selfCheckpoint.name] != nil, + }) + + if state != cs.selfCheckpoint.state { + glog.Infof("Self-checkpoint %s transitioning from state %s -> state %s", cs.selfCheckpoint, cs.selfCheckpoint.state, state) + cs.selfCheckpoint.state = state + } + + switch cs.selfCheckpoint.state.action() { + case none: + glog.Errorf("Unexpected transition to state %s with action 'none'", state) + case start: + // The selfCheckpoint must always be active to ensure that it can perform its functions in + // the face of a full control plane restart. + starts = append(starts, cs.selfCheckpoint.name) + case stop: + // If the checkpointer is stopped then stop all checkpoints. Next cycle they may restart. + for name, cp := range cs.checkpoints { + if cp.state.action() != none { + stops = append(stops, name) } + } + stops = append(stops, cs.selfCheckpoint.name) + return starts, stops, removes + case remove: + // If the checkpointer is removed then remove all checkpoints, putting the selfCheckpoint + // last and removing all state. + for name, cp := range cs.checkpoints { + if cp.state.action() != none { + removes = append(removes, name) + delete(cs.checkpoints, name) + } + } + removes = append(removes, cs.selfCheckpoint.name) + delete(cs.checkpoints, cs.selfCheckpoint.name) + cs.selfCheckpoint = nil + return starts, stops, removes + default: + panic(fmt.Sprintf("unhandled action: %s", cs.selfCheckpoint.state.action())) + } + } - delete(activeCheckpoints, id) + // Update states for all the checkpoints and compute which to start / stop / remove. + for name, cp := range cs.checkpoints { + state := cp.state.transition(now, apiState{ + apiAvailable: apiAvailable, + apiParent: apiParentPods[name] != nil, + localRunning: localRunningPods[name] != nil, + localParent: localParentPods[name] != nil, + }) + + if state != cp.state { + // Apply state transition. + // TODO(diegs): always apply this. + if cp.state.action() != state.action() { + switch state.action() { + case none: + glog.Errorf("Unexpected transition to state %s with action 'none'", state) + case start: + starts = append(starts, cp.name) + case stop: + stops = append(stops, cp.name) + case remove: + removes = append(removes, cp.name) + delete(cs.checkpoints, cp.name) + default: + panic(fmt.Sprintf("unhandled action: %s", state.action())) + } } + + glog.Infof("Checkpoint %s transitioning from state %s -> state %s", cp, cp.state, state) + cp.state = state } } - // Remove all checkpoints if we need to remove the pod checkpointer itself. - if podCheckpointerID != "" { - glog.V(4).Info("Pod checkpointer is removed, removing all checkpoints") - for id := range inactiveCheckpoints { - removeMap[id] = struct{}{} - delete(inactiveCheckpoints, id) + return starts, stops, removes +} + +// stateSelfCheckpoint represents a checkpoint of the checkpointer itself, which has special +// behavior. +// +// stateSelfCheckpoint can transition to stateInactiveGracePeriod. +type stateSelfCheckpoint struct{} + +// transition implements state.transition() +func (s stateSelfCheckpoint) transition(now time.Time, apis apiState) state { + if !apis.apiAvailable { + // If the apiserver is unavailable always stay in the selfCheckpoint state. + return s + } + + if apis.apiParent { + // If the parent pod exists always stay in the selfCheckpoint state. + return s + } + + // The apiserver pod is deleted, transition to stateInactiveGracePeriod. + // TODO(diegs): this is a little hacky, perhaps clean it up with a constructor. + return stateInactiveGracePeriod{gracePeriodEnd: now.Add(checkpointGracePeriod)}.checkGracePeriod(now, apis) +} + +// action implements state.action() +func (s stateSelfCheckpoint) action() action { + // The self-checkpoint should always be started. + return start +} + +// String() implements fmt.Stringer.String(). +func (s stateSelfCheckpoint) String() string { + return "self-checkpoint" +} + +// stateNone represents a new pod that has not been processed yet, so it has no checkpoint state. +// +// stateNone can transition to stateInactive, stateInactiveGracePeriod, or stateActive. +type stateNone struct{} + +// transition implements state.transition() +func (s stateNone) transition(now time.Time, apis apiState) state { + // Newly discovered pods are treated as mostly inactive, but only if there is either a local + // running pod or kubelet parent pod. In other words, if the new pod is only reflected in the + // apiserver we do not checkpoint it yet. + if apis.localRunning || apis.localParent { + return stateInactive{}.transition(now, apis) + } + return s +} + +// action implements state.action() +func (s stateNone) action() action { + return none +} + +// String() implements fmt.Stringer.String(). +func (s stateNone) String() string { + return "none" +} + +// stateInactive is a checkpoint that is currently sitting inactive on disk. +// +// stateInactive can transition to stateActive or stateInactiveGracePeriod. +type stateInactive struct{} + +// transition implements state.transition() +func (s stateInactive) transition(now time.Time, apis apiState) state { + if !apis.apiAvailable { + // The apiserver is unavailable but the local copy is running, remain in stateInactive. + if apis.localRunning { + return s } - for id := range activeCheckpoints { - removeMap[id] = struct{}{} - delete(activeCheckpoints, id) + + // The apiserver is unavailable and the local pod is not running, transition to stateActive. + return stateActive{} + } + + if apis.apiParent { + // The parent pod exists and the kubelet is running it, remain in stateInactive. + if apis.localRunning { + return s } + + // The parent pod exists but the kubelet is not running it, transition to stateActive. + return stateActive{} } - // Can make decisions about starting/stopping checkpoints just with local state. - // - // If there is an inactive checkpoint, and no parent pod is running, or the checkpoint - // is the pod-checkpointer, start the checkpoint. - for id := range inactiveCheckpoints { - _, ok := localRunningPods[id] - if !ok || isPodCheckpointer(inactiveCheckpoints[id], checkpointerPod) { - glog.V(4).Infof("Should start checkpoint %s", id) - start = append(start, id) + // The apiserver pod is deleted, transition to stateInactiveGracePeriod. + // TODO(diegs): this is a little hacky, perhaps clean it up with a constructor. + return stateInactiveGracePeriod{gracePeriodEnd: now.Add(checkpointGracePeriod)}.checkGracePeriod(now, apis) +} + +// action implements state.action() +func (s stateInactive) action() action { + return stop +} + +// String() implements fmt.Stringer.String(). +func (s stateInactive) String() string { + return "inactive" +} + +// stateInactiveGracePeriod is a checkpoint that is inactive but will be garbage collected after a +// grace period. +// +// stateInactiveGracePeriod can transition to stateInactive, stateActive, or stateRemove. +type stateInactiveGracePeriod struct { + // gracePeriodEnd is the time when the grace period for this checkpoint is over and it should be + // garbage collected. + gracePeriodEnd time.Time +} + +// transition implements state.transition() +func (s stateInactiveGracePeriod) transition(now time.Time, apis apiState) state { + if !apis.apiAvailable { + // The apiserver is unavailable but the local copy is running, remain in + // stateInactiveGracePeriod. + if apis.localRunning { + return s.checkGracePeriod(now, apis) } + + // The apiserver is unavailable and the local pod is not running, transition to stateActive. + return stateActive{} } - // If there is an active checkpoint and a running parent pod, stop the active checkpoint - // unless this is the pod-checkpointer. - // The parent may not be in a running state, but the kubelet is trying to start it - // so we should get out of the way. - for id := range activeCheckpoints { - _, ok := localRunningPods[id] - if ok && !isPodCheckpointer(activeCheckpoints[id], checkpointerPod) { - glog.V(4).Infof("Should stop checkpoint %s", id) - stop = append(stop, id) + if apis.apiParent { + // The parent pod exists and the kubelet is running it, remain in inactive. + if apis.localRunning { + return stateInactive{} } + + // The parent pod exists but the kubelet is not running it, transition to stateActive. + return stateActive{} } - // De-duped checkpoints to remove. If we decide to GC a checkpoint, we will clean up both inactive/active. - for k := range removeMap { - if k == podCheckpointerID { - continue + // The apiserver pod is still deleted, remain in stateInactiveGracePeriod. + return s.checkGracePeriod(now, apis) +} + +func (s stateInactiveGracePeriod) checkGracePeriod(now time.Time, apis apiState) state { + // Override state to remove if the grace period has passed. + if now.Equal(s.gracePeriodEnd) || now.After(s.gracePeriodEnd) { + glog.Infof("Grace period exceeded for state %s", s) + return stateRemove{} + } + return s +} + +// action implements state.action() +func (s stateInactiveGracePeriod) action() action { + return stop +} + +// String() implements fmt.Stringer.String(). +func (s stateInactiveGracePeriod) String() string { + return "inactive (grace period)" +} + +// stateActive is a checkpoint that is currently activated. +// +// stateActive can transition to stateInactive or stateActiveGracePeriod. +type stateActive struct{} + +// transition implements state.transition() +func (s stateActive) transition(now time.Time, apis apiState) state { + if !apis.apiAvailable { + // The apiserver is unavailable but the local copy is running, transition to inactive. + if apis.localRunning { + return stateInactive{} + } + + // The apiserver is unavailable and the local pod is not running, remain in stateActive. + return s + } + + if apis.apiParent { + // The parent pod exists and the kubelet is running it, transition to inactive. + if apis.localRunning { + return stateInactive{} } - remove = append(remove, k) + + // The parent pod exists but the kubelet is not running it, remain in stateActive. + return s } - // Put pod checkpoint at the last of the queue. - if podCheckpointerID != "" { - remove = append(remove, podCheckpointerID) + + // The apiserver pod is deleted, transition to stateActiveGracePeriod. + // TODO(diegs): this is a little hacky, perhaps clean it up with a constructor. + return stateActiveGracePeriod{gracePeriodEnd: now.Add(checkpointGracePeriod)}.checkGracePeriod(now, apis) +} + +// action implements state.action() +func (s stateActive) action() action { + return start +} + +// String() implements fmt.Stringer.String(). +func (s stateActive) String() string { + return "active" +} + +// stateActiveGracePeriod is a checkpoint that is active but will be garbage collected after a grace +// period. +// +// stateActiveGracePeriod can transition to stateActive or stateInactive. +type stateActiveGracePeriod struct { + // gracePeriodEnd is the time when the grace period for this checkpoint is over and it should be + // garbage collected. + gracePeriodEnd time.Time +} + +// transition implements state.transition() +func (s stateActiveGracePeriod) transition(now time.Time, apis apiState) state { + if !apis.apiAvailable { + // The apiserver is unavailable but the local copy is running, transition to stateInactive. + if apis.localRunning { + return stateInactive{} + } + + // The apiserver is unavailable and the local pod is not running, remain in + // stateActiveGracePeriod. + return s.checkGracePeriod(now, apis) } - return start, stop, remove + if apis.apiParent { + // The parent pod exists and the kubelet is running it, transition to stateInactive. + if apis.localRunning { + return stateInactive{} + } + + // The parent pod exists but the kubelet is not running it, transition to stateActive. + return stateActive{} + } + + // The apiserver pod is still deleted, remain in stateActiveGracePeriod. + return s.checkGracePeriod(now, apis) +} + +func (s stateActiveGracePeriod) checkGracePeriod(now time.Time, apis apiState) state { + // Override state to stateInactiveGracePeriod.transition() as if the grace period has passed. This + // has the effect of either transitioning to stateInactive or stateRemove. + if now.Equal(s.gracePeriodEnd) || now.After(s.gracePeriodEnd) { + glog.Infof("Grace period exceeded for state %s", s) + return stateInactiveGracePeriod{gracePeriodEnd: now}.transition(now, apis) + } + return s +} + +// action implements state.action() +func (s stateActiveGracePeriod) action() action { + return start +} + +// String() implements fmt.Stringer.String(). +func (s stateActiveGracePeriod) String() string { + return "active (grace period)" +} + +// stateRemove is a checkpoint that is being garbage collected. +// +// It is a terminal state that can never transition to other states; checkpoints in this state are +// removed as part of the update loop. +type stateRemove struct{} + +// transition implements state.transition() +func (s stateRemove) transition(now time.Time, apis apiState) state { + // Remove is a terminal state. This should never actually be called. + glog.Errorf("Unexpected call to transition() for state %s", s) + return s +} + +// action implements state.action() +func (s stateRemove) action() action { + return remove +} + +// String() implements fmt.Stringer.String(). +func (s stateRemove) String() string { + return "remove" } // createCheckpointsForValidParents will iterate through pods which are candidates for checkpointing, then: @@ -137,7 +533,6 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints // - sanitize their podSpec, removing unnecessary information // - store the manifest on disk in an "inactive" checkpoint location func (c *checkpointer) createCheckpointsForValidParents(pods map[string]*v1.Pod) { - needsCheckpointUpdate := lastCheckpoint.IsZero() || time.Since(lastCheckpoint) >= defaultCheckpointTimeout for _, pod := range pods { diff --git a/pkg/checkpoint/process_test.go b/pkg/checkpoint/process_test.go index a9fcb42f5..3a9f74f34 100644 --- a/pkg/checkpoint/process_test.go +++ b/pkg/checkpoint/process_test.go @@ -1,14 +1,19 @@ package checkpoint import ( + "flag" "reflect" "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" ) func TestProcess(t *testing.T) { + flag.Set("logtostderr", "true") + flag.Parse() + type testCase struct { desc string localRunning map[string]*v1.Pod @@ -19,6 +24,9 @@ func TestProcess(t *testing.T) { expectStart []string expectStop []string expectRemove []string + expectGraceStart []string + expectGraceStop []string + expectGraceRemove []string podName string } @@ -37,7 +45,7 @@ func TestProcess(t *testing.T) { desc: "Inactive checkpoint and no api parent: should remove", inactiveCheckpoints: map[string]*v1.Pod{"AA": {}}, apiParents: map[string]*v1.Pod{"BB": {}}, - expectRemove: []string{"AA"}, + expectGraceRemove: []string{"AA"}, }, { desc: "Inactive checkpoint and both api & local running: no change", @@ -67,10 +75,10 @@ func TestProcess(t *testing.T) { apiParents: map[string]*v1.Pod{"AA": {}}, }, { - desc: "Active checkpoint and no api parent: remove", + desc: "Active checkpoint and no api parent: should remove", activeCheckpoints: map[string]*v1.Pod{"AA": {}}, apiParents: map[string]*v1.Pod{"BB": {}}, - expectRemove: []string{"AA"}, + expectGraceRemove: []string{"AA"}, }, { desc: "Active checkpoint with local running, and api parent: should stop", @@ -84,14 +92,14 @@ func TestProcess(t *testing.T) { activeCheckpoints: map[string]*v1.Pod{"AA": {}}, localParents: map[string]*v1.Pod{"AA": {}}, apiParents: map[string]*v1.Pod{"BB": {}}, - expectRemove: []string{"AA"}, + expectGraceRemove: []string{"AA"}, }, { desc: "Both active and inactive checkpoints, with no api parent: remove both", activeCheckpoints: map[string]*v1.Pod{"AA": {}}, inactiveCheckpoints: map[string]*v1.Pod{"AA": {}}, apiParents: map[string]*v1.Pod{"BB": {}}, - expectRemove: []string{"AA"}, // Only need single remove, we should clean up both active/inactive + expectGraceRemove: []string{"AA"}, // Only need single remove, we should clean up both active/inactive }, { desc: "Inactive checkpoint, local parent, local running, no api parent: no change", // Safety check - don't GC if local parent still exists (even if possibly stale) @@ -108,7 +116,14 @@ func TestProcess(t *testing.T) { desc: "Inactive pod-checkpointer, local parent, local running, api parent: should start", localRunning: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, localParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, - apiParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, + apiParents: map[string]*v1.Pod{ + "kube-system/pod-checkpointer": { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "pod-checkpointer", + }, + }, + }, inactiveCheckpoints: map[string]*v1.Pod{ "kube-system/pod-checkpointer": { ObjectMeta: metav1.ObjectMeta{ @@ -117,11 +132,19 @@ func TestProcess(t *testing.T) { }, }, }, - expectStart: []string{"kube-system/pod-checkpointer"}, + expectStart: []string{"kube-system/pod-checkpointer"}, + expectGraceStart: []string{"kube-system/pod-checkpointer"}, }, { - desc: "Inactive pod-checkpointer, local parent, no local running, api not reachable: should start", - localParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, + desc: "Inactive pod-checkpointer, local parent, no local running, api not reachable: should start", + localParents: map[string]*v1.Pod{ + "kube-system/pod-checkpointer": { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "pod-checkpointer", + }, + }, + }, inactiveCheckpoints: map[string]*v1.Pod{ "kube-system/pod-checkpointer": { ObjectMeta: metav1.ObjectMeta{ @@ -130,11 +153,12 @@ func TestProcess(t *testing.T) { }, }, }, - expectStart: []string{"kube-system/pod-checkpointer"}, + expectStart: []string{"kube-system/pod-checkpointer"}, + expectGraceStart: []string{"kube-system/pod-checkpointer"}, }, { desc: "Inactive pod-checkpointer, no local parent, no api parent: should remove in the last", - localRunning: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}, "AA": {}}, + localRunning: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}, "AA": {}, "BB": {}}, localParents: map[string]*v1.Pod{"BB": {}}, apiParents: map[string]*v1.Pod{"BB": {}}, inactiveCheckpoints: map[string]*v1.Pod{ @@ -146,7 +170,8 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, { desc: "Inactive pod-checkpointer, no local parent, no api parent: should remove all", @@ -162,7 +187,8 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, { desc: "Active pod-checkpointer, no local parent, no api parent: should remove all", @@ -178,14 +204,22 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, { desc: "Running as an on-disk checkpointer: Inactive pod-checkpointer, local parent, local running, api parent: should start", podName: "pod-checkpointer-mynode", localRunning: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, localParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, - apiParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, + apiParents: map[string]*v1.Pod{ + "kube-system/pod-checkpointer": { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "pod-checkpointer", + }, + }, + }, inactiveCheckpoints: map[string]*v1.Pod{ "kube-system/pod-checkpointer": { ObjectMeta: metav1.ObjectMeta{ @@ -194,12 +228,20 @@ func TestProcess(t *testing.T) { }, }, }, - expectStart: []string{"kube-system/pod-checkpointer"}, + expectStart: []string{"kube-system/pod-checkpointer"}, + expectGraceStart: []string{"kube-system/pod-checkpointer"}, }, { - desc: "Running as an on-disk checkpointer: Inactive pod-checkpointer, local parent, no local running, api not reachable: should start", - podName: "pod-checkpointer-mynode", - localParents: map[string]*v1.Pod{"kube-system/pod-checkpointer": {}}, + desc: "Running as an on-disk checkpointer: Inactive pod-checkpointer, local parent, no local running, api not reachable: should start", + podName: "pod-checkpointer-mynode", + localParents: map[string]*v1.Pod{ + "kube-system/pod-checkpointer": { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "pod-checkpointer", + }, + }, + }, inactiveCheckpoints: map[string]*v1.Pod{ "kube-system/pod-checkpointer": { ObjectMeta: metav1.ObjectMeta{ @@ -208,7 +250,8 @@ func TestProcess(t *testing.T) { }, }, }, - expectStart: []string{"kube-system/pod-checkpointer"}, + expectStart: []string{"kube-system/pod-checkpointer"}, + expectGraceStart: []string{"kube-system/pod-checkpointer"}, }, { desc: "Running as an on-disk checkpointer: Inactive pod-checkpointer, no local parent, no api parent: should remove in the last", @@ -225,7 +268,8 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, { desc: "Running as an on-disk checkpointer: Inactive pod-checkpointer, no local parent, no api parent: should remove all", @@ -242,7 +286,8 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, { desc: "Running as an on-disk checkpointer: Active pod-checkpointer, no local parent, no api parent: should remove all", @@ -259,11 +304,13 @@ func TestProcess(t *testing.T) { }, "AA": {}, }, - expectRemove: []string{"AA", "kube-system/pod-checkpointer"}, + expectStop: []string{"AA", "kube-system/pod-checkpointer"}, + expectGraceRemove: []string{"AA", "kube-system/pod-checkpointer"}, }, } for _, tc := range cases { + // Set up test state. cp := CheckpointerPod{ NodeName: "mynode", PodName: "pod-checkpointer", @@ -272,12 +319,24 @@ func TestProcess(t *testing.T) { if tc.podName != "" { cp.PodName = tc.podName } - gotStart, gotStop, gotRemove := process(tc.localRunning, tc.localParents, tc.apiParents, tc.activeCheckpoints, tc.inactiveCheckpoints, cp) + c := checkpoints{} + + // Run test now. + now := time.Time{} + c.update(tc.localRunning, tc.localParents, tc.apiParents, tc.activeCheckpoints, tc.inactiveCheckpoints, cp) + gotStart, gotStop, gotRemove := c.process(now, tc.apiParents != nil, tc.localRunning, tc.localParents, tc.apiParents) + + // Advance past grace period and test again. + now = now.Add(checkpointGracePeriod) + c.update(tc.localRunning, tc.localParents, tc.apiParents, tc.activeCheckpoints, tc.inactiveCheckpoints, cp) + gotGraceStart, gotGraceStop, gotGraceRemove := c.process(now, tc.apiParents != nil, tc.localRunning, tc.localParents, tc.apiParents) if !reflect.DeepEqual(tc.expectStart, gotStart) || !reflect.DeepEqual(tc.expectStop, gotStop) || - !reflect.DeepEqual(tc.expectRemove, gotRemove) { - t.Errorf("For test: %s\nExpected start: %s Got: %s\nExpected stop: %s Got: %s\nExpected remove: %s Got: %s\n", - tc.desc, tc.expectStart, gotStart, tc.expectStop, gotStop, tc.expectRemove, gotRemove) + !reflect.DeepEqual(tc.expectRemove, gotRemove) || + !reflect.DeepEqual(tc.expectGraceStart, gotGraceStart) || + !reflect.DeepEqual(tc.expectGraceStop, gotGraceStop) || + !reflect.DeepEqual(tc.expectGraceRemove, gotGraceRemove) { + t.Errorf("For test: %s\nExpected start: %s Got: %s\nExpected stop: %s Got: %s\nExpected remove: %s Got: %s\nExpected grace period start: %s Got: %s\nExpected grace period stop: %s Got: %s\nExpected grace period remove: %s Got: %s\n", tc.desc, tc.expectStart, gotStart, tc.expectStop, gotStop, tc.expectRemove, gotRemove, tc.expectGraceStart, gotGraceStart, tc.expectGraceStop, gotGraceStop, tc.expectGraceRemove, gotGraceRemove) } } }