Skip to content
This repository has been archived by the owner on Jul 30, 2021. It is now read-only.

checkpointer: implement state machine. #755

Merged
merged 3 commits into from
Nov 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/checkpoint/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
)

// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
// Returns false if we could not contact the apiserver.
func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) {
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
}

podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
if err != nil {
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
return nil
return false, nil
}
return podListToParentPods(podList)
return true, podListToParentPods(podList)
}
30 changes: 22 additions & 8 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package checkpoint

import (
"fmt"
"os"
"time"

"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
Expand All @@ -23,12 +25,14 @@ const (
shouldCheckpoint = "true"
podSourceFile = "file"

defaultPollingFrequency = 3 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultPollingFrequency = 5 * time.Second
defaultCheckpointTimeout = 1 * time.Minute
defaultCheckpointGracePeriod = 1 * time.Minute
)

var (
lastCheckpoint time.Time
lastCheckpoint time.Time
checkpointGracePeriod = defaultCheckpointGracePeriod
)

// Options defines the parameters that are required to start the checkpointer.
Expand Down Expand Up @@ -59,6 +63,7 @@ type checkpointer struct {
kubelet *kubeletClient
cri *remoteRuntimeService
checkpointerPod CheckpointerPod
checkpoints checkpoints
}

// Run instantiates and starts a new checkpointer. Returns error if there was a problem creating
Expand Down Expand Up @@ -90,6 +95,11 @@ func Run(opts Options) error {

// run is the main checkpointing loop.
func (c *checkpointer) run() {
// Make sure the inactive checkpoint path exists.
if err := os.MkdirAll(inactiveCheckpointPath, 0700); err != nil {
glog.Fatalf("Could not create inactive checkpoint path: %v", err)
}

for {
time.Sleep(defaultPollingFrequency)

Expand All @@ -101,20 +111,24 @@ func (c *checkpointer) run() {
localParentPods := c.kubelet.localParentPods()
localRunningPods := c.cri.localRunningPods()

c.createCheckpointsForValidParents(localParentPods)

// Try to get scheduled pods from the apiserver.
// These will be used to GC checkpoints for parents no longer scheduled to this node.
// apiAvailable is false if the apiserver could not be contacted; apiParentPods is nil in that case.
// TODO(aaron): only check this every 30 seconds or so
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)

// Get on disk copies of (in)active checkpoints
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)

start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
// Update checkpoints using the latest information from the APIs.
c.checkpoints.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)

// Update on-disk manifests based on updated checkpoint state.
c.createCheckpointsForValidParents()

// Update checkpoint states and determine which checkpoints to start, stop, or remove.
start, stop, remove := c.checkpoints.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods)

// Handle remove at last because we may still have some work to do
// before removing the checkpointer itself.
Expand Down
33 changes: 29 additions & 4 deletions pkg/checkpoint/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -23,6 +24,16 @@ func getFileCheckpoints(path string) map[string]*v1.Pod {

for _, f := range fi {
manifest := filepath.Join(path, f.Name())

// Check for leftover temporary checkpoints.
if strings.HasPrefix(filepath.Base(manifest), ".") {
glog.V(4).Infof("Found temporary checkpoint %s, removing.", manifest)
if err := os.Remove(manifest); err != nil {
glog.V(4).Infof("Error removing temporary checkpoint %s: %v.", manifest, err)
}
continue
}

b, err := ioutil.ReadFile(manifest)
if err != nil {
glog.Errorf("Error reading manifest: %v", err)
Expand Down Expand Up @@ -52,10 +63,6 @@ func writeCheckpointManifest(pod *v1.Pod) (bool, error) {
return false, err
}
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
// Make sure the inactive checkpoint path exists.
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return false, err
}
return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes())
}

Expand All @@ -73,3 +80,21 @@ func writeManifestIfDifferent(path, name string, data []byte) (bool, error) {
glog.Infof("Writing manifest for %q to %q", name, path)
return true, writeAndAtomicRename(path, data, 0644)
}

// writeAndAtomicRename writes data to path by way of a temporary file and a
// rename, so the final path is only ever replaced by a fully written file.
//
// The temporary file is created in the same directory as path with a name
// beginning with ".", its mode is set to perm, and it is then renamed over
// path. If any step fails, the temporary file is removed (best effort) and the
// first error encountered is returned.
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
	// Ensure that the temporary file is on the same filesystem so that os.Rename() does not error.
	tmpfile, err := ioutil.TempFile(filepath.Dir(path), ".")
	if err != nil {
		return err
	}
	tmpName := tmpfile.Name()

	_, err = tmpfile.Write(data)
	if err == nil {
		// Chmod must run while the file is still open: calling it on a
		// closed *os.File returns an error.
		err = tmpfile.Chmod(perm)
	}
	// Always close so the descriptor is not leaked, but keep the earliest error.
	if cerr := tmpfile.Close(); err == nil {
		err = cerr
	}
	if err == nil {
		err = os.Rename(tmpName, path)
	}
	if err != nil {
		// Best-effort cleanup; the original failure is the error worth reporting.
		_ = os.Remove(tmpName)
		return err
	}
	return nil
}
10 changes: 0 additions & 10 deletions pkg/checkpoint/pod.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package checkpoint

import (
"io/ioutil"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -163,11 +161,3 @@ func podFullNameToInactiveCheckpointPath(id string) string {
// podFullNameToActiveCheckpointPath builds the manifest path under
// activeCheckpointPath for the given pod full name: every "/" in id is
// replaced with "-" and ".json" is appended to form the file name.
func podFullNameToActiveCheckpointPath(id string) string {
	fileName := strings.Replace(id, "/", "-", -1) + ".json"
	return filepath.Join(activeCheckpointPath, fileName)
}

// writeAndAtomicRename stages data in a sibling file named "." + the base name
// of path (created with mode perm via ioutil.WriteFile) and then renames that
// staging file onto path. The first error from either step is returned.
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
	staging := filepath.Join(filepath.Dir(path), "."+filepath.Base(path))
	err := ioutil.WriteFile(staging, data, perm)
	if err == nil {
		err = os.Rename(staging, path)
	}
	return err
}
Loading