Skip to content
This repository has been archived by the owner on Jul 30, 2021. It is now read-only.

Commit

Permalink
checkpointer: implement state machine.
Browse files Browse the repository at this point in the history
This implements an explicit state machine to capture the various states
a checkpoint can be in. It recovers those states from disk upon startup
(if applicable) and then reconciles them with the states that are
fetched from the various apiservers.

It also adds two new states to work around an issue in Kubernetes 1.8
in which DaemonSet pods are deleted before being recreated. For
single-master clusters this can lead to a total outage during apiserver
upgrades, since the checkpointer will aggressively retire the checkpoint
for the deleted apiserver pod before the replacement pod has a chance
to be scheduled.
  • Loading branch information
diegs committed Oct 31, 2017
1 parent d3f812d commit b9d2b16
Show file tree
Hide file tree
Showing 5 changed files with 599 additions and 137 deletions.
7 changes: 4 additions & 3 deletions pkg/checkpoint/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
)

// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
// Returns false if we could not contact the apiserver.
func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) {
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
}

podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
if err != nil {
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
return nil
return false, nil
}
return podListToParentPods(podList)
return true, podListToParentPods(podList)
}
27 changes: 19 additions & 8 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package checkpoint

import (
"fmt"
"os"
"time"

"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
Expand All @@ -23,12 +25,14 @@ const (
shouldCheckpoint = "true"
podSourceFile = "file"

defaultPollingFrequency = 3 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultPollingFrequency = 5 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultCheckpointGracePeriod = 1 * time.Minute
)

var (
lastCheckpoint time.Time
lastCheckpoint time.Time
checkpointGracePeriod = defaultCheckpointGracePeriod
)

// Options defines the parameters that are required to start the checkpointer.
Expand Down Expand Up @@ -90,6 +94,12 @@ func Run(opts Options) error {

// run is the main checkpointing loop.
func (c *checkpointer) run() {
// Make sure the inactive checkpoint path exists.
if err := os.MkdirAll(inactiveCheckpointPath, 0600); err != nil {
glog.Fatalf("Could not create inactive checkpoint path: %v", err)
}

cs := checkpoints{}
for {
time.Sleep(defaultPollingFrequency)

Expand All @@ -101,20 +111,21 @@ func (c *checkpointer) run() {
localParentPods := c.kubelet.localParentPods()
localRunningPods := c.cri.localRunningPods()

c.createCheckpointsForValidParents(localParentPods)

// Try to get scheduled pods from the apiserver.
// These will be used to GC checkpoints for parents no longer scheduled to this node.
// apiAvailable is false if we could not contact the apiserver.
// TODO(aaron): only check this every 30 seconds or so
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)

// Get on disk copies of (in)active checkpoints
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)

start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
// Update checkpoints using the latest information from the APIs.
cs.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)

// Update checkpoint states and determine which checkpoints to start, stop, or remove.
start, stop, remove := cs.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods)

// Handle remove at last because we may still have some work to do
// before removing the checkpointer itself.
Expand Down
4 changes: 0 additions & 4 deletions pkg/checkpoint/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ func writeCheckpointManifest(pod *v1.Pod) (bool, error) {
return false, err
}
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
// Make sure the inactive checkpoint path exists.
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return false, err
}
return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes())
}

Expand Down
Loading

0 comments on commit b9d2b16

Please sign in to comment.