diff --git a/apis/druid/v1alpha1/druid_types.go b/apis/druid/v1alpha1/druid_types.go index e4dac4d1..2c897376 100644 --- a/apis/druid/v1alpha1/druid_types.go +++ b/apis/druid/v1alpha1/druid_types.go @@ -40,6 +40,9 @@ type DruidSpec struct { // doc: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#forced-rollback ForceDeleteStsPodOnError bool `json:"forceDeleteStsPodOnError,omitempty"` + // Optional: ScalePvcSts, defaults to false. When enabled, operator will allow volume expansion of sts and pvc's. + ScalePvcSts bool `json:"scalePvcSts,omitempty"` + // Required: in-container directory to mount with common.runtime.properties CommonConfigMountPath string `json:"commonConfigMountPath"` diff --git a/chart/templates/crds/druid.apache.org_druids.yaml b/chart/templates/crds/druid.apache.org_druids.yaml index effddb0e..64dfc9a0 100644 --- a/chart/templates/crds/druid.apache.org_druids.yaml +++ b/chart/templates/crds/druid.apache.org_druids.yaml @@ -3438,6 +3438,8 @@ spec: type: object rollingDeploy: type: boolean + scalePvcSts: + type: boolean securityContext: properties: fsGroup: diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go index 31457d36..1946874f 100644 --- a/controllers/druid/handler.go +++ b/controllers/druid/handler.go @@ -12,12 +12,14 @@ import ( autoscalev2beta2 "k8s.io/api/autoscaling/v2beta2" networkingv1 "k8s.io/api/networking/v1" + storage "k8s.io/api/storage/v1" "github.com/druid-io/druid-operator/apis/druid/v1alpha1" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -189,6 +191,19 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm } } } else { + + // scalePVCForSTS to be only called only if volumeExpansion is supported by the storage class. 
+ // Skip on the first iteration, i.e. cluster creation, otherwise the sts Get would log errors unnecessarily. + + if m.Generation > 1 && m.Spec.ScalePvcSts { + if isVolumeExpansionEnabled(sdk, m, &nodeSpec, emitEvents) { + err := scalePVCForSts(sdk, &nodeSpec, nodeSpecUniqueStr, m, emitEvents) + if err != nil { + return err + } + } + } + // Create/Update StatefulSet if stsCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(sdk, func() (object, error) { @@ -728,6 +743,153 @@ func isObjFullyDeployed(sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, node return false, nil } +// scalePVCForSts shall expand the sts volumeclaimtemplates size as well as the N pvc's owned by the sts. +// NOTE: To be called only if generation > 1 +func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error { + + getSTSList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return makeStatefulSetListEmptyObj() }, func(listObj runtime.Object) []object { + items := listObj.(*appsv1.StatefulSetList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }) + if err != nil { + return nil + } + + // Don't proceed unless all statefulsets are up and running; + // otherwise this can cause the goroutine to panic + + for _, sts := range getSTSList { + if sts.(*appsv1.StatefulSet).Status.Replicas != sts.(*appsv1.StatefulSet).Status.ReadyReplicas { + return nil + } + } + + // return nil instead of err: returning the error would halt reconciliation + // and prevent the operator from creating the sts. 
+ sts, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, func() object { return makeStatefulSetEmptyObj() }, emitEvent) + if err != nil { + return nil + } + + pvcLabels := map[string]string{ + "component": nodeSpec.NodeType, + } + + pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvent, func() objectList { return makePersistentVolumeClaimListEmptyObj() }, func(listObj runtime.Object) []object { + items := listObj.(*v1.PersistentVolumeClaimList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }) + if err != nil { + return nil + } + + desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize := getVolumeClaimTemplateSizes(sts, nodeSpec, pvcList) + + // current number of PVC can't be less than desired number of pvc + if len(pvcSize) < len(desVolumeClaimTemplateSize) { + return nil + } + + // iterate over array for matching each index in desVolumeClaimTemplateSize, currVolumeClaimTemplateSize and pvcSize + for i := range desVolumeClaimTemplateSize { + + // Validate Request, shrinking of pvc not supported + // desired size can't be less than current size + // in that case the user must manually re-create the sts/pvc + + desiredSize, _ := desVolumeClaimTemplateSize[i].AsInt64() + currentSize, _ := currVolumeClaimTemplateSize[i].AsInt64() + + if desiredSize < currentSize { + e := fmt.Errorf("Request for Shrinking of sts pvc size [sts:%s] in [namespace:%s] is not Supported", sts.(*appsv1.StatefulSet).Name, sts.(*appsv1.StatefulSet).Namespace) + logger.Error(e, e.Error(), "name", drd.Name, "namespace", drd.Namespace) + emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeFail", "", e) + return e + } + + // In case sizes don't match and desiredSize > currentSize, delete the sts using cascade=false / propagation policy set to orphan + // The operator on next reconcile shall create the sts with latest changes + if desiredSize != currentSize { + msg := 
fmt.Sprintf("Detected Change in VolumeClaimTemplate Sizes for StatefulSet [%s] in Namespace [%s], desVolumeClaimTemplateSize: [%s], currVolumeClaimTemplateSize: [%s], deleting STS [%s] with cascade=false", sts.(*appsv1.StatefulSet).Name, sts.(*appsv1.StatefulSet).Namespace, desVolumeClaimTemplateSize[i].String(), currVolumeClaimTemplateSize[i].String(), sts.(*appsv1.StatefulSet).Name) + logger.Info(msg) + emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeDetected", msg, nil) + + if err := writers.Delete(context.TODO(), sdk, drd, sts, emitEvent, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + return err + } else { + msg := fmt.Sprintf("[StatefulSet:%s] successfully deleted with cascade=false", sts.(*appsv1.StatefulSet).Name) + logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace) + emitEvent.EmitEventGeneric(drd, "DruidOperatorStsOrphaned", msg, nil) + } + + } + + // In case sizes don't match, patch the pvc with the desired size from the druid CR + for p := range pvcSize { + pSize, _ := pvcSize[p].AsInt64() + if desiredSize != pSize { + // use deepcopy + patch := client.MergeFrom(pvcList[p].(*v1.PersistentVolumeClaim).DeepCopy()) + pvcList[p].(*v1.PersistentVolumeClaim).Spec.Resources.Requests[v1.ResourceStorage] = desVolumeClaimTemplateSize[i] + if err := writers.Patch(context.TODO(), sdk, drd, pvcList[p].(*v1.PersistentVolumeClaim), false, patch, emitEvent); err != nil { + return err + } else { + msg := fmt.Sprintf("[PVC:%s] successfully Patched with [Size:%s]", pvcList[p].(*v1.PersistentVolumeClaim).Name, desVolumeClaimTemplateSize[i].String()) + logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace) + } + } + } + + } + + return nil +} + +// desVolumeClaimTemplateSize: the druid CR holds this value for a sts volumeclaimtemplate +// currVolumeClaimTemplateSize: the sts owned by druid CR holds this value in volumeclaimtemplate +// pvcSize: the pvc referenced by the sts holds this value +// type of vars is 
resource.Quantity. ref: https://godoc.org/k8s.io/apimachinery/pkg/api/resource +func getVolumeClaimTemplateSizes(sts object, nodeSpec *v1alpha1.DruidNodeSpec, pvc []object) (desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize []resource.Quantity) { + + for i := range nodeSpec.VolumeClaimTemplates { + desVolumeClaimTemplateSize = append(desVolumeClaimTemplateSize, nodeSpec.VolumeClaimTemplates[i].Spec.Resources.Requests[v1.ResourceStorage]) + } + + for i := range sts.(*appsv1.StatefulSet).Spec.VolumeClaimTemplates { + currVolumeClaimTemplateSize = append(currVolumeClaimTemplateSize, sts.(*appsv1.StatefulSet).Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[v1.ResourceStorage]) + } + + for i := range pvc { + pvcSize = append(pvcSize, pvc[i].(*v1.PersistentVolumeClaim).Spec.Resources.Requests[v1.ResourceStorage]) + } + + return desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize + +} + +func isVolumeExpansionEnabled(sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool { + + for _, nodeVCT := range nodeSpec.VolumeClaimTemplates { + sc, err := readers.Get(context.TODO(), sdk, *nodeVCT.Spec.StorageClassName, m, func() object { return makeStorageClassEmptyObj() }, emitEvent) + if err != nil { + return false + } + + if sc.(*storage.StorageClass).AllowVolumeExpansion != nil && *sc.(*storage.StorageClass).AllowVolumeExpansion { + return true + } + } + return false +} + func stringifyForLogging(obj object, drd *v1alpha1.Druid) string { if bytes, err := json.Marshal(obj); err != nil { logger.Error(err, err.Error(), fmt.Sprintf("Failed to serialize [%s:%s]", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName()), "name", drd.Name, "namespace", drd.Namespace) @@ -1278,6 +1440,7 @@ func makeLabelsForNodeSpec(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, labels["app"] = "druid" labels["druid_cr"] = clusterName labels["nodeSpecUniqueStr"] = nodeSpecUniqueStr + labels["component"] = nodeSpec.NodeType return labels } @@ -1461,6 +1624,15 
@@ func makeServiceEmptyObj() *v1.Service { } } +func makeStorageClassEmptyObj() *storage.StorageClass { + return &storage.StorageClass{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "storage.k8s.io/v1", + Kind: "StorageClass", + }, + } +} + func makeConfigMapEmptyObj() *v1.ConfigMap { return &v1.ConfigMap{ TypeMeta: metav1.TypeMeta{ diff --git a/controllers/druid/testdata/broker-config-map.yaml b/controllers/druid/testdata/broker-config-map.yaml index 9172c236..24970859 100644 --- a/controllers/druid/testdata/broker-config-map.yaml +++ b/controllers/druid/testdata/broker-config-map.yaml @@ -44,7 +44,8 @@ metadata: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker name: druid-druid-test-brokers-config namespace: test-namespace annotations: - druidOpResourceHash: 9kOh5Qyj7xud9ED6REtjhXM6zJU= + druidOpResourceHash: O3jmICgrTjJkMBlGlE05W7dGhA0= diff --git a/controllers/druid/testdata/broker-deployment.yaml b/controllers/druid/testdata/broker-deployment.yaml index 512596eb..b81bc6e5 100644 --- a/controllers/druid/testdata/broker-deployment.yaml +++ b/controllers/druid/testdata/broker-deployment.yaml @@ -7,8 +7,9 @@ metadata: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker annotations: - druidOpResourceHash: UFf9ygahn6p+juXtqdzSghIjDvI= + druidOpResourceHash: rjo/DfrcRi1LUUEhdkBJch19eDs= spec: replicas: 2 selector: @@ -16,6 +17,7 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker strategy: rollingUpdate: maxSurge: 25 @@ -27,6 +29,7 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker annotations: key1: value1 key2: value2 diff --git a/controllers/druid/testdata/broker-headless-service.yaml b/controllers/druid/testdata/broker-headless-service.yaml index e82b4cdf..9ef7904e 100644 --- a/controllers/druid/testdata/broker-headless-service.yaml +++ 
b/controllers/druid/testdata/broker-headless-service.yaml @@ -2,12 +2,13 @@ apiVersion: v1 kind: Service metadata: annotations: - druidOpResourceHash: DuchVNIByBSJSv7rVvTVlXW6zkE= + druidOpResourceHash: 5zzzIiXTlupyCeyb/P8llEZN1Ag= creationTimestamp: null labels: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker name: druid-druid-test-brokers namespace: test-namespace spec: @@ -20,6 +21,7 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker type: ClusterIP status: - loadBalancer: {} \ No newline at end of file + loadBalancer: {} diff --git a/controllers/druid/testdata/broker-load-balancer-service.yaml b/controllers/druid/testdata/broker-load-balancer-service.yaml index 87a0ae80..fed52c35 100644 --- a/controllers/druid/testdata/broker-load-balancer-service.yaml +++ b/controllers/druid/testdata/broker-load-balancer-service.yaml @@ -3,11 +3,12 @@ kind: Service metadata: annotations: service.beta.kubernetes.io/aws-load-balancer-internal: 0.0.0.0/0 - druidOpResourceHash: IScIBhsS6qaK7VyK+TCCFM/s3VA= + druidOpResourceHash: vQE/6DfWCQnWFVvW3KxfOLBfdlA= labels: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker name: broker-druid-druid-test-brokers-service namespace: test-namespace spec: @@ -19,4 +20,5 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker type: LoadBalancer diff --git a/controllers/druid/testdata/broker-pod-disruption-budget.yaml b/controllers/druid/testdata/broker-pod-disruption-budget.yaml index 51bae17c..9f2b1474 100644 --- a/controllers/druid/testdata/broker-pod-disruption-budget.yaml +++ b/controllers/druid/testdata/broker-pod-disruption-budget.yaml @@ -5,14 +5,16 @@ metadata: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker name: druid-druid-test-brokers namespace: test-namespace annotations: - 
druidOpResourceHash: iF0BSmXKN+Oi1rBfod127UnVkR0= + druidOpResourceHash: PSUil0VZr4P6YHR8+EGWPiCSiXg= spec: maxUnavailable: 1 selector: matchLabels: app: druid druid_cr: druid-test - nodeSpecUniqueStr: druid-druid-test-brokers \ No newline at end of file + nodeSpecUniqueStr: druid-druid-test-brokers + component: broker diff --git a/controllers/druid/testdata/broker-statefulset.yaml b/controllers/druid/testdata/broker-statefulset.yaml index 02c5206b..3f20246b 100644 --- a/controllers/druid/testdata/broker-statefulset.yaml +++ b/controllers/druid/testdata/broker-statefulset.yaml @@ -7,8 +7,9 @@ metadata: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker annotations: - druidOpResourceHash: OnEopGbFnwSCMqd84ePrk0Kvd9o= + druidOpResourceHash: ZzOn5v/f82XpT1su/DEQzCViuKg= spec: podManagementPolicy: Parallel replicas: 2 @@ -17,6 +18,7 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker serviceName: druid-druid-test-brokers template: metadata: @@ -24,6 +26,7 @@ spec: app: druid druid_cr: druid-test nodeSpecUniqueStr: druid-druid-test-brokers + component: broker annotations: key1: value1 key2: value2 diff --git a/controllers/druid/util.go b/controllers/druid/util.go index 89326601..41697946 100644 --- a/controllers/druid/util.go +++ b/controllers/druid/util.go @@ -60,3 +60,9 @@ func RemoveString(slice []string, s string) (result []string) { } return } + +// returns pointer to bool +func boolFalse() *bool { + bool := false + return &bool +} diff --git a/deploy/crds/druid.apache.org_druids.yaml b/deploy/crds/druid.apache.org_druids.yaml index effddb0e..64dfc9a0 100644 --- a/deploy/crds/druid.apache.org_druids.yaml +++ b/deploy/crds/druid.apache.org_druids.yaml @@ -3438,6 +3438,8 @@ spec: type: object rollingDeploy: type: boolean + scalePvcSts: + type: boolean securityContext: properties: fsGroup: