Skip to content

Commit

Permalink
Add sourceRef support for VolumeSnapshot sources (#2616)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
Co-authored-by: Alex Kalenyuk <akalenyu@redhat.com>
  • Loading branch information
kubevirt-bot and akalenyu authored Mar 3, 2023
1 parent 5a8a1ea commit 709efdf
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 89 deletions.
22 changes: 9 additions & 13 deletions pkg/apiserver/webhooks/datavolume-validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,21 +425,17 @@ func (wh *dataVolumeValidatingWebhook) validateSourceRef(request *admissionv1.Ad
Field: field.Child("sourceRef").String(),
}
}
dataSourceSnapshot := dataSource.Spec.Source.Snapshot
if dataSourceSnapshot != nil {
return &metav1.StatusCause{
Message: "Snapshot sourceRef not allowed at this point",
Field: field.Child("sourceRef").String(),
}
switch {
case dataSource.Spec.Source.PVC != nil:
return wh.validateDataVolumeSourcePVC(dataSource.Spec.Source.PVC, field.Child("sourceRef"), spec)
case dataSource.Spec.Source.Snapshot != nil:
return wh.validateDataVolumeSourceSnapshot(dataSource.Spec.Source.Snapshot, field.Child("sourceRef"), spec)
}
dataSourcePVC := dataSource.Spec.Source.PVC
if dataSourcePVC == nil {
return &metav1.StatusCause{
Message: fmt.Sprintf("Empty PVC field in '%s'. DataSource may not be ready yet", dataSource.Name),
Field: field.Child("sourceRef").String(),
}

return &metav1.StatusCause{
Message: fmt.Sprintf("Empty source field in '%s'. DataSource may not be ready yet", dataSource.Name),
Field: field.Child("sourceRef").String(),
}
return wh.validateDataVolumeSourcePVC(dataSourcePVC, field.Child("sourceRef"), spec)
}

func (wh *dataVolumeValidatingWebhook) validateDataVolumeSourcePVC(PVC *cdiv1.DataVolumeSourcePVC, field *k8sfield.Path, spec *cdiv1.DataVolumeSpec) *metav1.StatusCause {
Expand Down
20 changes: 20 additions & 0 deletions pkg/apiserver/webhooks/datavolume-validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,26 @@ var _ = Describe("Validating Webhook", func() {
Expect(resp.Allowed).To(Equal(true))
})

It("should accept DataVolume with SourceRef on create if DataSource exists but snapshot does not exist", func() {
dataVolume := newDataSourceDataVolume("testDV", &testNamespace, "test")
dataSource := &cdiv1.DataSource{
ObjectMeta: metav1.ObjectMeta{
Name: dataVolume.Spec.SourceRef.Name,
Namespace: testNamespace,
},
Spec: cdiv1.DataSourceSpec{
Source: cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Name: "testNonExistentSnap",
Namespace: testNamespace,
},
},
},
}
resp := validateDataVolumeCreateEx(dataVolume, nil, []runtime.Object{dataSource}, nil)
Expect(resp.Allowed).To(Equal(true))
})

It("should reject DataVolume with empty SourceRef name on create", func() {
dataVolume := newDataSourceDataVolume("testDV", &testNamespace, "")
resp := validateDataVolumeCreate(dataVolume)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//vendor/github.com/containers/image/v5/docker/reference:go_default_library",
"//vendor/github.com/go-logr/logr:go_default_library",
"//vendor/github.com/gorhill/cronexpr:go_default_library",
"//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library",
"//vendor/github.com/openshift/api/config/v1:go_default_library",
"//vendor/github.com/openshift/api/image/v1:go_default_library",
"//vendor/github.com/openshift/api/route/v1:go_default_library",
Expand Down Expand Up @@ -99,6 +100,7 @@ go_test(
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//tests/reporters:go_default_library",
"//vendor/github.com/google/uuid:go_default_library",
"//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/ginkgo/extensions/table:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ var _ = Describe("All DataImportCron Tests", func() {
err := reconciler.client.Update(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
dataSource = &cdiv1.DataSource{}
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noPvc)
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noSource)

imports := cron.Status.CurrentImports
Expect(imports).ToNot(BeNil())
Expand All @@ -344,12 +344,12 @@ var _ = Describe("All DataImportCron Tests", func() {
dv.Status.Phase = cdiv1.ImportScheduled
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noPvc)
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noSource)

dv.Status.Phase = cdiv1.ImportInProgress
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noPvc)
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noSource)

