Skip to content

Commit

Permalink
Check for pvc populated before doing any clone validations and actions (
Browse files Browse the repository at this point in the history
kubevirt#2375)

In case our pvc is already populated no need to check for source PVC
unknown size etc.

Signed-off-by: Shelly Kagan <skagan@redhat.com>

Signed-off-by: Shelly Kagan <skagan@redhat.com>
  • Loading branch information
ShellyKa13 authored and akalenyu committed Aug 17, 2022
1 parent d82fc70 commit f0e866c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 75 deletions.
150 changes: 75 additions & 75 deletions pkg/controller/datavolume-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ const (
// CloneWithoutSource reports that the source PVC of a clone doesn't exists (reason)
CloneWithoutSource = "CloneWithoutSource"
// MessageCloneWithoutSource reports that the source PVC of a clone doesn't exists (message)
MessageCloneWithoutSource = "The source PVC doesn't exist"
MessageCloneWithoutSource = "The source PVC %s doesn't exist"

// AnnCSICloneRequest annotation associates object with CSI Clone Request
AnnCSICloneRequest = "cdi.kubevirt.io/CSICloneRequest"
Expand Down Expand Up @@ -663,19 +663,23 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
prePopulated bool,
pvcPopulated bool) (reconcile.Result, error) {

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if pvcPopulated || prePopulated {
return r.reconcileDataVolumeStatus(datavolume, pvc, selectedCloneStrategy)
}

// Check if source PVC exists and do proper validation before attempting to clone
if done, err := r.validateCloneAndSourcePVC(datavolume); err != nil {
return reconcile.Result{}, err
} else if !done {
return reconcile.Result{}, nil
}

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if selectedCloneStrategy == SmartClone {
r.sccs.StartController()
}
Expand All @@ -698,83 +702,79 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
}
}

if !prePopulated {
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}

newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}
return reconcile.Result{}, err
}
pvc = newPvc

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
}

shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
})
}
return reconcile.Result{}, err
}
pvc = newPvc
}

switch selectedCloneStrategy {
case HostAssistedClone:
if !pvcPopulated {
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
case corev1.ClaimPending:
r.log.V(3).Info("ClaimPending CSIClone")
if !shouldBeMarkedWaitForFirstConsumer {
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
}
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return reconcile.Result{}, err
}

switch selectedCloneStrategy {
case HostAssistedClone:
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
fallthrough
case SmartClone:
if !pvcPopulated && !shouldBeMarkedWaitForFirstConsumer {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
case corev1.ClaimPending:
r.log.V(3).Info("ClaimPending CSIClone")
if !shouldBeMarkedWaitForFirstConsumer {
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
}
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
}
fallthrough
case SmartClone:
if !shouldBeMarkedWaitForFirstConsumer {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
}
}

Expand Down Expand Up @@ -821,6 +821,9 @@ func (r *DatavolumeReconciler) ensureExtendedToken(pvc *corev1.PersistentVolumeC
func (r *DatavolumeReconciler) selectCloneStrategy(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec) (cloneStrategy, error) {
preferredCloneStrategy, err := r.getCloneStrategy(datavolume)
if err != nil {
if k8serrors.IsNotFound(err) {
return NoClone, nil
}
return NoClone, err
}

Expand Down Expand Up @@ -1917,9 +1920,6 @@ func (r *DatavolumeReconciler) getCloneStrategy(dataVolume *cdiv1.DataVolume) (*
defaultCloneStrategy := cdiv1.CloneStrategySnapshot
sourcePvc, err := r.findSourcePvc(dataVolume)
if err != nil {
if k8serrors.IsNotFound(err) {
r.recorder.Eventf(dataVolume, corev1.EventTypeWarning, ErrUnableToClone, "Source pvc %s not found", dataVolume.Spec.Source.PVC.Name)
}
return nil, err
}
storageClass, err := GetStorageClassByName(r.client, sourcePvc.Spec.StorageClassName)
Expand Down Expand Up @@ -2849,7 +2849,7 @@ func (r *DatavolumeReconciler) validateCloneAndSourcePVC(datavolume *cdiv1.DataV
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: CloneWithoutSource,
message: MessageCloneWithoutSource,
message: fmt.Sprintf(MessageCloneWithoutSource, datavolume.Spec.Source.PVC.Name),
})
return false, nil
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/datavolume-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,29 @@ var _ = Describe("All DataVolume Tests", func() {
Expect(done).To(BeTrue())
})

It("Validate clone already populated without source completes", func() {
dv := newCloneDataVolume("test-dv")
storageProfile := createStorageProfile(scName, nil, filesystemMode)
pvcAnnotations := make(map[string]string)
pvcAnnotations[AnnPopulatedFor] = "test-dv"
pvc := createPvcInStorageClass("test-dv", metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
pvc.SetAnnotations(make(map[string]string))
pvc.GetAnnotations()[AnnPopulatedFor] = "test-dv"
reconciler = createDatavolumeReconciler(dv, pvc, storageProfile, sc)

prePopulated := false
pvcPopulated := true
result, err := reconciler.reconcileClone(reconciler.log, dv, pvc, dv.Spec.PVC.DeepCopy(), "", prePopulated, pvcPopulated)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
Expect(dv.Status.ClaimName).To(Equal("test-dv"))
Expect(dv.Status.Phase).To(Equal(cdiv1.Succeeded))
Expect(dv.Annotations[AnnPrePopulated]).To(Equal("test-dv"))
Expect(dv.Annotations[annCloneType]).To(BeEmpty())
Expect(result).To(Equal(reconcile.Result{}))
})

DescribeTable("Validation mechanism rejects or accepts the clone depending on the contentType combination",
func(sourceContentType, targetContentType string, expectedResult bool) {
dv := newCloneDataVolume("test-dv")
Expand Down
1 change: 1 addition & 0 deletions tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
Expand Down
23 changes: 23 additions & 0 deletions tests/cloner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"sigs.k8s.io/controller-runtime/pkg/client"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
Expand Down Expand Up @@ -1354,6 +1355,28 @@ var _ = Describe("all clone tests", func() {
By("The clone should fail")
f.ExpectEvent(f.Namespace.Name).Should(ContainSubstring(controller.CloneValidationFailed))
})

It("Should not clone when PopulatedFor annotation exists", func() {
targetName := "target" + rand.String(12)

By(fmt.Sprintf("Creating target pvc: %s/%s", f.Namespace.Name, targetName))
targetPvc, err := utils.CreatePVCFromDefinition(f.K8sClient, f.Namespace.Name,
utils.NewPVCDefinition(targetName, "1Gi", map[string]string{controller.AnnPopulatedFor: targetName}, nil))
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(targetPvc)
cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec(targetName, "1Gi", f.Namespace.Name, "non-existing-source", nil, &fsVM)
controller.AddAnnotation(cloneDV, controller.AnnDeleteAfterCompletion, "false")
_, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
Expect(err).ToNot(HaveOccurred())
By("Wait for clone DV Succeeded phase")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, targetName, cloneCompleteTimeout)
Expect(err).ToNot(HaveOccurred())
dv, err := f.CdiClient.CdiV1beta1().DataVolumes(f.Namespace.Name).Get(context.TODO(), targetName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
_, ok := dv.Annotations["cdi.kubevirt.io/cloneType"]
Expect(ok).To(BeFalse())
})

})
})

Expand Down

0 comments on commit f0e866c

Please sign in to comment.