From 1c8370d34ef188e3414b075119044b4bb4ed792b Mon Sep 17 00:00:00 2001 From: Felix Matouschek Date: Wed, 14 Feb 2024 15:17:45 +0100 Subject: [PATCH] feat: Make importer datasource communication explicit Make the communication of datasources in the importer explicit by adding a GetTerminationMessage method to the DataSourceInterface. Then use this method to communicate additional information to the import controller once the importer pod has terminated, instead of writing additional data to the termination message in the Close method of datasources. Signed-off-by: Felix Matouschek --- cmd/cdi-importer/BUILD.bazel | 2 +- cmd/cdi-importer/importer.go | 55 +++++++++++------ pkg/common/BUILD.bazel | 15 ++++- pkg/common/common.go | 33 ++++++++++- pkg/common/common_suite_test.go | 13 ++++ pkg/common/common_test.go | 25 ++++++++ pkg/controller/import-controller.go | 27 ++++++--- pkg/controller/import-controller_test.go | 16 ++--- pkg/controller/util.go | 75 +++++++++--------------- pkg/controller/util_test.go | 4 +- pkg/importer/data-processor.go | 2 + pkg/importer/data-processor_test.go | 6 ++ pkg/importer/gcs-datasource.go | 6 ++ pkg/importer/http-datasource.go | 5 ++ pkg/importer/imageio-datasource.go | 6 ++ pkg/importer/registry-datasource.go | 5 ++ pkg/importer/s3-datasource.go | 6 ++ pkg/importer/upload-datasource.go | 11 ++++ pkg/importer/vddk-datasource_amd64.go | 27 ++++----- pkg/importer/vddk-datasource_arm64.go | 4 ++ pkg/importer/vddk-datasource_test.go | 13 ++++ pkg/uploadserver/uploadserver_test.go | 5 ++ pkg/util/util.go | 6 -- 23 files changed, 255 insertions(+), 112 deletions(-) create mode 100644 pkg/common/common_suite_test.go create mode 100644 pkg/common/common_test.go diff --git a/cmd/cdi-importer/BUILD.bazel b/cmd/cdi-importer/BUILD.bazel index 46c1239bcb..d9bbbcf238 100644 --- a/cmd/cdi-importer/BUILD.bazel +++ b/cmd/cdi-importer/BUILD.bazel @@ -13,10 +13,10 @@ go_library( "//pkg/util:go_default_library", "//pkg/util/prometheus:go_default_library", "//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library", - "//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", + "//vendor/k8s.io/utils/ptr:go_default_library", ], ) diff --git a/cmd/cdi-importer/importer.go b/cmd/cdi-importer/importer.go index 54eaf491da..d6974d359d 100644 --- a/cmd/cdi-importer/importer.go +++ b/cmd/cdi-importer/importer.go @@ -13,6 +13,7 @@ package main // ImporterSecretKey Optional. Secret key is the password to your account. import ( + "errors" "flag" "fmt" "os" @@ -20,11 +21,10 @@ import ( "strings" "time" - "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" + "k8s.io/utils/ptr" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "kubevirt.io/containerized-data-importer/pkg/common" @@ -181,28 +181,34 @@ func handleImport( processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation) err := processor.ProcessData() - if err != nil { + if err != nil && !errors.Is(err, importer.ErrRequiresScratchSpace) { klog.Errorf("%+v", err) - if err == importer.ErrRequiresScratchSpace { - if err := util.WriteTerminationMessage(common.ScratchSpaceRequired); err != nil { - klog.Errorf("%+v", err) - } - return common.ScratchSpaceNeededExitCode - } - err = util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())) - if err != nil { + if err := util.WriteTerminationMessage(fmt.Sprintf("Unable to process data: %v", err.Error())); err != nil { klog.Errorf("%+v", err) } - return 1 } - touchDoneFile() - // due to the way some data sources can add additional information to termination message - // after finished (ds.close() ) termination message has to be written first, before the - // the ds is closed - // TODO: think about making communication explicit, probably DS interface should be extended - err = importCompleteTerminationMessage(processor.PreallocationApplied()) - if err != nil { + + termMsg := ds.GetTerminationMessage() + if errors.Is(err, importer.ErrRequiresScratchSpace) { + klog.Errorf("%+v", err) + if termMsg == nil { + termMsg = &common.ImporterTerminationMessage{} + } + termMsg.ScratchSpaceRequired = ptr.To(true) + } else if err == nil { + touchDoneFile() + } + + if termMsg != nil { + if err := writeImporterTerminationMessage(termMsg); err != nil { + klog.Errorf("%+v", err) + return 1 + } + return common.ParseTerminationMessageExitCode + } + + if err := importCompleteTerminationMessage(processor.PreallocationApplied()); err != nil { klog.Errorf("%+v", err) return 1 } @@ -210,6 +216,17 @@ func handleImport( return 0 } +func writeImporterTerminationMessage(termMsg *common.ImporterTerminationMessage) error { + msg, err := termMsg.String() + if err != nil { + return err + } + if err := util.WriteTerminationMessage(msg); err != nil { + return err + } + return nil +} + func importCompleteTerminationMessage(preallocationApplied bool) error { message := "Import Complete" if preallocationApplied { diff --git a/pkg/common/BUILD.bazel b/pkg/common/BUILD.bazel index 197807eae7..f074214aa9 100644 --- a/pkg/common/BUILD.bazel +++ b/pkg/common/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -7,3 +7,16 @@ go_library( visibility = ["//visibility:public"], deps = ["//vendor/k8s.io/api/core/v1:go_default_library"], ) + +go_test( + name = "go_default_test", + srcs = [ + "common_suite_test.go", + "common_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//vendor/github.com/onsi/ginkgo/v2:go_default_library", + "//vendor/github.com/onsi/gomega:go_default_library", + ], +) diff --git a/pkg/common/common.go b/pkg/common/common.go index bcb77cf124..3b228384ed 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -1,6 +1,8 @@ package common import ( + "encoding/json" + "fmt" "time" v1 "k8s.io/api/core/v1" @@ -217,8 +219,8 @@ const ( // DefaultResyncPeriod sets a 10 minute resync period, used in the controller pkg and the controller cmd executable DefaultResyncPeriod = 10 * time.Minute - // ScratchSpaceNeededExitCode is the exit code that indicates the importer pod requires scratch space to function properly. - ScratchSpaceNeededExitCode = 42 + // ParseTerminationMessageExitCode is the exit code that indicates the termination message of the importer container contains additional information. + ParseTerminationMessageExitCode = 84 // ScratchNameSuffix (controller pkg only) ScratchNameSuffix = "scratch" @@ -329,3 +331,30 @@ var AsyncUploadFormPaths = []string{ UploadFormAsync, "/v1alpha1/upload-form-async", } + +// VddkInfo holds VDDK version and connection information returned by an importer pod +type VddkInfo struct { + Version string + Host string +} + +// ImporterTerminationMessage contains data to be serialized and used as the termination message of the importer. +type ImporterTerminationMessage struct { + ScratchSpaceRequired *bool `json:"scratchReq,omitempty"` + VddkInfo *VddkInfo `json:"vddkInfo,omitempty"` + AdditionalPVCLabels map[string]string `json:"pvcLabels,omitempty"` +} + +func (it *ImporterTerminationMessage) String() (string, error) { + msg, err := json.Marshal(it) + if err != nil { + return "", err + } + + // Messages longer than 4096 are truncated by kubelet + if length := len(msg); length > 4096 { + return "", fmt.Errorf("Termination message length %d exceeds maximum length of 4096 bytes", length) + } + + return string(msg), nil +} diff --git a/pkg/common/common_suite_test.go b/pkg/common/common_suite_test.go new file mode 100644 index 0000000000..087a1e5857 --- /dev/null +++ b/pkg/common/common_suite_test.go @@ -0,0 +1,13 @@ +package common_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCommon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Common Suite") +} diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go new file mode 100644 index 0000000000..4b4410c4c4 --- /dev/null +++ b/pkg/common/common_test.go @@ -0,0 +1,25 @@ +package common + +import ( + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("ImporterTerminationMessage", func() { + It("Should fail if serialized data is longer than 4096 bytes", func() { + const length = 5000 + const serializationOffset = 22 + + termMsg := ImporterTerminationMessage{ + AdditionalPVCLabels: map[string]string{}, + } + for i := 0; i < length-serializationOffset; i++ { + termMsg.AdditionalPVCLabels["t"] += "c" + } + + _, err := termMsg.String() + Expect(err).To(MatchError(fmt.Sprintf("Termination message length %d exceeds maximum length of 4096 bytes", length))) + }) +}) diff --git a/pkg/controller/import-controller.go b/pkg/controller/import-controller.go index d01975a8a9..2b9a26a01b 100644 --- a/pkg/controller/import-controller.go +++ b/pkg/controller/import-controller.go @@ -366,28 +366,39 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p log.V(1).Info("Updating PVC from pod") anno := pvc.GetAnnotations() + setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition) - scratchExitCode := false + var termMsg *common.ImporterTerminationMessage if pod.Status.ContainerStatuses != nil && pod.Status.ContainerStatuses[0].LastTerminationState.Terminated != nil && pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode > 0 { log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode) - if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ScratchSpaceNeededExitCode { - log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) - scratchExitCode = true - anno[cc.AnnRequiresScratch] = "true" + if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ParseTerminationMessageExitCode { + var err error + termMsg, err = parseTerminationMessage(&pod.Status.ContainerStatuses[0].LastTerminationState) + if err != nil { + log.V(1).Error(err, "failed to parse importer termination message") + return err + } } else { r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.Message) } } + scratchRequired := false + if termMsg != nil && termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired { + log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) + pvc.GetAnnotations()[cc.AnnRequiresScratch] = "true" + scratchRequired = true + } + if anno[cc.AnnCurrentCheckpoint] != "" { anno[cc.AnnCurrentPodID] = string(pod.ObjectMeta.UID) } anno[cc.AnnImportPod] = string(pod.Name) - if !scratchExitCode { + if !scratchRequired { // No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the // phase, because the pod might terminate cleanly and mistakenly mark the import complete. anno[cc.AnnPodPhase] = string(pod.Status.Phase) @@ -421,8 +432,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts]) } - if cc.IsPVCComplete(pvc) || scratchExitCode { - if !scratchExitCode { + if cc.IsPVCComplete(pvc) || scratchRequired { + if !scratchRequired { r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful") log.V(1).Info("Import completed successfully") } diff --git a/pkg/controller/import-controller_test.go b/pkg/controller/import-controller_test.go index 945c3c851e..cf95a20dcf 100644 --- a/pkg/controller/import-controller_test.go +++ b/pkg/controller/import-controller_test.go @@ -488,7 +488,7 @@ var _ = Describe("Update PVC from POD", func() { }) // TODO: Update me to stay in progress if we were in progress already, its a pod failure and it will get restarted. - It("Should update phase on PVC, if pod exited with error state that is NOT scratchspace exit", func() { + It("Should update phase on PVC, if pod exited with error state that is NOT parse termination message", func() { pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound) pod := cc.CreateImporterTestPod(pvc, "testPvc1", nil) pod.Status = corev1.PodStatus{ @@ -531,7 +531,7 @@ var _ = Describe("Update PVC from POD", func() { Expect(resPvc.GetAnnotations()[cc.AnnRunningConditionReason]).To(Equal("Explosion")) }) - It("Should NOT update phase on PVC, if pod exited with error state that is scratchspace exit", func() { + It("Should NOT update phase on PVC, if pod exited with error state that is scratch space required", func() { pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound) scratchPvcName := &corev1.PersistentVolumeClaim{} scratchPvcName.Name = "testPvc1-scratch" @@ -542,8 +542,8 @@ var _ = Describe("Update PVC from POD", func() { { LastTerminationState: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ - ExitCode: common.ScratchSpaceNeededExitCode, - Message: "scratch space needed", + ExitCode: common.ParseTerminationMessageExitCode, + Message: `{"scratchReq": true}`, }, }, State: v1.ContainerState{ @@ -649,8 +649,8 @@ var _ = Describe("Update PVC from POD", func() { }, State: v1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ - ExitCode: 0, - Message: `Import Complete; VDDK: {"Version": "1.0.0", "Host": "esx15.test.lan"}`, + ExitCode: common.ParseTerminationMessageExitCode, + Message: `{"vddkInfo": {"Version": "1.0.0", "Host": "esx15.test.lan"}}`, Reason: "Completed", }, }, @@ -683,8 +683,8 @@ var _ = Describe("Update PVC from POD", func() { { LastTerminationState: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ - ExitCode: common.ScratchSpaceNeededExitCode, - Message: "", + ExitCode: common.ParseTerminationMessageExitCode, + Message: `{"scratchReq": true}`, }, }, }, diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 6828590fc7..49f778f043 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -20,7 +20,6 @@ import ( "context" "crypto/rsa" "encoding/json" - "regexp" "strconv" "strings" @@ -79,10 +78,6 @@ const ( ClusterWideProxyConfigMapKey = "ca-bundle.crt" ) -var ( - vddkInfoMatch = regexp.MustCompile(`((.*; )|^)VDDK: (?P{.*})`) -) - func checkPVC(pvc *v1.PersistentVolumeClaim, annotation string, log logr.Logger) bool { // check if we have proper annotation if !metav1.HasAnnotation(pvc.ObjectMeta, annotation) { @@ -286,8 +281,18 @@ func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, prefix if podRestarts >= annPodRestarts { anno[cc.AnnPodRestarts] = strconv.Itoa(podRestarts) } - setVddkAnnotations(anno, pod) + containerState := pod.Status.ContainerStatuses[0].State + termMsg, _ := parseTerminationMessage(&containerState) + if termMsg != nil && termMsg.VddkInfo != nil { + if termMsg.VddkInfo.Host != "" { + anno[cc.AnnVddkHostConnection] = termMsg.VddkInfo.Host + } + if termMsg.VddkInfo.Version != "" { + anno[cc.AnnVddkVersion] = termMsg.VddkInfo.Version + } + } + if containerState.Running != nil { anno[prefix] = "true" anno[prefix+".message"] = "" @@ -298,28 +303,20 @@ func setAnnotationsFromPodWithPrefix(anno map[string]string, pod *v1.Pod, prefix anno[prefix+".message"] = simplifyKnownMessage(containerState.Waiting.Message) anno[prefix+".reason"] = containerState.Waiting.Reason } else if containerState.Terminated != nil { - anno[prefix+".message"] = simplifyKnownMessage(containerState.Terminated.Message) - reason := containerState.Terminated.Reason - if reason == common.GenericError { - reason = handleGenericErrorReason(containerState.Terminated.Message) - } - anno[prefix+".reason"] = reason - if strings.Contains(containerState.Terminated.Message, common.PreallocationApplied) { - anno[cc.AnnPreallocationApplied] = "true" + if termMsg != nil && termMsg.ScratchSpaceRequired != nil && *termMsg.ScratchSpaceRequired { + anno[prefix+".message"] = common.ScratchSpaceRequired + anno[prefix+".reason"] = ScratchSpaceRequiredReason + } else { + anno[prefix+".message"] = simplifyKnownMessage(containerState.Terminated.Message) + anno[prefix+".reason"] = containerState.Terminated.Reason + if strings.Contains(containerState.Terminated.Message, common.PreallocationApplied) { + anno[cc.AnnPreallocationApplied] = "true" + } } } } } -func handleGenericErrorReason(message string) string { - if strings.Contains(message, common.ScratchSpaceRequired) { - // Sometimes the pod will need scratch space to complete some operations. - // Better to add a custom reason instead of a generic container state. - return ScratchSpaceRequiredReason - } - return common.GenericError -} - func simplifyKnownMessage(msg string) string { if strings.Contains(msg, "is larger than the reported available") || strings.Contains(msg, "no space left on device") || @@ -330,33 +327,15 @@ func simplifyKnownMessage(msg string) string { return msg } -func setVddkAnnotations(anno map[string]string, pod *v1.Pod) { - if pod.Status.ContainerStatuses[0].State.Terminated == nil { - return - } - terminationMessage := pod.Status.ContainerStatuses[0].State.Terminated.Message - klog.V(1).Info("Saving VDDK annotations from pod status message: ", "message", terminationMessage) - - var terminationInfo string - matches := vddkInfoMatch.FindAllStringSubmatch(terminationMessage, -1) - for index, matchName := range vddkInfoMatch.SubexpNames() { - if matchName == "info" && len(matches) > 0 { - terminationInfo = matches[0][index] - break - } - } - - var vddkInfo util.VddkInfo - err := json.Unmarshal([]byte(terminationInfo), &vddkInfo) - if err != nil { - return +func parseTerminationMessage(state *v1.ContainerState) (*common.ImporterTerminationMessage, error) { + if state.Terminated == nil || state.Terminated.ExitCode != common.ParseTerminationMessageExitCode { + return nil, nil } - if vddkInfo.Host != "" { - anno[cc.AnnVddkHostConnection] = vddkInfo.Host - } - if vddkInfo.Version != "" { - anno[cc.AnnVddkVersion] = vddkInfo.Version + termMsg := &common.ImporterTerminationMessage{} + if err := json.Unmarshal([]byte(state.Terminated.Message), termMsg); err != nil { + return nil, err } + return termMsg, nil } func setBoundConditionFromPVC(anno map[string]string, prefix string, pvc *v1.PersistentVolumeClaim) { diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index 8423107b88..72540502dd 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -222,8 +222,8 @@ var _ = Describe("setAnnotationsFromPod", func() { { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ - Message: common.ScratchSpaceRequired, - Reason: common.GenericError, + ExitCode: common.ParseTerminationMessageExitCode, + Message: `{"scratchReq": true}`, }, }, }, diff --git a/pkg/importer/data-processor.go b/pkg/importer/data-processor.go index ca7b2e8538..4bc6f2663a 100644 --- a/pkg/importer/data-processor.go +++ b/pkg/importer/data-processor.go @@ -89,6 +89,8 @@ type DataSourceInterface interface { TransferFile(fileName string) (ProcessingPhase, error) // Geturl returns the url that the data processor can use when converting the data. GetURL() *url.URL + // GetTerminationMessage returns data to be serialized and used as the termination message of the importer. + GetTerminationMessage() *common.ImporterTerminationMessage // Close closes any readers or other open resources. Close() error } diff --git a/pkg/importer/data-processor_test.go b/pkg/importer/data-processor_test.go index 1ffe02e9c6..2da1dad331 100644 --- a/pkg/importer/data-processor_test.go +++ b/pkg/importer/data-processor_test.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/image" ) @@ -89,6 +90,11 @@ func (m *MockDataProvider) GetURL() *url.URL { return m.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (m *MockDataProvider) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (m *MockDataProvider) Close() error { return nil diff --git a/pkg/importer/gcs-datasource.go b/pkg/importer/gcs-datasource.go index afcb122710..1fbc6b924f 100644 --- a/pkg/importer/gcs-datasource.go +++ b/pkg/importer/gcs-datasource.go @@ -16,6 +16,7 @@ import ( "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -156,6 +157,11 @@ func (sd *GCSDataSource) GetURL() *url.URL { return sd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (sd *GCSDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (sd *GCSDataSource) Close() error { var err error diff --git a/pkg/importer/http-datasource.go b/pkg/importer/http-datasource.go index fd98003d88..51910dbab2 100644 --- a/pkg/importer/http-datasource.go +++ b/pkg/importer/http-datasource.go @@ -194,6 +194,11 @@ func (hs *HTTPDataSource) GetURL() *url.URL { return hs.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (hs *HTTPDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close all readers. func (hs *HTTPDataSource) Close() error { var err error diff --git a/pkg/importer/imageio-datasource.go b/pkg/importer/imageio-datasource.go index bf937e442f..39a4910dbe 100644 --- a/pkg/importer/imageio-datasource.go +++ b/pkg/importer/imageio-datasource.go @@ -38,6 +38,7 @@ import ( "github.com/pkg/errors" "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -193,6 +194,11 @@ func (is *ImageioDataSource) GetURL() *url.URL { return is.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (is *ImageioDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close all readers. func (is *ImageioDataSource) Close() error { var err error diff --git a/pkg/importer/registry-datasource.go b/pkg/importer/registry-datasource.go index b6ccdba7ec..57a5c4d95c 100644 --- a/pkg/importer/registry-datasource.go +++ b/pkg/importer/registry-datasource.go @@ -120,6 +120,11 @@ func (rd *RegistryDataSource) GetURL() *url.URL { return rd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (rd *RegistryDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (rd *RegistryDataSource) Close() error { // No-op, no open readers diff --git a/pkg/importer/s3-datasource.go b/pkg/importer/s3-datasource.go index f44fdbf1e4..4106d6320a 100644 --- a/pkg/importer/s3-datasource.go +++ b/pkg/importer/s3-datasource.go @@ -17,6 +17,7 @@ import ( "k8s.io/klog/v2" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -126,6 +127,11 @@ func (sd *S3DataSource) GetURL() *url.URL { return sd.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (sd *S3DataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (sd *S3DataSource) Close() error { var err error diff --git a/pkg/importer/upload-datasource.go b/pkg/importer/upload-datasource.go index f4ecc286af..0df3204e37 100644 --- a/pkg/importer/upload-datasource.go +++ b/pkg/importer/upload-datasource.go @@ -9,6 +9,7 @@ import ( "k8s.io/klog/v2" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/util" ) @@ -107,6 +108,11 @@ func (ud *UploadDataSource) GetURL() *url.URL { return ud.url } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (ud *UploadDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // Close closes any readers or other open resources. func (ud *UploadDataSource) Close() error { if ud.stream != nil { @@ -187,6 +193,11 @@ func (aud *AsyncUploadDataSource) GetURL() *url.URL { return aud.uploadDataSource.GetURL() } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (aud *AsyncUploadDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // GetResumePhase returns the next phase to process when resuming func (aud *AsyncUploadDataSource) GetResumePhase() ProcessingPhase { return aud.ResumePhase diff --git a/pkg/importer/vddk-datasource_amd64.go b/pkg/importer/vddk-datasource_amd64.go index c3feddf6b6..1950eb380a 100644 --- a/pkg/importer/vddk-datasource_amd64.go +++ b/pkg/importer/vddk-datasource_amd64.go @@ -24,7 +24,6 @@ import ( "bytes" "container/ring" "context" - "encoding/json" "errors" "fmt" "net/url" @@ -941,22 +940,6 @@ func (vs *VDDKDataSource) Info() (ProcessingPhase, error) { // Close closes any readers or other open resources. func (vs *VDDKDataSource) Close() error { - if vddkVersion != "" || vddkHost != "" { - existingbytes, _ := os.ReadFile(common.PodTerminationMessageFile) - existing := string(existingbytes) - if existing != "" { - existing += "; " - } - stopinfo := util.VddkInfo{ - Version: vddkVersion, - Host: vddkHost, - } - stopmsg, _ := json.Marshal(stopinfo) - err := util.WriteTerminationMessage(existing + "VDDK: " + string(stopmsg)) - if err != nil { - klog.Errorf("Unable to write termination message: %v", err) - } - } vs.NbdKit.Handle.Close() return vs.NbdKit.n.KillNbdkit() } @@ -966,6 +949,16 @@ func (vs *VDDKDataSource) GetURL() *url.URL { return vs.NbdKit.Socket } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (vs *VDDKDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return &common.ImporterTerminationMessage{ + VddkInfo: &common.VddkInfo{ + Version: vddkVersion, + Host: vddkHost, + }, + } +} + // Transfer is called to transfer the data from the source to the path passed in. func (vs *VDDKDataSource) Transfer(path string) (ProcessingPhase, error) { return ProcessingPhaseTransferDataFile, nil diff --git a/pkg/importer/vddk-datasource_arm64.go b/pkg/importer/vddk-datasource_arm64.go index ae7639b65e..605f7473ee 100644 --- a/pkg/importer/vddk-datasource_arm64.go +++ b/pkg/importer/vddk-datasource_arm64.go @@ -30,6 +30,10 @@ func (V VDDKDataSource) GetURL() *url.URL { panic("not support") } +func (V VDDKDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + panic("not support") +} + func (V VDDKDataSource) Close() error { panic("not support") } diff --git a/pkg/importer/vddk-datasource_test.go b/pkg/importer/vddk-datasource_test.go index 2e11cd55eb..2a1a404607 100644 --- a/pkg/importer/vddk-datasource_test.go +++ b/pkg/importer/vddk-datasource_test.go @@ -21,6 +21,7 @@ import ( "github.com/vmware/govmomi/vim25/types" v1 "k8s.io/api/core/v1" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/image" libnbd "libguestfs.org/libnbd" ) @@ -432,6 +433,18 @@ var _ = Describe("VDDK data source", func() { Expect(err).ToNot(HaveOccurred()) Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthVC))) }) + + It("GetTerminationMessage should contain VDDK connection information", func() { + const testVersion = "testVersion" + const testHost = "testHost" + + source, err := NewVDDKDataSource("http://esx.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", "testdisk.vmdk", "", "", "", v1.PersistentVolumeFilesystem) + Expect(err).ToNot(HaveOccurred()) + + vddkVersion = testVersion + vddkHost = testHost + Expect(*source.GetTerminationMessage()).To(Equal(common.ImporterTerminationMessage{VddkInfo: &common.VddkInfo{Version: testVersion, Host: testHost}})) + }) }) var _ = Describe("VDDK log watcher", func() { diff --git a/pkg/uploadserver/uploadserver_test.go b/pkg/uploadserver/uploadserver_test.go index 49ad0f3b5f..e510503765 100644 --- a/pkg/uploadserver/uploadserver_test.go +++ b/pkg/uploadserver/uploadserver_test.go @@ -147,6 +147,11 @@ func (amd *AsyncMockDataSource) GetURL() *url.URL { return nil } +// GetTerminationMessage returns data to be serialized and used as the termination message of the importer. +func (amd *AsyncMockDataSource) GetTerminationMessage() *common.ImporterTerminationMessage { + return nil +} + // GetResumePhase returns the next phase to process when resuming func (amd *AsyncMockDataSource) GetResumePhase() importer.ProcessingPhase { return importer.ProcessingPhaseComplete diff --git a/pkg/util/util.go b/pkg/util/util.go index 904c7719af..be7a7d63c9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -42,12 +42,6 @@ type CountingReader struct { Done bool } -// VddkInfo holds VDDK version and connection information returned by an importer pod -type VddkInfo struct { - Version string - Host string -} - // RandAlphaNum provides an implementation to generate a random alpha numeric string of the specified length func RandAlphaNum(n int) string { r := rand.New(rand.NewSource(time.Now().UnixNano()))