Skip to content

Commit

Permalink
[release-v1.53] Manual backport of 2364, 2375 (#2401)
Browse files Browse the repository at this point in the history
* Status reporting for CSI & Smart clones with WFFC storage (#2364)

* Fix logging level so we respect it in controllers/operator

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Fix CSI & Smart clones with WFFC storage status reporting

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Check for pvc populated before doing any clone validations and actions (#2375)

In case our pvc is already populated no need to check for source PVC
unknown size etc.

Signed-off-by: Shelly Kagan <skagan@redhat.com>

Signed-off-by: Shelly Kagan <skagan@redhat.com>

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
Signed-off-by: Shelly Kagan <skagan@redhat.com>
Co-authored-by: Shelly Kagan <78472213+ShellyKa13@users.noreply.github.com>
  • Loading branch information
akalenyu and ShellyKa13 authored Aug 17, 2022
1 parent b3024f5 commit c8f06b4
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 85 deletions.
1 change: 1 addition & 0 deletions cmd/cdi-controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//vendor/github.com/openshift/api/route/v1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/go.uber.org/zap/zapcore:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/networking/v1:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
Expand Down
8 changes: 5 additions & 3 deletions cmd/cdi-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
routev1 "github.com/openshift/api/route/v1"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap/zapcore"
networkingv1 "k8s.io/api/networking/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -250,15 +251,16 @@ func start(ctx context.Context, cfg *rest.Config) {
func main() {
defer klog.Flush()
debug := false
if i, err := strconv.Atoi(verbose); err == nil && i > 1 {
verbosityLevel, err := strconv.Atoi(verbose)
if err == nil && verbosityLevel > 1 {
debug = true
}
err := envconfig.Process("", &controllerEnvs)
err = envconfig.Process("", &controllerEnvs)
if err != nil {
klog.Fatalf("Unable to get environment variables: %v\n", errors.WithStack(err))
}

logf.SetLogger(zap.New(zap.UseDevMode(debug)))
logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug)))
logf.Log.WithName("main").Info("Verbosity level", "verbose", verbose, "debug", debug)

cfg, err := clientcmd.BuildConfigFromFlags(kubeURL, kubeconfig)
Expand Down
1 change: 1 addition & 0 deletions cmd/cdi-operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//vendor/github.com/openshift/api/config/v1:go_default_library",
"//vendor/github.com/openshift/api/route/v1:go_default_library",
"//vendor/github.com/openshift/api/security/v1:go_default_library",
"//vendor/go.uber.org/zap/zapcore:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client/config:go_default_library",
Expand Down
19 changes: 18 additions & 1 deletion cmd/cdi-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"fmt"
"os"
"runtime"
"strconv"

promv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
ocpconfigv1 "github.com/openshift/api/config/v1"
routev1 "github.com/openshift/api/route/v1"
secv1 "github.com/openshift/api/security/v1"
"go.uber.org/zap/zapcore"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -49,11 +51,26 @@ func printVersion() {
func main() {
flag.Parse()

defVerbose := fmt.Sprintf("%d", 1) // note flag values are strings
verbose := defVerbose
// visit actual flags passed in and if passed check -v and set verbose
if verboseEnvVarVal := os.Getenv("VERBOSITY"); verboseEnvVarVal != "" {
verbose = verboseEnvVarVal
}
if verbose == defVerbose {
log.V(1).Info(fmt.Sprintf("Note: increase the -v level in the controller deployment for more detailed logging, eg. -v=%d or -v=%d\n", 2, 3))
}
verbosityLevel, err := strconv.Atoi(verbose)
debug := false
if err == nil && verbosityLevel > 1 {
debug = true
}

// The logger instantiated here can be changed to any logger
// implementing the logr.Logger interface. This logger will
// be propagated through the whole operator, generating
// uniform and structured logs.
logf.SetLogger(zap.New(zap.UseDevMode(false)))
logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug)))

printVersion()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/rs/cors v1.7.0
github.com/ulikunitz/xz v0.5.10
github.com/vmware/govmomi v0.23.1
go.uber.org/zap v1.19.1
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/square/go-jose.v2 v2.5.1
Expand Down Expand Up @@ -118,7 +119,6 @@ require (
go.etcd.io/bbolt v1.3.6 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
Expand Down
182 changes: 102 additions & 80 deletions pkg/controller/datavolume-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ const (
// CloneWithoutSource reports that the source PVC of a clone doesn't exists (reason)
CloneWithoutSource = "CloneWithoutSource"
// MessageCloneWithoutSource reports that the source PVC of a clone doesn't exists (message)
MessageCloneWithoutSource = "The source PVC doesn't exist"
MessageCloneWithoutSource = "The source PVC %s doesn't exist"

// AnnCSICloneRequest annotation associates object with CSI Clone Request
AnnCSICloneRequest = "cdi.kubevirt.io/CSICloneRequest"
Expand Down Expand Up @@ -572,7 +572,6 @@ func (r *DatavolumeReconciler) Reconcile(_ context.Context, req reconcile.Reques
} else if err != nil {
return reconcile.Result{}, err
}

} else {
res, err := r.garbageCollect(datavolume, pvc, log)
if err != nil {
Expand Down Expand Up @@ -664,19 +663,23 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
prePopulated bool,
pvcPopulated bool) (reconcile.Result, error) {

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if pvcPopulated || prePopulated {
return r.reconcileDataVolumeStatus(datavolume, pvc, selectedCloneStrategy)
}

// Check if source PVC exists and do proper validation before attempting to clone
if done, err := r.validateCloneAndSourcePVC(datavolume); err != nil {
return reconcile.Result{}, err
} else if !done {
return reconcile.Result{}, nil
}

// Get the most appropiate clone strategy
selectedCloneStrategy, err := r.selectCloneStrategy(datavolume, pvcSpec)
if err != nil {
return reconcile.Result{}, err
}

if selectedCloneStrategy == SmartClone {
r.sccs.StartController()
}
Expand All @@ -699,75 +702,79 @@ func (r *DatavolumeReconciler) reconcileClone(log logr.Logger,
}
}

if !prePopulated {
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, _ := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
return r.reconcileSmartClonePvc(log, datavolume, pvcSpec, transferName, snapshotClassName)
}
if selectedCloneStrategy == CsiClone {
csiDriverAvailable, err := r.storageClassCSIDriverExists(pvcSpec.StorageClassName)
if err != nil && !k8serrors.IsNotFound(err) {
return reconcile.Result{}, err
}

newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
if !csiDriverAvailable {
// err csi clone not possible
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.CloneScheduled, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
reason: ErrUnableToClone,
message: fmt.Sprintf("CSI Clone configured, but no CSIDriver available for %s", *pvcSpec.StorageClassName),
})
}
return reconcile.Result{}, err
}
pvc = newPvc

return r.reconcileCsiClonePvc(log, datavolume, pvcSpec, transferName)
}

