Skip to content

Commit

Permalink
Fix CSI & Smart clones with WFFC storage status reporting
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
  • Loading branch information
akalenyu committed Aug 3, 2022
1 parent d7b41f7 commit ef82763
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 12 deletions.
46 changes: 34 additions & 12 deletions pkg/controller/datavolume-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,6 @@ func (r *DatavolumeReconciler) Reconcile(_ context.Context, req reconcile.Reques
} else if err != nil {
return reconcile.Result{}, err
}

} else {
res, err := r.garbageCollect(datavolume, pvc, log)
if err != nil {
Expand Down Expand Up @@ -718,6 +717,11 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
pvc = newPvc
}

shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return reconcile.Result{}, err
}

switch selectedCloneStrategy {
case HostAssistedClone:
if !pvcPopulated {
Expand All @@ -732,7 +736,10 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
return reconcile.Result{}, err
}
case corev1.ClaimPending:
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
r.log.V(3).Info("ClaimPending CSIClone")
if !shouldBeMarkedWaitForFirstConsumer {
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
}
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
Expand All @@ -744,7 +751,7 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
}
fallthrough
case SmartClone:
if !pvcPopulated {
if !pvcPopulated && !shouldBeMarkedWaitForFirstConsumer {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
}
}
Expand Down Expand Up @@ -858,6 +865,7 @@ func (r *DatavolumeReconciler) reconcileCsiClonePvc(log logr.Logger,
pvcSpec *corev1.PersistentVolumeClaimSpec,
transferName string) (reconcile.Result, error) {

log = log.WithName("reconcileCsiClonePvc")
pvcName := datavolume.Name

if isCrossNamespaceClone(datavolume) {
Expand All @@ -877,7 +885,7 @@ func (r *DatavolumeReconciler) reconcileCsiClonePvc(log logr.Logger,
if sourcePvcNs == "" {
sourcePvcNs = datavolume.Namespace
}
r.log.V(3).Info("CSI-Clone is available")
log.V(3).Info("CSI-Clone is available")

// Get source pvc
sourcePvc := &corev1.PersistentVolumeClaim{}
Expand Down Expand Up @@ -2196,14 +2204,11 @@ func (r *DatavolumeReconciler) updateUploadStatusPhase(pvc *corev1.PersistentVol
func (r *DatavolumeReconciler) reconcileDataVolumeStatus(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, selectedCloneStrategy cloneStrategy) (reconcile.Result, error) {
dataVolumeCopy := dataVolume.DeepCopy()
var event DataVolumeEvent
var err error
result := reconcile.Result{}

curPhase := dataVolumeCopy.Status.Phase
if pvc != nil {
storageClassBindingMode, err := r.getStorageClassBindingMode(pvc.Spec.StorageClassName)
if err != nil {
return reconcile.Result{}, err
}
dataVolumeCopy.Status.ClaimName = pvc.Name

// the following check is for a case where the request is to create a blank disk for a block device.
Expand Down Expand Up @@ -2255,19 +2260,17 @@ func (r *DatavolumeReconciler) reconcileDataVolumeStatus(dataVolume *cdiv1.DataV
}
} else {
dataVolumeCopy.Status.Phase = cdiv1.Succeeded

}
r.updateImportStatusPhase(pvc, dataVolumeCopy, &event)
}
} else {
switch pvc.Status.Phase {
case corev1.ClaimPending:
honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return reconcile.Result{}, err
}
if honorWaitForFirstConsumerEnabled &&
*storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
if shouldBeMarkedWaitForFirstConsumer {
dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
} else {
dataVolumeCopy.Status.Phase = cdiv1.Pending
Expand Down Expand Up @@ -2874,6 +2877,25 @@ func (r *DatavolumeReconciler) isSourceReadyToClone(
return true, nil
}

// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
func (r *DatavolumeReconciler) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
storageClassBindingMode, err := r.getStorageClassBindingMode(pvc.Spec.StorageClassName)
if err != nil {
return false, err
}

honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
if err != nil {
return false, err
}

res := honorWaitForFirstConsumerEnabled &&
storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer &&
pvc.Status.Phase == corev1.ClaimPending

return res, nil
}

// detectCloneSize obtains and assigns the original PVC's size when cloning using an empty storage value
func (r *DatavolumeReconciler) detectCloneSize(
dv *cdiv1.DataVolume,
Expand Down
89 changes: 89 additions & 0 deletions tests/cloner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -519,6 +520,7 @@ var _ = Describe("all clone tests", func() {
err = utils.DeleteVerifierPod(f.K8sClient, f.Namespace.Name)
Expect(err).ToNot(HaveOccurred())
})

It("[test_id:cnv-5570]Should clone data from block to filesystem", func() {
if !f.IsBlockVolumeStorageClassAvailable() {
Skip("Storage Class for block volume is not available")
Expand Down Expand Up @@ -815,6 +817,93 @@ var _ = Describe("all clone tests", func() {
Entry("[test_id:8491]Block to filesystem (empty storage size)", v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem),
Entry("[test_id:8490]Filesystem to filesystem(empty storage size)", v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem),
)

Context("WaitForFirstConsumer status with advanced cloning methods", func() {
var wffcStorageClass *storagev1.StorageClass

BeforeEach(func() {
if cloneType != "csivolumeclone" && cloneType != "snapshot" {
Skip("relevant for csi/smart clones only")
}

sc, err := f.K8sClient.StorageV1().StorageClasses().Get(context.TODO(), utils.DefaultStorageClass.GetName(), metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
if sc.VolumeBindingMode == nil || *sc.VolumeBindingMode == storagev1.VolumeBindingImmediate {
sc, err = f.CreateWFFCVariationOfStorageClass(sc)
Expect(err).ToNot(HaveOccurred())
wffcStorageClass = sc
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), wffcStorageClass.Name, metav1.GetOptions{})
return err == nil
}, time.Minute, time.Second).Should(BeTrue())
spec, err := utils.GetStorageProfileSpec(f.CdiClient, wffcStorageClass.Name)
Expect(err).ToNot(HaveOccurred())
if cloneType == "csivolumeclone" {
utils.ConfigureCloneStrategy(f.CrClient, f.CdiClient, wffcStorageClass.Name, spec, cdiv1.CloneStrategyCsiClone)
} else if cloneType == "snapshot" {
utils.ConfigureCloneStrategy(f.CrClient, f.CdiClient, wffcStorageClass.Name, spec, cdiv1.CloneStrategySnapshot)
}
}
})

AfterEach(func() {
if wffcStorageClass != nil {
err := f.K8sClient.StorageV1().StorageClasses().Delete(context.TODO(), wffcStorageClass.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), wffcStorageClass.Name, metav1.GetOptions{})
return err != nil && k8serrors.IsNotFound(err)
}, time.Minute, time.Second).Should(BeTrue())
}
})

It("should report correct status for smart/CSI clones", func() {
volumeMode := v1.PersistentVolumeMode(v1.PersistentVolumeFilesystem)

dataVolume := utils.NewDataVolumeWithHTTPImportAndStorageSpec(dataVolumeName, "1Gi", fmt.Sprintf(utils.TinyCoreIsoURL, f.CdiInstallNs))
dataVolume.Spec.Storage.VolumeMode = &volumeMode
if wffcStorageClass != nil {
dataVolume.Spec.Storage.StorageClassName = &wffcStorageClass.Name
}
dataVolume.Annotations[controller.AnnImmediateBinding] = "true"
dataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, dataVolume)
Expect(err).ToNot(HaveOccurred())
By("Waiting for import to be completed")
err = utils.WaitForDataVolumePhase(f, f.Namespace.Name, cdiv1.Succeeded, dataVolume.Name)
Expect(err).ToNot(HaveOccurred())
sourcePvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(dataVolume.Namespace).Get(context.TODO(), dataVolume.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

targetDV := utils.NewDataVolumeForImageCloningAndStorageSpec("target-dv", "1Gi", sourcePvc.Namespace, sourcePvc.Name, nil, &volumeMode)
if wffcStorageClass != nil {
targetDV.Spec.Storage.StorageClassName = &wffcStorageClass.Name
}
targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, targetDV)
Expect(err).ToNot(HaveOccurred())
targetPvc, err := utils.WaitForPVC(f.K8sClient, targetDataVolume.Namespace, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())
By("Ensure WFFC is reported to reflect the situation correctly")
err = utils.WaitForDataVolumePhase(f, targetDataVolume.Namespace, cdiv1.WaitForFirstConsumer, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())

// Force bind to ensure integrity after first consumer
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDataVolume)
By("Wait for target PVC Bound phase")
err = utils.WaitForPersistentVolumeClaimPhase(f.K8sClient, f.Namespace.Name, v1.ClaimBound, targetPvc.Name)
Expect(err).ToNot(HaveOccurred())
By("Wait for target DV Succeeded phase")
err = utils.WaitForDataVolumePhase(f, f.Namespace.Name, cdiv1.Succeeded, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())

By("Verify content")
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, targetPvc, utils.DefaultImagePath, utils.UploadFileMD5, utils.UploadFileSize)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
By("Deleting verifier pod")
err = utils.DeleteVerifierPod(f.K8sClient, f.Namespace.Name)
Expect(err).ToNot(HaveOccurred())
})
})
}

Context("HostAssisted Clone", func() {
Expand Down
11 changes: 11 additions & 0 deletions tests/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,17 @@ func (f *Framework) DeleteStorageQuota() error {
})
}

// CreateWFFCVariationOfStorageClass creates a WFFC variation of a storage class
func (f *Framework) CreateWFFCVariationOfStorageClass(sc *storagev1.StorageClass) (*storagev1.StorageClass, error) {
wffc := storagev1.VolumeBindingWaitForFirstConsumer
sc.ObjectMeta = metav1.ObjectMeta{
Name: fmt.Sprintf("%s-temp-wffc", sc.Name),
}
sc.VolumeBindingMode = &wffc

return f.K8sClient.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{})
}

// UpdateCdiConfigResourceLimits sets the limits in the CDIConfig object
func (f *Framework) UpdateCdiConfigResourceLimits(resourceCPU, resourceMemory, limitsCPU, limitsMemory int64) error {
err := utils.UpdateCDIConfig(f.CrClient, func(config *cdiv1.CDIConfigSpec) {
Expand Down

0 comments on commit ef82763

Please sign in to comment.