-
Notifications
You must be signed in to change notification settings - Fork 267
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The import populator is a controller that handles the import of data in PVCs without the need of DataVolumes while still taking advantage of the import-controller flow. This controller creates an additional PVC' with import annotations. After the import process succeeds, the controller rebinds the PV to the original target PVc and deletes the PVC prime. Signed-off-by: Alvaro Romero <alromero@redhat.com>
- Loading branch information
Showing
4 changed files
with
294 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,286 @@ | ||
package populators | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"regexp" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/pkg/errors" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
k8serrors "k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" | ||
cc "kubevirt.io/containerized-data-importer/pkg/controller/common" | ||
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/controller" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
"sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
) | ||
|
||
const ( | ||
importPopulatorName = "import-populator" | ||
|
||
// importFailed provides a const to indicate import has failed | ||
importFailed = "importFailed" | ||
// importSucceeded provides a const to indicate import has succeeded | ||
importSucceeded = "importSucceeded" | ||
|
||
// messageImportFailed provides a const to form import has failed message | ||
messageImportFailed = "import into %s failed" | ||
// messageImportSucceeded provides a const to form import has succeeded message | ||
messageImportSucceeded = "Successfully imported into %s" | ||
) | ||
|
||
// ImportPopulatorReconciler members | ||
type ImportPopulatorReconciler struct { | ||
ReconcilerBase | ||
} | ||
|
||
// http client to get metrics | ||
var httpClient *http.Client | ||
|
||
// NewImportPopulator creates a new instance of the import-populator controller | ||
func NewImportPopulator( | ||
ctx context.Context, | ||
mgr manager.Manager, | ||
log logr.Logger, | ||
installerLabels map[string]string, | ||
) (controller.Controller, error) { | ||
client := mgr.GetClient() | ||
reconciler := &ImportPopulatorReconciler{ | ||
ReconcilerBase: ReconcilerBase{ | ||
client: client, | ||
scheme: mgr.GetScheme(), | ||
log: log.WithName(importPopulatorName), | ||
recorder: mgr.GetEventRecorderFor(importPopulatorName), | ||
featureGates: featuregates.NewFeatureGates(client), | ||
sourceKind: cdiv1.ImportSourceRef, | ||
installerLabels: installerLabels, | ||
}, | ||
} | ||
|
||
importPopulator, err := controller.New(importPopulatorName, mgr, controller.Options{ | ||
Reconciler: reconciler, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if err := addCommonPopulatorsWatches(mgr, importPopulator, log, cdiv1.ImportSourceRef, &cdiv1.ImportSource{}); err != nil { | ||
return nil, err | ||
} | ||
|
||
return importPopulator, nil | ||
} | ||
|
||
// Reconcile the reconcile loop for the PVC with DataSourceRef of ImportSource kind | ||
func (r *ImportPopulatorReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) { | ||
log := r.log.WithValues("PVC", req.NamespacedName) | ||
log.V(1).Info("reconciling Import Source PVCs") | ||
return r.reconcile(req, r, log) | ||
} | ||
|
||
// Implementations of populatorController methods | ||
|
||
// Import-specific implementation of getPopulationSource | ||
func (r *ImportPopulatorReconciler) getPopulationSource(namespace, name string) (client.Object, error) { | ||
importSource := &cdiv1.ImportSource{} | ||
importSourceKey := types.NamespacedName{Namespace: namespace, Name: name} | ||
if err := r.client.Get(context.TODO(), importSourceKey, importSource); err != nil { | ||
// reconcile will be triggered once created | ||
if k8serrors.IsNotFound(err) { | ||
return nil, nil | ||
} | ||
return nil, err | ||
} | ||
return importSource, nil | ||
} | ||
|
||
// Import-specific implementation of reconcileTargetPVC | ||
func (r *ImportPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) { | ||
phase, _ := pvcPrime.Annotations[cc.AnnPodPhase] | ||
if err := r.updateImportProgress(phase, pvc, pvcPrime); err != nil { | ||
return reconcile.Result{}, err | ||
} | ||
|
||
switch phase { | ||
case string(corev1.PodRunning): | ||
// We requeue to keep reporting progress | ||
return reconcile.Result{RequeueAfter: 2 * time.Second}, nil | ||
case string(corev1.PodFailed): | ||
// We'll get called later once it succeeds | ||
r.recorder.Eventf(pvcPrime, corev1.EventTypeWarning, importFailed, messageImportFailed, pvc.Name) | ||
case string(corev1.PodSucceeded): | ||
// Once the import is succeeded, we rebind the PV from PVC' to target PVC | ||
if err := r.rebindPV(pvc, pvcPrime); err != nil { | ||
return reconcile.Result{}, err | ||
} | ||
r.recorder.Eventf(pvc, corev1.EventTypeNormal, importSucceeded, messageImportSucceeded, pvc.Name) | ||
} | ||
|
||
return reconcile.Result{}, nil | ||
} | ||
|
||
// Import-specific implementation of updateAnnotations | ||
func (r *ImportPopulatorReconciler) updateAnnotations(pvc *corev1.PersistentVolumeClaim, source client.Object) { | ||
importSource := source.(*cdiv1.ImportSource) | ||
annotations := pvc.Annotations | ||
|
||
annotations[cc.AnnContentType] = cc.GetContentType(string(importSource.Spec.ContentType)) | ||
|
||
// TODO: Avoid reusing that much code. Non trivial because DataVolumes and ImportSources store these fields in diferent ways. | ||
// TODO: Should implement some kind of webhook to avoid allowing different sources. | ||
if importSource.Spec.HTTP != nil { | ||
annotations[cc.AnnEndpoint] = importSource.Spec.HTTP.URL | ||
annotations[cc.AnnSource] = cc.SourceHTTP | ||
|
||
if importSource.Spec.HTTP.SecretRef != "" { | ||
annotations[cc.AnnSecret] = importSource.Spec.HTTP.SecretRef | ||
} | ||
if importSource.Spec.HTTP.CertConfigMap != "" { | ||
annotations[cc.AnnCertConfigMap] = importSource.Spec.HTTP.CertConfigMap | ||
} | ||
for index, header := range importSource.Spec.HTTP.ExtraHeaders { | ||
annotations[fmt.Sprintf("%s.%d", cc.AnnExtraHeaders, index)] = header | ||
} | ||
for index, header := range importSource.Spec.HTTP.SecretExtraHeaders { | ||
annotations[fmt.Sprintf("%s.%d", cc.AnnSecretExtraHeaders, index)] = header | ||
} | ||
return | ||
} | ||
if importSource.Spec.S3 != nil { | ||
annotations[cc.AnnEndpoint] = importSource.Spec.S3.URL | ||
annotations[cc.AnnSource] = cc.SourceS3 | ||
if importSource.Spec.S3.SecretRef != "" { | ||
annotations[cc.AnnSecret] = importSource.Spec.S3.SecretRef | ||
} | ||
if importSource.Spec.S3.CertConfigMap != "" { | ||
annotations[cc.AnnCertConfigMap] = importSource.Spec.S3.CertConfigMap | ||
} | ||
return | ||
} | ||
if importSource.Spec.Registry != nil { | ||
annotations[cc.AnnSource] = cc.SourceRegistry | ||
pullMethod := importSource.Spec.Registry.PullMethod | ||
if pullMethod != nil && *pullMethod != "" { | ||
annotations[cc.AnnRegistryImportMethod] = string(*pullMethod) | ||
} | ||
url := importSource.Spec.Registry.URL | ||
if url != nil && *url != "" { | ||
annotations[cc.AnnEndpoint] = *url | ||
} else { | ||
imageStream := importSource.Spec.Registry.ImageStream | ||
if imageStream != nil && *imageStream != "" { | ||
annotations[cc.AnnEndpoint] = *imageStream | ||
annotations[cc.AnnRegistryImageStream] = "true" | ||
} | ||
} | ||
secretRef := importSource.Spec.Registry.SecretRef | ||
if secretRef != nil && *secretRef != "" { | ||
annotations[cc.AnnSecret] = *secretRef | ||
} | ||
certConfigMap := importSource.Spec.Registry.CertConfigMap | ||
if certConfigMap != nil && *certConfigMap != "" { | ||
annotations[cc.AnnCertConfigMap] = *certConfigMap | ||
} | ||
return | ||
} | ||
if importSource.Spec.Blank != nil { | ||
annotations[cc.AnnSource] = cc.SourceNone | ||
return | ||
} | ||
if importSource.Spec.Imageio != nil { | ||
annotations[cc.AnnEndpoint] = importSource.Spec.Imageio.URL | ||
annotations[cc.AnnSource] = cc.SourceImageio | ||
annotations[cc.AnnSecret] = importSource.Spec.Imageio.SecretRef | ||
annotations[cc.AnnCertConfigMap] = importSource.Spec.Imageio.CertConfigMap | ||
annotations[cc.AnnDiskID] = importSource.Spec.Imageio.DiskID | ||
return | ||
} | ||
if importSource.Spec.VDDK != nil { | ||
annotations[cc.AnnEndpoint] = importSource.Spec.VDDK.URL | ||
annotations[cc.AnnSource] = cc.SourceVDDK | ||
annotations[cc.AnnSecret] = importSource.Spec.VDDK.SecretRef | ||
annotations[cc.AnnBackingFile] = importSource.Spec.VDDK.BackingFile | ||
annotations[cc.AnnUUID] = importSource.Spec.VDDK.UUID | ||
annotations[cc.AnnThumbprint] = importSource.Spec.VDDK.Thumbprint | ||
if importSource.Spec.VDDK.InitImageURL != "" { | ||
annotations[cc.AnnVddkInitImageURL] = importSource.Spec.VDDK.InitImageURL | ||
} | ||
return | ||
} | ||
} | ||
|
||
// Progress reporting | ||
|
||
func (r *ImportPopulatorReconciler) updateImportProgress(podPhase string, pvc, pvcPrime *corev1.PersistentVolumeClaim) error { | ||
if pvc.Annotations == nil { | ||
pvc.Annotations = make(map[string]string) | ||
} | ||
// Just add 100.0% if pod is succeeded | ||
if podPhase == string(corev1.PodSucceeded) { | ||
pvc.Annotations[cc.AnnImportProgressReporting] = "100.0%" | ||
if err := r.client.Update(context.TODO(), pvc); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
importPod, err := r.getImportPod(pvc) | ||
if err != nil { | ||
return err | ||
} | ||
// This will only work when the import pod is running | ||
if importPod != nil && importPod.Status.Phase != corev1.PodRunning { | ||
return nil | ||
} | ||
url, err := cc.GetMetricsURL(importPod) | ||
if err != nil { | ||
return err | ||
} | ||
if url == "" { | ||
return nil | ||
} | ||
// We fetch the import progress from the import pod metrics | ||
importRegExp := regexp.MustCompile("progress\\{ownerUID\\=\"" + string(pvc.UID) + "\"\\} (\\d{1,3}\\.?\\d*)") | ||
httpClient := cc.BuildHTTPClient(httpClient) | ||
progressReport, err := cc.GetProgressReportFromURL(url, importRegExp, httpClient) | ||
if err != nil { | ||
return err | ||
} | ||
if progressReport != "" { | ||
if f, err := strconv.ParseFloat(progressReport, 64); err == nil { | ||
pvc.Annotations[cc.AnnImportProgressReporting] = fmt.Sprintf("%.2f%%", f) | ||
if err := r.client.Update(context.TODO(), pvc); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *ImportPopulatorReconciler) getImportPod(pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) { | ||
importPodName, ok := pvc.Annotations[cc.AnnImportPod] | ||
if !ok { | ||
return nil, nil | ||
} | ||
|
||
pod := &corev1.Pod{} | ||
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: importPodName, Namespace: pvc.GetNamespace()}, pod); err != nil { | ||
if !k8serrors.IsNotFound(err) { | ||
return nil, err | ||
} | ||
return nil, nil | ||
} | ||
if !metav1.IsControlledBy(pod, pvc) && !cc.IsImageStream(pvc) { | ||
return nil, errors.Errorf("Pod is not owned by PVC") | ||
} | ||
return pod, nil | ||
} |