switch selectedCloneStrategy {
case HostAssistedClone:
if !pvcPopulated {
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
newPvc, err := r.createPvcForDatavolume(log, datavolume, pvcSpec)
if err != nil {
if errQuotaExceeded(err) {
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Pending, datavolume, nil, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrExceededQuota,
message: err.Error(),
})
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
case corev1.ClaimPending:
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
return reconcile.Result{}, err
}
pvc = newPvc
}

shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return reconcile.Result{}, err
}

switch selectedCloneStrategy {
case HostAssistedClone:
if err := r.ensureExtendedToken(pvc); err != nil {
return reconcile.Result{}, err
}
case CsiClone:
switch pvc.Status.Phase {
case corev1.ClaimBound:
if err := r.setCloneOfOnPvc(pvc); err != nil {
return reconcile.Result{}, err
}
fallthrough
case SmartClone:
if !pvcPopulated {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
case corev1.ClaimPending:
r.log.V(3).Info("ClaimPending CSIClone")
if !shouldBeMarkedWaitForFirstConsumer {
return reconcile.Result{}, r.updateCloneStatusPhase(cdiv1.CSICloneInProgress, datavolume, pvc, selectedCloneStrategy)
}
case corev1.ClaimLost:
return reconcile.Result{},
r.updateDataVolumeStatusPhaseWithEvent(cdiv1.Failed, datavolume, pvc, selectedCloneStrategy,
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: ErrClaimLost,
message: fmt.Sprintf(MessageErrClaimLost, pvc.Name),
})
}
fallthrough
case SmartClone:
if !shouldBeMarkedWaitForFirstConsumer {
return r.finishClone(log, datavolume, pvc, pvcSpec, transferName, selectedCloneStrategy)
}
}

