Skip to content
This repository has been archived by the owner on Jul 30, 2021. It is now read-only.

Commit

Permalink
checkpointer: implement state machine.
Browse files Browse the repository at this point in the history
Open TODOs:

- Specify and validate allowed state transitions.
- Add test coverage for grace period.
  • Loading branch information
diegs committed Oct 28, 2017
1 parent 6e1fa08 commit 1c1757d
Show file tree
Hide file tree
Showing 7 changed files with 488 additions and 174 deletions.
7 changes: 4 additions & 3 deletions pkg/checkpoint/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
)

// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
// Returns false if we could not contact the apiserver.
func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) {
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
}

podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
if err != nil {
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
return nil
return false, nil
}
return podListToParentPods(podList)
return true, podListToParentPods(podList)
}
21 changes: 13 additions & 8 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ const (
shouldCheckpoint = "true"
podSourceFile = "file"

defaultPollingFrequency = 3 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultPollingFrequency = 3 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultCheckpointGracePeriod = 1 * time.Minute
)

var (
lastCheckpoint time.Time
lastCheckpoint time.Time
checkpointGracePeriod = defaultCheckpointGracePeriod
)

// Options defines the parameters that are required to start the checkpointer.
Expand Down Expand Up @@ -90,6 +92,8 @@ func Run(opts Options) error {

// run is the main checkpointing loop.
func (c *checkpointer) run() {
cs := checkpoints{}

for {
time.Sleep(defaultPollingFrequency)

Expand All @@ -101,20 +105,21 @@ func (c *checkpointer) run() {
localParentPods := c.kubelet.localParentPods()
localRunningPods := c.cri.localRunningPods()

c.createCheckpointsForValidParents(localParentPods)

// Try to get scheduled pods from the apiserver.
// These will be used to GC checkpoints for parents no longer scheduled to this node.
// apiAvailable is false if the apiserver could not be contacted.
// TODO(aaron): only check this every 30 seconds or so
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)

// Get on disk copies of (in)active checkpoints
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)

start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
// Update checkpoints using the latest information from the APIs.
cs.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)

// Update checkpoint states and determine which checkpoints to start, stop, or remove.
start, stop, remove := cs.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods)

// Handle removals last because we may still have some work to do
// before removing the checkpointer itself.
Expand Down
8 changes: 8 additions & 0 deletions pkg/checkpoint/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,11 @@ func writeManifestIfDifferent(path, name string, data []byte) (bool, error) {
glog.Infof("Writing manifest for %q to %q", name, path)
return true, writeAndAtomicRename(path, data, 0644)
}

// writeAndAtomicRename writes data to path in two steps: the bytes are first
// written with permissions perm to a hidden sibling file named ".<basename>"
// in the same directory as path, and that file is then renamed over path.
// Readers of path therefore never observe a partially written file.
//
// If either the write or the rename fails, the temporary file is removed on a
// best-effort basis so failed attempts do not leave stray dot-files behind,
// and the original error is returned.
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
	// Keep the temp file next to the destination so the rename does not have
	// to cross directories.
	tmpfile := filepath.Join(filepath.Dir(path), "."+filepath.Base(path))
	if err := ioutil.WriteFile(tmpfile, data, perm); err != nil {
		// The write may have created a partial file. The cleanup error is
		// deliberately ignored: the write error is what the caller needs.
		_ = os.Remove(tmpfile)
		return err
	}
	if err := os.Rename(tmpfile, path); err != nil {
		// Rename failed, so the temp file is still present; do not leak it.
		_ = os.Remove(tmpfile)
		return err
	}
	return nil
}
10 changes: 0 additions & 10 deletions pkg/checkpoint/pod.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package checkpoint

import (
"io/ioutil"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -163,11 +161,3 @@ func podFullNameToInactiveCheckpointPath(id string) string {
// podFullNameToActiveCheckpointPath returns the path under activeCheckpointPath
// for the given pod full name: every "/" in id is replaced with "-" and a
// ".json" extension is appended.
func podFullNameToActiveCheckpointPath(id string) string {
	fileName := strings.Replace(id, "/", "-", -1) + ".json"
	return filepath.Join(activeCheckpointPath, fileName)
}

// writeAndAtomicRename stages data in a hidden sibling file (".<basename>" in
// the same directory as path, created with permissions perm) and then renames
// that file onto path. The first error from the write or the rename is
// returned; the rename is not attempted if the write fails.
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
	hiddenName := "." + filepath.Base(path)
	staging := filepath.Join(filepath.Dir(path), hiddenName)

	err := ioutil.WriteFile(staging, data, perm)
	if err == nil {
		err = os.Rename(staging, path)
	}
	return err
}
Loading

0 comments on commit 1c1757d

Please sign in to comment.