dv.Status.Phase = cdiv1.Succeeded
err = reconciler.client.Update(context.TODO(), dv)
Expand Down
166 changes: 126 additions & 40 deletions pkg/controller/datasource-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"reflect"

"github.com/go-logr/logr"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -50,8 +52,8 @@ type DataSourceReconciler struct {
}

const (
ready = "Ready"
noPvc = "NoPvc"
ready = "Ready"
noSource = "NoSource"
)

// Reconcile loop for DataSourceReconciler
Expand All @@ -75,42 +77,71 @@ func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.Dat
dataSource.Status.Conditions = nil
}
dataSourceCopy := dataSource.DeepCopy()
sourcePVC := dataSource.Spec.Source.PVC
if sourcePVC != nil {
dv := &cdiv1.DataVolume{}
ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
isReady := false
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
if sourcePVC := dataSource.Spec.Source.PVC; sourcePVC != nil {
if err := r.handlePvcSource(ctx, sourcePVC, dataSource); err != nil {
return err
}
} else if sourceSnapshot := dataSource.Spec.Source.Snapshot; sourceSnapshot != nil {
if err := r.handleSnapshotSource(ctx, sourceSnapshot, dataSource); err != nil {
return err
}
} else {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noSource)
}

if !reflect.DeepEqual(dataSource, dataSourceCopy) {
if err := r.client.Update(ctx, dataSource); err != nil {
return err
}
}
return nil
}

func (r *DataSourceReconciler) handlePvcSource(ctx context.Context, sourcePVC *cdiv1.DataVolumeSourcePVC, dataSource *cdiv1.DataSource) error {
dv := &cdiv1.DataVolume{}
ns := cc.GetNamespace(sourcePVC.Namespace, dataSource.Namespace)
isReady := false
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, dv); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
pvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
pvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourcePVC.Name}, pvc); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
r.log.Info("PVC not found", "name", sourcePVC.Name)
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
} else {
isReady = true
}
} else if dv.Status.Phase == cdiv1.Succeeded {
isReady = true
r.log.Info("PVC not found", "name", sourcePVC.Name)
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "PVC not found", cc.NotFound)
} else {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
}
if isReady {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
isReady = true
}
} else if dv.Status.Phase == cdiv1.Succeeded {
isReady = true
} else {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noPvc)
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
}
if isReady {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
}

if !reflect.DeepEqual(dataSource, dataSourceCopy) {
if err := r.client.Update(ctx, dataSource); err != nil {
return nil
}

func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceSnapshot *cdiv1.DataVolumeSourceSnapshot, dataSource *cdiv1.DataSource) error {
snapshot := &snapshotv1.VolumeSnapshot{}
ns := cc.GetNamespace(sourceSnapshot.Namespace, dataSource.Namespace)
if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: sourceSnapshot.Name}, snapshot); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
} else if snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
} else {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
}

return nil
}

Expand Down Expand Up @@ -159,18 +190,32 @@ func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool { return !sameDataSourcePvc(e.ObjectOld, e.ObjectNew) },
UpdateFunc: func(e event.UpdateEvent) bool { return !sameSourceSpec(e.ObjectOld, e.ObjectNew) },
},
); err != nil {
return err
}

const dataSourcePvcField = "spec.source.pvc"
const dataSourceSnapshotField = "spec.source.snapshot"

getKey := func(namespace, name string) string {
return namespace + "/" + name
}

