Skip to content

Commit

Permalink
feat: Make importer datasource communication explicit
Browse files Browse the repository at this point in the history
Make the communication of datasources in the importer explicit by adding
a GetTerminationMessage method to the DataSourceInterface.
Then use this method to communicate additional information to the import
controller once the importer pod has terminated, instead of writing
additional data to the termination message in the Close method of
datasources.

Signed-off-by: Felix Matouschek <fmatouschek@redhat.com>
  • Loading branch information
0xFelix committed Feb 15, 2024
1 parent 221469d commit 1c8370d
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 112 deletions.
2 changes: 1 addition & 1 deletion cmd/cdi-importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ go_library(
"//pkg/util:go_default_library",
"//pkg/util/prometheus:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/ptr:go_default_library",
],
)

Expand Down
55 changes: 36 additions & 19 deletions cmd/cdi-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ package main
// ImporterSecretKey Optional. Secret key is the password to your account.

import (
"errors"
"flag"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/pkg/errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
Expand Down Expand Up @@ -181,35 +181,52 @@ func handleImport(
processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation)
err := processor.ProcessData()

if err != nil {
if err != nil && !errors.Is(err, importer.ErrRequiresScratchSpace) {
klog.Errorf("%+v", err)
if err == importer.ErrRequiresScratchSpace {
if err := util.WriteTerminationMessage(common.ScratchSpaceRequired); err != nil {
klog.Errorf("%+v", err)
}
return common.ScratchSpaceNeededExitCode
}
err = util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error()))
if err != nil {
if err := util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())); err != nil {
klog.Errorf("%+v", err)
}

return 1
}
touchDoneFile()
// due to the way some data sources can add additional information to termination message
// after finished (ds.close() ) termination message has to be written first, before the
// the ds is closed
// TODO: think about making communication explicit, probably DS interface should be extended
err = importCompleteTerminationMessage(processor.PreallocationApplied())
if err != nil {

termMsg := ds.GetTerminationMessage()
if errors.Is(err, importer.ErrRequiresScratchSpace) {
klog.Errorf("%+v", err)
if termMsg == nil {
termMsg = &common.ImporterTerminationMessage{}
}
termMsg.ScratchSpaceRequired = ptr.To(true)
} else if err == nil {
touchDoneFile()
}

if termMsg != nil {
if err := writeImporterTerminationMessage(termMsg); err != nil {
klog.Errorf("%+v", err)
return 1
}
return common.ParseTerminationMessageExitCode
}

if err := importCompleteTerminationMessage(processor.PreallocationApplied()); err != nil {
klog.Errorf("%+v", err)
return 1
}

return 0
}

func writeImporterTerminationMessage(termMsg *common.ImporterTerminationMessage) error {
msg, err := termMsg.String()
if err != nil {
return err
}
if err := util.WriteTerminationMessage(msg); err != nil {
return err
}
return nil
}

