Skip to content

Commit

Permalink
Allow the usage of AnnPodRetainAfterCompletion with populators
Browse files Browse the repository at this point in the history
This annotation causes CDI transfer pods (importer, uploader, cloner) to be retained after a successful or failed completion.

This makes debugging and testing easier, as users can get the pod state and logs after completion.

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Aug 29, 2023
1 parent 67616b8 commit f7d44bc
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 25 deletions.
6 changes: 0 additions & 6 deletions pkg/controller/datavolume/controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,12 +1177,6 @@ func (r *ReconcilerBase) shouldUseCDIPopulator(syncState *dvSyncState) (bool, er
return boolUsePopulator, nil
}
log := r.log.WithValues("DataVolume", dv.Name, "Namespace", dv.Namespace)
// currently populators don't support retain pod annotation so don't use populators in that case
if retain := dv.Annotations[cc.AnnPodRetainAfterCompletion]; retain == "true" {
log.Info("Not using CDI populators, currently we don't support populators with retainAfterCompletion annotation")
return false, nil
}

usePopulator, err := storageClassCSIDriverExists(r.client, r.log, syncState.pvcSpec.StorageClassName)
if err != nil {
return false, err
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/datavolume/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,17 +1573,20 @@ var _ = Describe("All DataVolume Tests", func() {
dv := createDataVolumeWithStorageAPI("test-dv", metav1.NamespaceDefault, httpSource, storageSpec)
AddAnnotation(dv, annotation, value)

reconciler = createImportReconciler()
reconciler = createImportReconciler(sc, csiDriver)
syncState := dvSyncState{
dvMutated: dv,
pvcSpec: &corev1.PersistentVolumeClaimSpec{
StorageClassName: &scName,
},
}
usePopulator, err := reconciler.shouldUseCDIPopulator(&syncState)
Expect(err).ToNot(HaveOccurred())
Expect(usePopulator).To(Equal(expected))
},
Entry("AnnUsePopulator=true return true", AnnUsePopulator, "true", true),
Entry("AnnUsePopulator=false return false", AnnUsePopulator, "false", false),
Entry("AnnPodRetainAfterCompletion return false", AnnPodRetainAfterCompletion, "true", false),
Entry("AnnPodRetainAfterCompletion return true", AnnPodRetainAfterCompletion, "true", true),
)

It("Should return true if storage class has wffc bindingMode and honorWaitForFirstConsumer feature gate is disabled", func() {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/populators/clone-populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,13 @@ func (r *ClonePopulatorReconciler) validateCrossNamespace(pvc *corev1.Persistent

func (r *ClonePopulatorReconciler) reconcileDone(ctx context.Context, log logr.Logger, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
log.V(3).Info("executing cleanup")
if err := r.planner.Cleanup(ctx, log, pvc); err != nil {
return reconcile.Result{}, err
if pvc.Annotations[cc.AnnPodRetainAfterCompletion] == "true" {
// Avoiding cleanup so we can keep clone objects for debugging purposes.
r.recorder.Eventf(pvc, corev1.EventTypeWarning, retainedPVCPrime, messageRetainedPVCPrime)
} else {
if err := r.planner.Cleanup(ctx, log, pvc); err != nil {
return reconcile.Result{}, err
}
}

log.V(1).Info("removing finalizer")
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller/populators/import-populator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,36 @@ var _ = Describe("Import populator tests", func() {
Entry("pod network is passed", AnnPodNetwork, "test", "test"),
Entry("side car injection is passed", AnnPodSidecarInjection, AnnPodSidecarInjectionDefault, AnnPodSidecarInjectionDefault),
Entry("multus default network is passed", AnnPodMultusDefaultNetwork, "test", "test"),
Entry("retain pod annotation is passed", AnnPodRetainAfterCompletion, "true", "true"),
)

It("should trigger appropriate event when using AnnPodRetainAfterCompletion", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &sc.Name,
map[string]string{AnnPodPhase: string(corev1.PodSucceeded)}, nil, corev1.ClaimPending)
targetPvc.Spec.DataSourceRef = dataSourceRef
targetPvc.Spec.VolumeName = "pv"
volumeImportSource := getVolumeImportSource(true, metav1.NamespaceDefault)
pvcPrime := getPVCPrime(targetPvc, nil)
pvcPrime.Annotations = map[string]string{AnnPodRetainAfterCompletion: "true"}

By("Reconcile")
reconciler = createImportPopulatorReconciler(targetPvc, pvcPrime, volumeImportSource, sc)
result, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: targetPvcName, Namespace: metav1.NamespaceDefault}})
Expect(err).To(Not(HaveOccurred()))
Expect(result).To(Not(BeNil()))

By("Checking events recorded")
close(reconciler.recorder.(*record.FakeRecorder).Events)
found := false
for event := range reconciler.recorder.(*record.FakeRecorder).Events {
if strings.Contains(event, retainedPVCPrime) {
found = true
}
}
reconciler.recorder = nil
Expect(found).To(BeTrue())
})

It("shouldn't error when reconciling PVC with non-import DataSourceRef", func() {
targetPvc := CreatePvcInStorageClass(targetPvcName, metav1.NamespaceDefault, &sc.Name, nil, nil, corev1.ClaimBound)
targetPvc.Spec.DataSourceRef = &corev1.TypedObjectReference{
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/populators/populator-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func (r *ReconcilerBase) createPVCPrime(pvc *corev1.PersistentVolumeClaim, sourc
if waitForFirstConsumer {
annotations[cc.AnnSelectedNode] = pvc.Annotations[cc.AnnSelectedNode]
}
if _, ok := pvc.Annotations[cc.AnnPodRetainAfterCompletion]; ok {
annotations[cc.AnnPodRetainAfterCompletion] = pvc.Annotations[cc.AnnPodRetainAfterCompletion]
}

// Assemble PVC' spec
pvcPrime := &corev1.PersistentVolumeClaim{
Expand Down Expand Up @@ -328,8 +331,13 @@ func (r *ReconcilerBase) reconcileCommon(pvc *corev1.PersistentVolumeClaim, popu

func (r *ReconcilerBase) reconcileCleanup(pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
if pvcPrime != nil {
if err := r.client.Delete(context.TODO(), pvcPrime); err != nil {
return reconcile.Result{}, err
if pvcPrime.Annotations[cc.AnnPodRetainAfterCompletion] == "true" {
// Retaining PVC' in Lost state. We can then keep the pod for debugging purposes.
r.recorder.Eventf(pvcPrime, corev1.EventTypeWarning, retainedPVCPrime, messageRetainedPVCPrime)
} else {
if err := r.client.Delete(context.TODO(), pvcPrime); err != nil {
return reconcile.Result{}, err
}
}
}
return reconcile.Result{}, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/populators/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// messageCreatedPVCPrimeSuccessfully provides a const to indicate we created PVC prime for population (message)
messageCreatedPVCPrimeSuccessfully = "PVC Prime created successfully"

// retainedPVCPrime provides a const to indicate that the PVC prime has been retained in lost state (reason)
retainedPVCPrime = "RetainedPVCPrime"
// messageRetainedPVCPrime provides a const to indicate that the PVC prime has been retained in lost state (message)
messageRetainedPVCPrime = "PVC Prime retained in Lost state for debugging purposes"

// AnnPVCPrimeName annotation is the name of the PVC' that is added to the target PVC
// used by the upload-proxy in order to get the service name
AnnPVCPrimeName = cc.AnnAPIGroup + "/storage.populator.pvcPrime"
Expand Down
13 changes: 12 additions & 1 deletion tests/cloner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
. "github.com/onsi/gomega"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -954,6 +955,16 @@ var _ = Describe("all clone tests", func() {
targetDvs = nil
})

getClonerPodName := func(pvc *corev1.PersistentVolumeClaim) string {
usedPvc := pvc
if pvc.Spec.DataSourceRef != nil {
pvcPrime, err := utils.WaitForPVC(f.K8sClient, pvc.Namespace, fmt.Sprintf("tmp-pvc-%s", string(pvc.UID)))
Expect(err).ToNot(HaveOccurred())
usedPvc = pvcPrime
}
return controller.CreateCloneSourcePodName(usedPvc)
}

It("[rfe_id:1277][test_id:1899][crit:High][vendor:cnv-qe@redhat.com][level:component] Should allow multiple cloning operations in parallel", func() {
const NumOfClones int = 3

Expand Down Expand Up @@ -1002,7 +1013,7 @@ var _ = Describe("all clone tests", func() {
for _, dv := range targetDvs {
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dv.Namespace).Get(context.TODO(), dv.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
clonerPodName := controller.CreateCloneSourcePodName(pvc)
clonerPodName := getClonerPodName(pvc)
cloner, err := f.K8sClient.CoreV1().Pods(dv.Namespace).Get(context.TODO(), clonerPodName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
restartCount := cloner.Status.ContainerStatuses[0].RestartCount
Expand Down
31 changes: 21 additions & 10 deletions tests/import_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"kubevirt.io/containerized-data-importer/pkg/common"
cont "kubevirt.io/containerized-data-importer/pkg/controller"
controller "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/controller/populators"
"kubevirt.io/containerized-data-importer/tests/framework"
"kubevirt.io/containerized-data-importer/tests/utils"

Expand Down Expand Up @@ -129,22 +130,30 @@ var _ = Describe("Import Proxy tests", func() {
}, 30*time.Second, time.Second).Should(BeTrue())
})

verifyImportProxyConfigMap := func(pvcName string) {
getPVCNameForConfigMap := func(pvc *corev1.PersistentVolumeClaim) string {
if pvc.Spec.DataSourceRef != nil {
return populators.PVCPrimeName(pvc)
}
return pvc.Name
}

verifyImportProxyConfigMap := func(pvc *corev1.PersistentVolumeClaim) {
By("Verify import proxy ConfigMap copied to the import namespace")
trustedCAProxy := cont.GetImportProxyConfigMapName(pvcName)
trustedCAProxy := cont.GetImportProxyConfigMapName(getPVCNameForConfigMap(pvc))
Eventually(func() error {
_, err := f.K8sClient.CoreV1().ConfigMaps(f.Namespace.Name).Get(context.TODO(), trustedCAProxy, metav1.GetOptions{})
return err
}, time.Second*60, time.Second).Should(BeNil())
}

verifyImportProxyConfigMapIsDeletedOnPodDeletion := func(pvcName string) {
verifyImportProxyConfigMapIsDeletedOnPodDeletion := func(pvc *corev1.PersistentVolumeClaim) {
By("Verify import proxy ConfigMap is deleted from import namespace on importer pod deletion")
pvcName := pvc.Name
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Get(context.TODO(), pvcName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
err = utils.DeletePodByName(f.K8sClient, pvc.Annotations[controller.AnnImportPod], f.Namespace.Name, nil)
Expect(err).ToNot(HaveOccurred())
trustedCAProxy := cont.GetImportProxyConfigMapName(pvcName)
trustedCAProxy := cont.GetImportProxyConfigMapName(getPVCNameForConfigMap(pvc))
Eventually(func() bool {
_, err := f.K8sClient.CoreV1().ConfigMaps(f.Namespace.Name).Get(context.TODO(), trustedCAProxy, metav1.GetOptions{})
return k8serrors.IsNotFound(err)
Expand Down Expand Up @@ -178,15 +187,15 @@ var _ = Describe("Import Proxy tests", func() {
pvc, err := utils.WaitForPVC(f.K8sClient, dataVolume.Namespace, dvName)
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(pvc)
verifyImportProxyConfigMap(dvName)
verifyImportProxyConfigMap(pvc)
By(fmt.Sprintf("Waiting for datavolume to match phase %s", string(cdiv1.Succeeded)))
err = utils.WaitForDataVolumePhase(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred())

By("Checking the importer pod information in the proxy log to verify if the requests were proxied")
verifyImporterPodInfoInProxyLogs(f, imgURL, args.userAgent, now, args.expected)

verifyImportProxyConfigMapIsDeletedOnPodDeletion(dvName)
verifyImportProxyConfigMapIsDeletedOnPodDeletion(pvc)
},
Entry("succeed creating import dv with a proxied server (http)", importProxyTestArguments{
name: "dv-import-http-proxy",
Expand Down Expand Up @@ -333,15 +342,15 @@ var _ = Describe("Import Proxy tests", func() {
pvc, err := utils.WaitForPVC(f.K8sClient, dv.Namespace, dv.Name)
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(pvc)
verifyImportProxyConfigMap(dvName)
verifyImportProxyConfigMap(pvc)
By(fmt.Sprintf("Waiting for datavolume to match phase %s", string(cdiv1.Succeeded)))
err = utils.WaitForDataVolumePhase(f, f.Namespace.Name, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred())

By("Checking the importer pod information in the proxy log to verify if the requests were proxied")
verifyImporterPodInfoInProxyLogs(f, *dv.Spec.Source.Registry.URL, registryUserAgent, now, BeTrue)

verifyImportProxyConfigMapIsDeletedOnPodDeletion(dvName)
verifyImportProxyConfigMapIsDeletedOnPodDeletion(pvc)
},
Entry("with http proxy, no auth", false, false),
Entry("with http proxy, auth", false, true),
Expand Down Expand Up @@ -421,7 +430,9 @@ var _ = Describe("Import Proxy tests", func() {
return dvName
}, timeout, pollingInterval).ShouldNot(BeEmpty())

verifyImportProxyConfigMap(dvName)
pvc, err := utils.WaitForPVC(f.K8sClient, ns, dvName)
Expect(err).ToNot(HaveOccurred())
verifyImportProxyConfigMap(pvc)

By("Wait for DataImportCron UpToDate")
Eventually(func() bool {
Expand All @@ -436,7 +447,7 @@ var _ = Describe("Import Proxy tests", func() {
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, dvName)
Expect(err).ToNot(HaveOccurred())

verifyImportProxyConfigMapIsDeletedOnPodDeletion(dvName)
verifyImportProxyConfigMapIsDeletedOnPodDeletion(pvc)
},
Entry("with http proxy, no auth", false, false),
Entry("with http proxy, auth", false, true),
Expand Down
57 changes: 55 additions & 2 deletions tests/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ var _ = Describe("[rfe_id:1115][crit:high][vendor:cnv-qe@redhat.com][level:compo

By("Find importer pods after completion")
for _, checkpoint := range dataVolume.Spec.Checkpoints {
name := fmt.Sprintf("%s-%s-checkpoint-%s", common.ImporterPodName, dataVolume.Name, checkpoint.Current)
pvcName := dataVolume.Name
// When using populators, the PVC Prime name is used to build the importer pod
if pvc.Spec.DataSourceRef != nil {
pvcName = populators.PVCPrimeName(pvc)
}
name := fmt.Sprintf("%s-%s-checkpoint-%s", common.ImporterPodName, pvcName, checkpoint.Current)
By("Find importer pod " + name)
importer, err := utils.FindPodByPrefixOnce(f.K8sClient, dataVolume.Namespace, name, common.CDILabelSelector)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1902,7 +1907,7 @@ var _ = Describe("Import populator", func() {
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
})

It("Should update volumeImportSource accordingly when doind a multi-stage import", func() {
It("Should update volumeImportSource accordingly when doing a multi-stage import", func() {
vcenterURL := fmt.Sprintf(utils.VcenterURL, f.CdiInstallNs)
dataVolume := f.CreateVddkWarmImportDataVolume("multi-stage-import-test", "100Mi", vcenterURL)

Expand Down Expand Up @@ -2021,6 +2026,54 @@ var _ = Describe("Import populator", func() {
return err != nil && k8serrors.IsNotFound(err)
}, timeout, pollingInterval).Should(BeTrue())
})

It("should retain PVC Prime and importer pod with AnnPodRetainAfterCompletion", func() {
dataVolume := utils.NewDataVolumeWithHTTPImport("import-dv", "100Mi", fmt.Sprintf(utils.TinyCoreIsoURL, f.CdiInstallNs))
dataVolume.Annotations[controller.AnnPodRetainAfterCompletion] = "true"
dv, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, dataVolume)
Expect(err).ToNot(HaveOccurred())

pvc, err = utils.WaitForPVC(f.K8sClient, dv.Namespace, dv.Name)
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(pvc)

By("Verify PVC prime was created")
pvcPrime, err = utils.WaitForPVC(f.K8sClient, pvc.Namespace, populators.PVCPrimeName(pvc))
Expect(err).ToNot(HaveOccurred())

By("Verify target PVC is bound")
err = utils.WaitForPersistentVolumeClaimPhase(f.K8sClient, pvc.Namespace, v1.ClaimBound, pvc.Name)
Expect(err).ToNot(HaveOccurred())

By("Verify content")
md5, err := f.GetMD5(f.Namespace, pvc, utils.DefaultImagePath, utils.MD5PrefixSize)
Expect(err).ToNot(HaveOccurred())
Expect(md5).To(Equal(utils.TinyCoreMD5))

By("Verify 100.0% annotation")
progress, ok, err := utils.WaitForPVCAnnotation(f.K8sClient, f.Namespace.Name, pvc, controller.AnnPopulatorProgress)
Expect(err).ToNot(HaveOccurred())
Expect(ok).To(BeTrue())
Expect(progress).Should(BeEquivalentTo("100.0%"))

By("Verify PVC Prime is Lost")
err = utils.WaitForPersistentVolumeClaimPhase(f.K8sClient, pvcPrime.Namespace, v1.ClaimLost, pvcPrime.Name)
Expect(err).ToNot(HaveOccurred())

By("Find importer pod after completion")
importer, err := utils.FindPodByPrefixOnce(f.K8sClient, dataVolume.Namespace, common.ImporterPodName, common.CDILabelSelector)
Expect(err).ToNot(HaveOccurred())
Expect(importer.DeletionTimestamp).To(BeNil())

By("Cleanup importer Pod, DataVolume and PVC Prime")
zero := int64(0)
err = utils.DeletePodByName(f.K8sClient, fmt.Sprintf("%s-%s", common.ImporterPodName, pvcPrime.Name), f.Namespace.Name, &zero)
Expect(err).ToNot(HaveOccurred())
err = utils.DeleteDataVolume(f.CdiClient, f.Namespace.Name, dataVolume.Name)
Expect(err).ToNot(HaveOccurred())
err = f.DeletePVC(pvcPrime)
Expect(err).ToNot(HaveOccurred())
})
})

func generateRegistryOnlySidecar() *unstructured.Unstructured {
Expand Down

0 comments on commit f7d44bc

Please sign in to comment.