appendMatchingDataSourceRequests := func(indexingKey string, obj client.Object, reqs []reconcile.Request) []reconcile.Request {
var dataSources cdiv1.DataSourceList
matchingFields := client.MatchingFields{indexingKey: getKey(obj.GetNamespace(), obj.GetName())}
if err := mgr.GetClient().List(context.TODO(), &dataSources, matchingFields); err != nil {
log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
return reqs
}
for _, ds := range dataSources.Items {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
}
return reqs
}

if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourcePvcField, func(obj client.Object) []string {
if pvc := obj.(*cdiv1.DataSource).Spec.Source.PVC; pvc != nil {
ns := cc.GetNamespace(pvc.Namespace, obj.GetNamespace())
Expand All @@ -180,17 +225,19 @@ func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller
}); err != nil {
return err
}
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &cdiv1.DataSource{}, dataSourceSnapshotField, func(obj client.Object) []string {
if snapshot := obj.(*cdiv1.DataSource).Spec.Source.Snapshot; snapshot != nil {
ns := cc.GetNamespace(snapshot.Namespace, obj.GetNamespace())
return []string{getKey(ns, snapshot.Name)}
}
return nil
}); err != nil {
return err
}

mapToDataSource := func(obj client.Object) (reqs []reconcile.Request) {
var dataSources cdiv1.DataSourceList
matchingFields := client.MatchingFields{dataSourcePvcField: getKey(obj.GetNamespace(), obj.GetName())}
if err := mgr.GetClient().List(context.TODO(), &dataSources, matchingFields); err != nil {
log.Error(err, "Unable to list DataSources", "matchingFields", matchingFields)
return
}
for _, ds := range dataSources.Items {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: ds.Namespace, Name: ds.Name}})
}
reqs = appendMatchingDataSourceRequests(dataSourcePvcField, obj, reqs)
reqs = appendMatchingDataSourceRequests(dataSourceSnapshotField, obj, reqs)
return
}

Expand All @@ -215,7 +262,35 @@ func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool {
pvcOld, okOld := e.ObjectOld.(*corev1.PersistentVolumeClaim)
pvcNew, okNew := e.ObjectNew.(*corev1.PersistentVolumeClaim)
return okOld && okNew && pvcOld.Status.Phase != pvcNew.Status.Phase
},
},
); err != nil {
return err
}

if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
if meta.IsNoMatchError(err) {
// Back out if there's no point to attempt watch
return nil
}
if !cc.IsErrCacheNotStarted(err) {
return err
}
}
if err := c.Watch(&source.Kind{Type: &snapshotv1.VolumeSnapshot{}},
handler.EnqueueRequestsFromMapFunc(mapToDataSource),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return true },
DeleteFunc: func(e event.DeleteEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
snapOld, okOld := e.ObjectOld.(*snapshotv1.VolumeSnapshot)
snapNew, okNew := e.ObjectNew.(*snapshotv1.VolumeSnapshot)
return okOld && okNew && !reflect.DeepEqual(snapOld.Status, snapNew.Status)
},
},
); err != nil {
return err
Expand All @@ -224,8 +299,19 @@ func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller
return nil
}

func sameDataSourcePvc(objOld, objNew client.Object) bool {
func sameSourceSpec(objOld, objNew client.Object) bool {
dsOld, okOld := objOld.(*cdiv1.DataSource)
dsNew, okNew := objNew.(*cdiv1.DataSource)
return okOld && okNew && reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)

if !okOld || !okNew {
return false
}
if dsOld.Spec.Source.PVC != nil {
return reflect.DeepEqual(dsOld.Spec.Source.PVC, dsNew.Spec.Source.PVC)
}
if dsOld.Spec.Source.Snapshot != nil {
return reflect.DeepEqual(dsOld.Spec.Source.Snapshot, dsNew.Spec.Source.Snapshot)
}

return false
}
Loading

0 comments on commit 709efdf

Please sign in to comment.