From 2a5d9f5af3750e0e017a63fd54f80e7d83296068 Mon Sep 17 00:00:00 2001 From: Itamar Marom <46691031+itamar-marom@users.noreply.github.com> Date: Wed, 19 Apr 2023 08:12:17 +0300 Subject: [PATCH] Pass context from Reconcile function to inside functions (#44) Co-authored-by: itamar.marom --- controllers/druid/druid_controller.go | 4 +- controllers/druid/handler.go | 129 +++++++++++++------------- 2 files changed, 66 insertions(+), 67 deletions(-) diff --git a/controllers/druid/druid_controller.go b/controllers/druid/druid_controller.go index d52f66f4..6998090b 100644 --- a/controllers/druid/druid_controller.go +++ b/controllers/druid/druid_controller.go @@ -55,8 +55,6 @@ func NewDruidReconciler(mgr ctrl.Manager) *DruidReconciler { func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Request) (ctrl.Result, error) { _ = r.Log.WithValues("druid", request.NamespacedName) - // your logic here - // Fetch the Druid instance instance := &druidv1alpha1.Druid{} err := r.Get(ctx, request.NamespacedName, instance) @@ -74,7 +72,7 @@ func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Reque // Intialize Emit Events var emitEvent EventEmitter = EmitEventFuncs{r.Recorder} - if err := deployDruidCluster(r.Client, instance, emitEvent); err != nil { + if err := deployDruidCluster(ctx, r.Client, instance, emitEvent); err != nil { return ctrl.Result{}, err } else { return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go index 158a8771..d0c22400 100644 --- a/controllers/druid/handler.go +++ b/controllers/druid/handler.go @@ -35,7 +35,7 @@ const ( var logger = logf.Log.WithName("druid_operator_handler") -func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error { +func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error { if err := verifyDruidSpec(m); err != nil { e := fmt.Errorf("invalid DruidSpec[%s:%s] due to [%s]", m.Kind, m.Name, err.Error()) @@ -70,7 +70,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm return err } - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeCommonConfigMap(m, ls) }, func() object { return &v1.ConfigMap{} }, alwaysTrueIsEqualsFn, noopUpdaterFn, m, configMapNames, emitEvents); err != nil { @@ -90,13 +90,13 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm if m.Spec.DisablePVCDeletionFinalizer == false { md := m.GetDeletionTimestamp() != nil if md { - return executeFinalizers(sdk, m, emitEvents) + return executeFinalizers(ctx, sdk, m, emitEvents) } /* If finalizer isn't present add it to object meta. In case cr is already deleted do not call this function */ - cr := checkIfCRExists(sdk, m, emitEvents) + cr := checkIfCRExists(ctx, sdk, m, emitEvents) if cr { if !ContainsString(m.ObjectMeta.Finalizers, finalizerName) { m.SetFinalizers(append(m.GetFinalizers(), finalizerName)) @@ -129,7 +129,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm return err } - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return nodeConfig, nil }, func() object { return &v1.ConfigMap{} }, alwaysTrueIsEqualsFn, noopUpdaterFn, m, configMapNames, emitEvents); err != nil { @@ -140,7 +140,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm firstServiceName := "" services := firstNonNilValue(nodeSpec.Services, m.Spec.Services).([]v1.Service) for _, svc := range services { - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeService(&svc, &nodeSpec, m, lm, nodeSpecUniqueStr) }, func() object { return &v1.Service{} }, alwaysTrueIsEqualsFn, func(prev, curr object) { (curr.(*v1.Service)).Spec.ClusterIP = (prev.(*v1.Service)).Spec.ClusterIP }, @@ -155,7 +155,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm nodeSpec.Ports = append(nodeSpec.Ports, v1.ContainerPort{ContainerPort: nodeSpec.DruidPort, Name: "druid-port"}) if nodeSpec.Kind == "Deployment" { - if deployCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(sdk, + if deployCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeDeployment(&nodeSpec, m, lm, nodeSpecUniqueStr, fmt.Sprintf("%s-%s", commonConfigSHA, nodeConfigSHA), firstServiceName) }, @@ -173,7 +173,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm // will be sequential. if m.Generation > 1 { // Check Deployment rolling update status, if in-progress then stop here - done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.Deployment{} }, emitEvents) + done, err := isObjFullyDeployed(ctx, sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.Deployment{} }, emitEvents) if !done { return err } @@ -185,8 +185,8 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm // Ignore for the first iteration ie cluster creation, else get sts shall unnecessary log errors. if m.Generation > 1 && m.Spec.ScalePvcSts { - if isVolumeExpansionEnabled(sdk, m, &nodeSpec, emitEvents) { - err := scalePVCForSts(sdk, &nodeSpec, nodeSpecUniqueStr, m, emitEvents) + if isVolumeExpansionEnabled(ctx, sdk, m, &nodeSpec, emitEvents) { + err := scalePVCForSts(ctx, sdk, &nodeSpec, nodeSpecUniqueStr, m, emitEvents) if err != nil { return err } @@ -194,7 +194,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm } // Create/Update StatefulSet - if stsCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(sdk, + if stsCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeStatefulSet(&nodeSpec, m, lm, nodeSpecUniqueStr, fmt.Sprintf("%s-%s", commonConfigSHA, nodeConfigSHA), firstServiceName) }, @@ -209,14 +209,14 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm } // Default is set to true - execCheckCrashStatus(sdk, &nodeSpec, m, emitEvents) + execCheckCrashStatus(ctx, sdk, &nodeSpec, m, emitEvents) // Ignore isObjFullyDeployed() for the first iteration ie cluster creation // will force cluster creation in parallel, post first iteration rolling updates // will be sequential. if m.Generation > 1 { //Check StatefulSet rolling update status, if in-progress then stop here - done, err := isObjFullyDeployed(sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.StatefulSet{} }, emitEvents) + done, err := isObjFullyDeployed(ctx, sdk, nodeSpec, nodeSpecUniqueStr, m, func() object { return &appsv1.StatefulSet{} }, emitEvents) if !done { return err } @@ -224,12 +224,12 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm } // Default is set to true - execCheckCrashStatus(sdk, &nodeSpec, m, emitEvents) + execCheckCrashStatus(ctx, sdk, &nodeSpec, m, emitEvents) } // Create Ingress Spec if nodeSpec.Ingress != nil { - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeIngress(&nodeSpec, m, ls, nodeSpecUniqueStr) }, @@ -241,7 +241,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm // Create PodDisruptionBudget if nodeSpec.PodDisruptionBudgetSpec != nil { - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makePodDisruptionBudget(&nodeSpec, m, lm, nodeSpecUniqueStr) }, func() object { return &policyv1.PodDisruptionBudget{} }, alwaysTrueIsEqualsFn, noopUpdaterFn, m, podDisruptionBudgetNames, emitEvents); err != nil { @@ -251,7 +251,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm // Create HPA Spec if nodeSpec.HPAutoScaler != nil { - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makeHorizontalPodAutoscaler(&nodeSpec, m, ls, nodeSpecUniqueStr) }, @@ -263,7 +263,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm if nodeSpec.PersistentVolumeClaim != nil { for _, pvc := range nodeSpec.PersistentVolumeClaim { - if _, err := sdkCreateOrUpdateAsNeeded(sdk, + if _, err := sdkCreateOrUpdateAsNeeded(ctx, sdk, func() (object, error) { return makePersistentVolumeClaim(&pvc, &nodeSpec, m, lm, nodeSpecUniqueStr) }, func() object { return &v1.PersistentVolumeClaim{} }, alwaysTrueIsEqualsFn, noopUpdaterFn, @@ -276,7 +276,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm // Ignore on cluster creation if m.Generation > 1 && m.Spec.DeleteOrphanPvc { - if err := deleteOrphanPVC(sdk, m, emitEvents); err != nil { + if err := deleteOrphanPVC(ctx, sdk, m, emitEvents); err != nil { return err } } @@ -284,7 +284,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm //update status and delete unwanted resources updatedStatus := v1alpha1.DruidClusterStatus{} - updatedStatus.StatefulSets = deleteUnusedResources(sdk, m, statefulSetNames, ls, + updatedStatus.StatefulSets = deleteUnusedResources(ctx, sdk, m, statefulSetNames, ls, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object { items := listObj.(*appsv1.StatefulSetList).Items @@ -296,7 +296,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.StatefulSets) - updatedStatus.Deployments = deleteUnusedResources(sdk, m, deploymentNames, ls, + updatedStatus.Deployments = deleteUnusedResources(ctx, sdk, m, deploymentNames, ls, func() objectList { return &appsv1.DeploymentList{} }, func(listObj runtime.Object) []object { items := listObj.(*appsv1.DeploymentList).Items @@ -308,7 +308,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.Deployments) - updatedStatus.HPAutoScalers = deleteUnusedResources(sdk, m, hpaNames, ls, + updatedStatus.HPAutoScalers = deleteUnusedResources(ctx, sdk, m, hpaNames, ls, func() objectList { return &autoscalev2.HorizontalPodAutoscalerList{} }, func(listObj runtime.Object) []object { items := listObj.(*autoscalev2.HorizontalPodAutoscalerList).Items @@ -320,7 +320,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.HPAutoScalers) - updatedStatus.Ingress = deleteUnusedResources(sdk, m, ingressNames, ls, + updatedStatus.Ingress = deleteUnusedResources(ctx, sdk, m, ingressNames, ls, func() objectList { return &networkingv1.IngressList{} }, func(listObj runtime.Object) []object { items := listObj.(*networkingv1.IngressList).Items @@ -332,7 +332,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.Ingress) - updatedStatus.PodDisruptionBudgets = deleteUnusedResources(sdk, m, podDisruptionBudgetNames, ls, + updatedStatus.PodDisruptionBudgets = deleteUnusedResources(ctx, sdk, m, podDisruptionBudgetNames, ls, func() objectList { return &policyv1.PodDisruptionBudgetList{} }, func(listObj runtime.Object) []object { items := listObj.(*policyv1.PodDisruptionBudgetList).Items @@ -344,7 +344,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.PodDisruptionBudgets) - updatedStatus.Services = deleteUnusedResources(sdk, m, serviceNames, ls, + updatedStatus.Services = deleteUnusedResources(ctx, sdk, m, serviceNames, ls, func() objectList { return &v1.ServiceList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.ServiceList).Items @@ -356,7 +356,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.Services) - updatedStatus.ConfigMaps = deleteUnusedResources(sdk, m, configMapNames, ls, + updatedStatus.ConfigMaps = deleteUnusedResources(ctx, sdk, m, configMapNames, ls, func() objectList { return &v1.ConfigMapList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.ConfigMapList).Items @@ -368,7 +368,7 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm }, emitEvents) sort.Strings(updatedStatus.ConfigMaps) - podList, _ := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { + podList, _ := readers.List(ctx, sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PodList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -402,17 +402,17 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm return nil } -func deleteSTSAndPVC(sdk client.Client, drd *v1alpha1.Druid, stsList, pvcList []object, emitEvents EventEmitter) error { +func deleteSTSAndPVC(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, stsList, pvcList []object, emitEvents EventEmitter) error { for _, sts := range stsList { - err := writers.Delete(context.TODO(), sdk, drd, sts, emitEvents, &client.DeleteAllOfOptions{}) + err := writers.Delete(ctx, sdk, drd, sts, emitEvents, &client.DeleteAllOfOptions{}) if err != nil { return err } } for i := range pvcList { - err := writers.Delete(context.TODO(), sdk, drd, pvcList[i], emitEvents, &client.DeleteAllOfOptions{}) + err := writers.Delete(ctx, sdk, drd, pvcList[i], emitEvents, &client.DeleteAllOfOptions{}) if err != nil { return err } @@ -421,8 +421,8 @@ func deleteSTSAndPVC(sdk client.Client, drd *v1alpha1.Druid, stsList, pvcList [] return nil } -func checkIfCRExists(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) bool { - _, err := readers.Get(context.TODO(), sdk, m.Name, m, func() object { return &v1alpha1.Druid{} }, emitEvents) +func checkIfCRExists(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) bool { + _, err := readers.Get(ctx, sdk, m.Name, m, func() object { return &v1alpha1.Druid{} }, emitEvents) if err != nil { return false } else { @@ -430,9 +430,9 @@ func checkIfCRExists(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitt } } -func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error { +func deleteOrphanPVC(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error { - podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { + podList, err := readers.List(ctx, sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PodList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -448,7 +448,7 @@ func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmi "druid_cr": drd.Name, } - pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { + pvcList, err := readers.List(ctx, sdk, drd, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PersistentVolumeClaimList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -490,7 +490,7 @@ func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmi for i, pvc := range pvcList { if !ContainsString(mountedPVC, pvc.GetName()) { - err := writers.Delete(context.TODO(), sdk, drd, pvcList[i], emitEvents, &client.DeleteAllOfOptions{}) + err := writers.Delete(ctx, sdk, drd, pvcList[i], emitEvents, &client.DeleteAllOfOptions{}) if err != nil { return err } else { @@ -503,14 +503,14 @@ func deleteOrphanPVC(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmi return nil } -func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error { +func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error { if ContainsString(m.ObjectMeta.Finalizers, finalizerName) { pvcLabels := map[string]string{ "druid_cr": m.Name, } - pvcList, err := readers.List(context.TODO(), sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { + pvcList, err := readers.List(ctx, sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PersistentVolumeClaimList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -522,7 +522,7 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi return err } - stsList, err := readers.List(context.TODO(), sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object { + stsList, err := readers.List(ctx, sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object { items := listObj.(*appsv1.StatefulSetList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -537,7 +537,7 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi msg := fmt.Sprintf("Trigerring finalizer for CR [%s] in namespace [%s]", m.Name, m.Namespace) // sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizer, msg) logger.Info(msg) - if err := deleteSTSAndPVC(sdk, m, stsList, pvcList, emitEvents); err != nil { + if err := deleteSTSAndPVC(ctx, sdk, m, stsList, pvcList, emitEvents); err != nil { return err } else { msg := fmt.Sprintf("Finalizer success for CR [%s] in namespace [%s]", m.Name, m.Namespace) @@ -548,7 +548,7 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi // remove our finalizer from the list and update it. m.ObjectMeta.Finalizers = RemoveString(m.ObjectMeta.Finalizers, finalizerName) - _, err = writers.Update(context.TODO(), sdk, m, m, emitEvents) + _, err = writers.Update(ctx, sdk, m, m, emitEvents) if err != nil { return err } @@ -558,19 +558,19 @@ func executeFinalizers(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmi } -func execCheckCrashStatus(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, event EventEmitter) { +func execCheckCrashStatus(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, event EventEmitter) { if m.Spec.ForceDeleteStsPodOnError == false { return } else { if nodeSpec.PodManagementPolicy == "OrderedReady" { - checkCrashStatus(sdk, m, event) + checkCrashStatus(ctx, sdk, m, event) } } } -func checkCrashStatus(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error { +func checkCrashStatus(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEmitter) error { - podList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { + podList, err := readers.List(ctx, sdk, drd, makeLabelsForDruid(drd.Name), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PodList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -592,7 +592,7 @@ func checkCrashStatus(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEm // OR condtion.status is false which evalutes if neither of these conditions are met // 1. ContainersReady 2. PodInitialized 3. PodReady 4. PodScheduled if p.(*v1.Pod).Status.Phase != v1.PodRunning || condition.Status == v1.ConditionFalse { - err := writers.Delete(context.TODO(), sdk, drd, p, emitEvents, &client.DeleteOptions{}) + err := writers.Delete(ctx, sdk, drd, p, emitEvents, &client.DeleteOptions{}) if err != nil { return err } else { @@ -608,7 +608,7 @@ func checkCrashStatus(sdk client.Client, drd *v1alpha1.Druid, emitEvents EventEm return nil } -func deleteUnusedResources(sdk client.Client, drd *v1alpha1.Druid, +func deleteUnusedResources(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, names map[string]bool, selectorLabels map[string]string, emptyListObjFn func() objectList, itemsExtractorFn func(obj runtime.Object) []object, emitEvents EventEmitter) []string { listOpts := []client.ListOption{ @@ -620,13 +620,13 @@ func deleteUnusedResources(sdk client.Client, drd *v1alpha1.Druid, listObj := emptyListObjFn() - if err := sdk.List(context.TODO(), listObj, listOpts...); err != nil { + if err := sdk.List(ctx, listObj, listOpts...); err != nil { e := fmt.Errorf("failed to list [%s] due to [%s]", listObj.GetObjectKind().GroupVersionKind().Kind, err.Error()) logger.Error(e, e.Error(), "name", drd.Name, "namespace", drd.Namespace) } else { for _, s := range itemsExtractorFn(listObj) { if names[s.GetName()] == false { - err := writers.Delete(context.TODO(), sdk, drd, s, emitEvents, &client.DeleteOptions{}) + err := writers.Delete(ctx, sdk, drd, s, emitEvents, &client.DeleteOptions{}) if err != nil { survivorNames = append(survivorNames, s.GetName()) } @@ -648,6 +648,7 @@ func noopUpdaterFn(prev, curr object) { } func sdkCreateOrUpdateAsNeeded( + ctx context.Context, sdk client.Client, objFn func() (object, error), emptyObjFn func() object, @@ -665,10 +666,10 @@ func sdkCreateOrUpdateAsNeeded( addHashToObject(obj) prevObj := emptyObjFn() - if err := sdk.Get(context.TODO(), *namespacedName(obj.GetName(), obj.GetNamespace()), prevObj); err != nil { + if err := sdk.Get(ctx, *namespacedName(obj.GetName(), obj.GetNamespace()), prevObj); err != nil { if apierrors.IsNotFound(err) { // resource does not exist, create it. - create, err := writers.Create(context.TODO(), sdk, drd, obj, emitEvent) + create, err := writers.Create(ctx, sdk, drd, obj, emitEvent) if err != nil { return "", err } else { @@ -686,7 +687,7 @@ func sdkCreateOrUpdateAsNeeded( obj.SetResourceVersion(prevObj.GetResourceVersion()) updaterFn(prevObj, obj) - update, err := writers.Update(context.TODO(), sdk, drd, obj, emitEvent) + update, err := writers.Update(ctx, sdk, drd, obj, emitEvent) if err != nil { return "", err } else { @@ -699,10 +700,10 @@ func sdkCreateOrUpdateAsNeeded( } } -func isObjFullyDeployed(sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emptyObjFn func() object, emitEvent EventEmitter) (bool, error) { +func isObjFullyDeployed(ctx context.Context, sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emptyObjFn func() object, emitEvent EventEmitter) (bool, error) { // Get Object - obj, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, emptyObjFn, emitEvent) + obj, err := readers.Get(ctx, sdk, nodeSpecUniqueStr, drd, emptyObjFn, emitEvent) if err != nil { return false, err } @@ -734,9 +735,9 @@ func isObjFullyDeployed(sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, node // scalePVCForSts shall expand the sts volumeclaimtemplates size as well as N no of pvc supported by the sts. // NOTE: To be called only if generation > 1 -func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error { +func scalePVCForSts(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error { - getSTSList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object { + getSTSList, err := readers.List(ctx, sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object { items := listObj.(*appsv1.StatefulSetList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -759,7 +760,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe // return nil, in case return err the program halts since sts would not be able // we would like the operator to create sts. - sts, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, func() object { return &appsv1.StatefulSet{} }, emitEvent) + sts, err := readers.Get(ctx, sdk, nodeSpecUniqueStr, drd, func() object { return &appsv1.StatefulSet{} }, emitEvent) if err != nil { return nil } @@ -768,7 +769,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe "component": nodeSpec.NodeType, } - pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvent, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { + pvcList, err := readers.List(ctx, sdk, drd, pvcLabels, emitEvent, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PersistentVolumeClaimList).Items result := make([]object, len(items)) for i := 0; i < len(items); i++ { @@ -811,7 +812,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe logger.Info(msg) emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeDetected", msg, nil) - if err := writers.Delete(context.TODO(), sdk, drd, sts, emitEvent, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + if err := writers.Delete(ctx, sdk, drd, sts, emitEvent, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { return err } else { msg := fmt.Sprintf("[StatefuleSet:%s] successfully deleted with casacde=false", sts.(*appsv1.StatefulSet).Name) @@ -828,7 +829,7 @@ func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpe // use deepcopy patch := client.MergeFrom(pvcList[p].(*v1.PersistentVolumeClaim).DeepCopy()) pvcList[p].(*v1.PersistentVolumeClaim).Spec.Resources.Requests[v1.ResourceStorage] = desVolumeClaimTemplateSize[i] - if err := writers.Patch(context.TODO(), sdk, drd, pvcList[p].(*v1.PersistentVolumeClaim), false, patch, emitEvent); err != nil { + if err := writers.Patch(ctx, sdk, drd, pvcList[p].(*v1.PersistentVolumeClaim), false, patch, emitEvent); err != nil { return err } else { msg := fmt.Sprintf("[PVC:%s] successfully Patched with [Size:%s]", pvcList[p].(*v1.PersistentVolumeClaim).Name, desVolumeClaimTemplateSize[i].String()) @@ -864,10 +865,10 @@ func getVolumeClaimTemplateSizes(sts object, nodeSpec *v1alpha1.DruidNodeSpec, p } -func isVolumeExpansionEnabled(sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool { +func isVolumeExpansionEnabled(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool { for _, nodeVCT := range nodeSpec.VolumeClaimTemplates { - sc, err := readers.Get(context.TODO(), sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return &storage.StorageClass{} }, emitEvent) + sc, err := readers.Get(ctx, sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return &storage.StorageClass{} }, emitEvent) if err != nil { return false } @@ -1493,7 +1494,7 @@ func getPodNames(pods []object) []string { return podNames } -func sendEvent(sdk client.Client, drd *v1alpha1.Druid, eventtype, reason, message string) { +func sendEvent(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, eventtype, reason, message string) { ref := &v1.ObjectReference{ Kind: drd.Kind, @@ -1529,7 +1530,7 @@ func sendEvent(sdk client.Client, drd *v1alpha1.Druid, eventtype, reason, messag Source: v1.EventSource{Component: "druid-operator"}, } - if err := sdk.Create(context.TODO(), event); err != nil { + if err := sdk.Create(ctx, event); err != nil { logger.Error(err, fmt.Sprintf("Failed to push event [%v]", event)) } }