Skip to content

Commit

Permalink
Check for pvc populated before doing any clone validations and actions
Browse files Browse the repository at this point in the history
In case our pvc is already populated no need to check for source PVC
unknown size etc.

Signed-off-by: Shelly Kagan <skagan@redhat.com>
  • Loading branch information
ShellyKa13 committed Aug 4, 2022
1 parent 90ee754 commit 083ee01
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 71 deletions.
140 changes: 69 additions & 71 deletions pkg/controller/datavolume-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ const (
// CloneWithoutSource reports that the source PVC of a clone doesn't exists (reason)
CloneWithoutSource = "CloneWithoutSource"
// MessageCloneWithoutSource reports that the source PVC of a clone doesn't exists (message)
MessageCloneWithoutSource = "The source PVC doesn't exist"
MessageCloneWithoutSource = "The source PVC %s doesn't exist"

// AnnCSICloneRequest annotation associates object with CSI Clone Request
AnnCSICloneRequest = "cdi.kubevirt.io/CSICloneRequest"
Expand Down Expand Up @@ -664,19 +664,23 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
prePopulated bool,
pvcPopulated bool) (reconcile.Result, error) {

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if pvcPopulated || prePopulated {
return r.reconcileDataVolumeStatus(datavolume, pvc, selectedCloneStrategy)
}

// Check if source PVC exists and do proper validation before attempting to clone
if done, err := r.validateCloneAndSourcePVC(datavolume); err != nil {
return reconcile.Result{}, err
} else if !done {
return reconcile.Result{}, nil
}

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if selectedCloneStrategy == SmartClone {
r.sccs.StartController()
}
Expand All @@ -699,76 +703,70 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
}
}

if !prePopulated {
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}

newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}
return reconcile.Result{}, err
}
pvc = newPvc

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
}

switch selectedCloneStrategy {
case HostAssistedClone:
if !pvcPopulated {
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
case corev1.ClaimPending:
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
})
}
fallthrough
case SmartClone:
if !pvcPopulated {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
return reconcile.Result{}, err
}
pvc = newPvc
}

switch selectedCloneStrategy {
case HostAssistedClone:
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
case corev1.ClaimPending:
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
}
fallthrough
case SmartClone:
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
}

// Finally, we update the status block of the DataVolume resource to reflect the
Expand Down Expand Up @@ -814,6 +812,9 @@ func (r *DatavolumeReconciler) ensureExtendedToken(pvc *corev1.PersistentVolumeC
func (r *DatavolumeReconciler) selectCloneStrategy(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec) (cloneStrategy, error) {
preferredCloneStrategy, err := r.getCloneStrategy(datavolume)
if err != nil {
if k8serrors.IsNotFound(err) {
return NoClone, nil
}
return NoClone, err
}

Expand Down Expand Up @@ -1909,9 +1910,6 @@ func (r *DatavolumeReconciler) getCloneStrategy(dataVolume *cdiv1.DataVolume) (*
defaultCloneStrategy := cdiv1.CloneStrategySnapshot
sourcePvc, err := r.findSourcePvc(dataVolume)
if err != nil {
if k8serrors.IsNotFound(err) {
r.recorder.Eventf(dataVolume, corev1.EventTypeWarning, ErrUnableToClone, "Source pvc %s not found", dataVolume.Spec.Source.PVC.Name)
}
return nil, err
}
storageClass, err := GetStorageClassByName(r.client, sourcePvc.Spec.StorageClassName)
Expand Down Expand Up @@ -2846,7 +2844,7 @@ func (r *DatavolumeReconciler) validateCloneAndSourcePVC(datavolume *cdiv1.DataV
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: CloneWithoutSource,
message: MessageCloneWithoutSource,
message: fmt.Sprintf(MessageCloneWithoutSource, datavolume.Spec.Source.PVC.Name),
})
return false, nil
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/datavolume-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,29 @@ var _ = Describe("All DataVolume Tests", func() {
Expect(done).To(BeTrue())
})

It("Validate clone already populated without source completes", func() {
dv := newCloneDataVolume("test-dv")
storageProfile := createStorageProfile(scName, nil, filesystemMode)
pvcAnnotations := make(map[string]string)
pvcAnnotations[AnnPopulatedFor] = "test-dv"
pvc := createPvcInStorageClass("test-dv", metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
pvc.SetAnnotations(make(map[string]string))
pvc.GetAnnotations()[AnnPopulatedFor] = "test-dv"
reconciler = createDatavolumeReconciler(dv, pvc, storageProfile, sc)

prePopulated := false
pvcPopulated := true
result, err := reconciler.reconcileClone(reconciler.log, dv, pvc, dv.Spec.PVC.DeepCopy(), "", prePopulated, pvcPopulated)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "test-dv", Namespace: metav1.NamespaceDefault}, dv)
Expect(err).ToNot(HaveOccurred())
Expect(dv.Status.ClaimName).To(Equal("test-dv"))
Expect(dv.Status.Phase).To(Equal(cdiv1.Succeeded))
Expect(dv.Annotations[AnnPrePopulated]).To(Equal("test-dv"))
Expect(dv.Annotations[annCloneType]).To(BeEmpty())
Expect(result).To(Equal(reconcile.Result{}))
})

DescribeTable("Validation mechanism rejects or accepts the clone depending on the contentType combination",
func(sourceContentType, targetContentType string, expectedResult bool) {
dv := newCloneDataVolume("test-dv")
Expand Down
1 change: 1 addition & 0 deletions tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
Expand Down
23 changes: 23 additions & 0 deletions tests/cloner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"sigs.k8s.io/controller-runtime/pkg/client"

cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
Expand Down Expand Up @@ -1265,6 +1266,28 @@ var _ = Describe("all clone tests", func() {
By("The clone should fail")
f.ExpectEvent(f.Namespace.Name).Should(ContainSubstring(controller.CloneValidationFailed))
})

It("Should not clone when PopulatedFor annotation exists", func() {
targetName := "target" + rand.String(12)

By(fmt.Sprintf("Creating target pvc: %s/%s", f.Namespace.Name, targetName))
targetPvc, err := utils.CreatePVCFromDefinition(f.K8sClient, f.Namespace.Name, utils.NewPVCDefinition(
targetName,
"1Gi",
map[string]string{
controller.AnnPopulatedFor: targetName,
},
nil))
Expect(err).ToNot(HaveOccurred())
f.ForceBindIfWaitForFirstConsumer(targetPvc)
cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec(targetName, "1Gi", f.Namespace.Name, "non-existing-source", nil, &fsVM)
_, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
Expect(err).ToNot(HaveOccurred())
By("Wait for clone DV Succeeded phase")
err = utils.WaitForDataVolumePhaseWithTimeout(f, f.Namespace.Name, cdiv1.Succeeded, targetName, cloneCompleteTimeout)
Expect(err).ToNot(HaveOccurred())
})

})
})

Expand Down

0 comments on commit 083ee01

Please sign in to comment.