Expand Down Expand Up @@ -814,6 +821,9 @@ func (r *DatavolumeReconciler) ensureExtendedToken(pvc *corev1.PersistentVolumeC
func (r *DatavolumeReconciler) selectCloneStrategy(datavolume *cdiv1.DataVolume, pvcSpec *corev1.PersistentVolumeClaimSpec) (cloneStrategy, error) {
preferredCloneStrategy, err := r.getCloneStrategy(datavolume)
if err != nil {
if k8serrors.IsNotFound(err) {
return NoClone, nil
}
return NoClone, err
}

Expand Down Expand Up @@ -879,6 +889,7 @@ func (r *DatavolumeReconciler) reconcileCsiClonePvc(log logr.Logger,
pvcSpec *corev1.PersistentVolumeClaimSpec,
transferName string) (reconcile.Result, error) {

log = log.WithName("reconcileCsiClonePvc")
pvcName := datavolume.Name

if isCrossNamespaceClone(datavolume) {
Expand All @@ -898,7 +909,7 @@ func (r *DatavolumeReconciler) reconcileCsiClonePvc(log logr.Logger,
if sourcePvcNs == "" {
sourcePvcNs = datavolume.Namespace
}
r.log.V(3).Info("CSI-Clone is available")
log.V(3).Info("CSI-Clone is available")

// Get source pvc
sourcePvc := &corev1.PersistentVolumeClaim{}
Expand Down Expand Up @@ -1909,9 +1920,6 @@ func (r *DatavolumeReconciler) getCloneStrategy(dataVolume *cdiv1.DataVolume) (*
defaultCloneStrategy := cdiv1.CloneStrategySnapshot
sourcePvc, err := r.findSourcePvc(dataVolume)
if err != nil {
if k8serrors.IsNotFound(err) {
r.recorder.Eventf(dataVolume, corev1.EventTypeWarning, ErrUnableToClone, "Source pvc %s not found", dataVolume.Spec.Source.PVC.Name)
}
return nil, err
}
storageClass, err := GetStorageClassByName(r.client, sourcePvc.Spec.StorageClassName)
Expand Down Expand Up @@ -2217,14 +2225,11 @@ func (r *DatavolumeReconciler) updateUploadStatusPhase(pvc *corev1.PersistentVol
func (r *DatavolumeReconciler) reconcileDataVolumeStatus(dataVolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim, selectedCloneStrategy cloneStrategy) (reconcile.Result, error) {
dataVolumeCopy := dataVolume.DeepCopy()
var event DataVolumeEvent
var err error
result := reconcile.Result{}

curPhase := dataVolumeCopy.Status.Phase
if pvc != nil {
storageClassBindingMode, err := r.getStorageClassBindingMode(pvc.Spec.StorageClassName)
if err != nil {
return reconcile.Result{}, err
}
dataVolumeCopy.Status.ClaimName = pvc.Name

// the following check is for a case where the request is to create a blank disk for a block device.
Expand Down Expand Up @@ -2276,19 +2281,17 @@ func (r *DatavolumeReconciler) reconcileDataVolumeStatus(dataVolume *cdiv1.DataV
}
} else {
dataVolumeCopy.Status.Phase = cdiv1.Succeeded

}
r.updateImportStatusPhase(pvc, dataVolumeCopy, &event)
}
} else {
switch pvc.Status.Phase {
case corev1.ClaimPending:
honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
shouldBeMarkedWaitForFirstConsumer, err := r.shouldBeMarkedWaitForFirstConsumer(pvc)
if err != nil {
return reconcile.Result{}, err
}
if honorWaitForFirstConsumerEnabled &&
*storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
if shouldBeMarkedWaitForFirstConsumer {
dataVolumeCopy.Status.Phase = cdiv1.WaitForFirstConsumer
} else {
dataVolumeCopy.Status.Phase = cdiv1.Pending
Expand Down Expand Up @@ -2846,7 +2849,7 @@ func (r *DatavolumeReconciler) validateCloneAndSourcePVC(datavolume *cdiv1.DataV
DataVolumeEvent{
eventType: corev1.EventTypeWarning,
reason: CloneWithoutSource,
message: MessageCloneWithoutSource,
message: fmt.Sprintf(MessageCloneWithoutSource, datavolume.Spec.Source.PVC.Name),
})
return false, nil
}
Expand Down Expand Up @@ -2895,6 +2898,25 @@ func (r *DatavolumeReconciler) isSourceReadyToClone(
return true, nil
}

// shouldBeMarkedWaitForFirstConsumer decided whether we should mark DV as WFFC
func (r *DatavolumeReconciler) shouldBeMarkedWaitForFirstConsumer(pvc *corev1.PersistentVolumeClaim) (bool, error) {
storageClassBindingMode, err := r.getStorageClassBindingMode(pvc.Spec.StorageClassName)
if err != nil {
return false, err
}

honorWaitForFirstConsumerEnabled, err := r.featureGates.HonorWaitForFirstConsumerEnabled()
if err != nil {
return false, err
}

res := honorWaitForFirstConsumerEnabled &&
storageClassBindingMode != nil && *storageClassBindingMode == storagev1.VolumeBindingWaitForFirstConsumer &&
pvc.Status.Phase == corev1.ClaimPending

return res, nil
}

// detectCloneSize obtains and assigns the original PVC's size when cloning using an empty storage value
func (r *DatavolumeReconciler) detectCloneSize(
dv *cdiv1.DataVolume,
Expand Down
Loading

0 comments on commit c8f06b4

Please sign in to comment.