func importCompleteTerminationMessage(preallocationApplied bool) error {
message := "Import Complete"
if preallocationApplied {
Expand Down
15 changes: 14 additions & 1 deletion pkg/common/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
Expand All @@ -7,3 +7,16 @@ go_library(
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/api/core/v1:go_default_library"],
)

go_test(
name = "go_default_test",
srcs = [
"common_suite_test.go",
"common_test.go",
],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/onsi/ginkgo/v2:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
],
)
33 changes: 31 additions & 2 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common

import (
"encoding/json"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -217,8 +219,8 @@ const (
// DefaultResyncPeriod sets a 10 minute resync period, used in the controller pkg and the controller cmd executable
DefaultResyncPeriod = 10 * time.Minute

// ScratchSpaceNeededExitCode is the exit code that indicates the importer pod requires scratch space to function properly.
ScratchSpaceNeededExitCode = 42
// ParseTerminationMessageExitCode is the exit code that indicates the termination message of the importer container contains additional information.
ParseTerminationMessageExitCode = 84

// ScratchNameSuffix (controller pkg only)
ScratchNameSuffix = "scratch"
Expand Down Expand Up @@ -329,3 +331,30 @@ var AsyncUploadFormPaths = []string{
UploadFormAsync,
"/v1alpha1/upload-form-async",
}

// VddkInfo holds VDDK version and connection information returned by an importer pod
type VddkInfo struct {
Version string
Host string
}

// ImporterTerminationMessage contains data to be serialized and used as the termination message of the importer.
type ImporterTerminationMessage struct {
ScratchSpaceRequired *bool `json:"scratchReq,omitempty"`
VddkInfo *VddkInfo `json:"vddkInfo,omitempty"`
AdditionalPVCLabels map[string]string `json:"pvcLabels,omitempty"`
}

func (it *ImporterTerminationMessage) String() (string, error) {
msg, err := json.Marshal(it)
if err != nil {
return "", err
}

// Messages longer than 4096 are truncated by kubelet
if length := len(msg); length > 4096 {
return "", fmt.Errorf("Termination message length %d exceeds maximum length of 4096 bytes", length)
}

return string(msg), nil
}
13 changes: 13 additions & 0 deletions pkg/common/common_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestCommon(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Common Suite")
}
25 changes: 25 additions & 0 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("ImporterTerminationMessage", func() {
It("Should fail if serialized data is longer than 4096 bytes", func() {
const length = 5000
const serializationOffset = 22

termMsg := ImporterTerminationMessage{
AdditionalPVCLabels: map[string]string{},
}
for i := 0; i < length-serializationOffset; i++ {
termMsg.AdditionalPVCLabels["t"] += "c"
}

_, err := termMsg.String()
Expect(err).To(MatchError(fmt.Sprintf("Termination message length %d exceeds maximum length of 4096 bytes", length)))
})
})
27 changes: 19 additions & 8 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,28 +366,39 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p

log.V(1).Info("Updating PVC from pod")
anno := pvc.GetAnnotations()

setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition)

scratchExitCode := false
var termMsg *common.ImporterTerminationMessage
if pod.Status.ContainerStatuses != nil &&
pod.Status.ContainerStatuses[0].LastTerminationState.Terminated != nil &&
pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode)
if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ScratchSpaceNeededExitCode {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
scratchExitCode = true
anno[cc.AnnRequiresScratch] = "true"
if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ParseTerminationMessageExitCode {
var err error
termMsg, err = parseTerminationMessage(&pod.Status.ContainerStatuses[0].LastTerminationState)
if err != nil {
log.V(1).Error(err, "failed to parse importer termination message")
return err
}
} else {
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.Message)
}
}

scratchRequired := false
if termMsg != nil && termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
pvc.GetAnnotations()[cc.AnnRequiresScratch] = "true"
scratchRequired = true
}

if anno[cc.AnnCurrentCheckpoint] != "" {
anno[cc.AnnCurrentPodID] = string(pod.ObjectMeta.UID)
}

anno[cc.AnnImportPod] = string(pod.Name)
if !scratchExitCode {
if !scratchRequired {
// No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the
// phase, because the pod might terminate cleanly and mistakenly mark the import complete.
anno[cc.AnnPodPhase] = string(pod.Status.Phase)
Expand Down Expand Up @@ -421,8 +432,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts])
}

if cc.IsPVCComplete(pvc) || scratchExitCode {
if !scratchExitCode {
if cc.IsPVCComplete(pvc) || scratchRequired {
if !scratchRequired {
r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful")
log.V(1).Info("Import completed successfully")
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ var _ = Describe("Update PVC from POD", func() {
})

// TODO: Update me to stay in progress if we were in progress already, its a pod failure and it will get restarted.
It("Should update phase on PVC, if pod exited with error state that is NOT scratchspace exit", func() {
It("Should update phase on PVC, if pod exited with error state that is NOT parse termination message", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil)
pod.Status = corev1.PodStatus{
Expand Down Expand Up @@ -531,7 +531,7 @@ var _ = Describe("Update PVC from POD", func() {
Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Explosion"))
})

It("Should NOT update phase on PVC, if pod exited with error state that is scratchspace exit", func() {
It("Should NOT update phase on PVC, if pod exited with error state that is scratch space required", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
Expand All @@ -542,8 +542,8 @@ var _ = Describe("Update PVC from POD", func() {
{
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: common.ScratchSpaceNeededExitCode,
Message: "scratch space needed",
ExitCode: common.ParseTerminationMessageExitCode,
Message: `{"scratchReq": true}`,
},
},
State: v1.ContainerState{
Expand Down Expand Up @@ -649,8 +649,8 @@ var _ = Describe("Update PVC from POD", func() {
},
State: v1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
Message: `Import Complete; VDDK: {"Version": "1.0.0", "Host": "esx15.test.lan"}`,
ExitCode: common.ParseTerminationMessageExitCode,
Message: `{"vddkInfo": {"Version": "1.0.0", "Host": "esx15.test.lan"}}`,
Reason: "Completed",
},
},
Expand Down Expand Up @@ -683,8 +683,8 @@ var _ = Describe("Update PVC from POD", func() {
{
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: common.ScratchSpaceNeededExitCode,
Message: "",
ExitCode: common.ParseTerminationMessageExitCode,
Message: `{"scratchReq": true}`,
},
},
},
Expand Down
Loading

0 comments on commit 1c8370d

Please sign in to comment.