Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix76: Scale STS / PVC vertically to support resizing of druid nodes #97

Merged
merged 6 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apis/druid/v1alpha1/druid_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type DruidSpec struct {
// doc: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#forced-rollback
ForceDeleteStsPodOnError bool `json:"forceDeleteStsPodOnError,omitempty"`

// Optional: ScalePvcSts, defaults to false. When enabled, the operator allows volume expansion of the StatefulSet volumeClaimTemplates and their PVCs.
ScalePvcSts bool `json:"scalePvcSts,omitempty"`

// Required: in-container directory to mount with common.runtime.properties
CommonConfigMountPath string `json:"commonConfigMountPath"`

Expand Down
2 changes: 2 additions & 0 deletions chart/templates/crds/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3438,6 +3438,8 @@ spec:
type: object
rollingDeploy:
type: boolean
scalePvcSts:
type: boolean
securityContext:
properties:
fsGroup:
Expand Down
172 changes: 172 additions & 0 deletions controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (

autoscalev2beta2 "k8s.io/api/autoscaling/v2beta2"
networkingv1 "k8s.io/api/networking/v1"
storage "k8s.io/api/storage/v1"

"github.com/druid-io/druid-operator/apis/druid/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -189,6 +191,19 @@ func deployDruidCluster(sdk client.Client, m *v1alpha1.Druid, emitEvents EventEm
}
}
} else {

// scalePVCForSts is called only if volume expansion is supported by the storage class.
// Skip it on the first iteration (i.e. cluster creation); otherwise the sts lookup would log unnecessary errors.

if m.Generation > 1 && m.Spec.ScalePvcSts {
if isVolumeExpansionEnabled(sdk, m, &nodeSpec, emitEvents) {
err := scalePVCForSts(sdk, &nodeSpec, nodeSpecUniqueStr, m, emitEvents)
if err != nil {
return err
}
}
}

// Create/Update StatefulSet
if stsCreateUpdateStatus, err := sdkCreateOrUpdateAsNeeded(sdk,
func() (object, error) {
Expand Down Expand Up @@ -728,6 +743,153 @@ func isObjFullyDeployed(sdk client.Client, nodeSpec v1alpha1.DruidNodeSpec, node
return false, nil
}

// scalePVCForSts expands the volumeClaimTemplates size of the StatefulSet named
// nodeSpecUniqueStr and the size of the PVCs selected by the "component" label
// of nodeSpec.
//
// For every volumeClaimTemplate index it compares the size requested in the
// druid CR (desired) with the size currently set on the StatefulSet (current):
//   - desired < current: shrinking is not supported; an event is emitted and an
//     error is returned.
//   - desired > current: the StatefulSet is deleted with the orphan propagation
//     policy (cascade=false); the caller re-creates it on the next reconcile.
//   - every listed PVC whose requested size differs from desired is patched to
//     the desired size.
//
// Lookup failures (listing StatefulSets/PVCs, getting the StatefulSet) return
// nil on purpose so the caller can go on and create the StatefulSet.
//
// NOTE: To be called only if generation > 1.
// NOTE(review): the PVC loop patches every listed PVC to each template's size;
// with more than one volumeClaimTemplate of different sizes this looks wrong —
// confirm whether PVCs need to be matched to their template.
func scalePVCForSts(sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, nodeSpecUniqueStr string, drd *v1alpha1.Druid, emitEvent EventEmitter) error {

	getSTSList, err := readers.List(context.TODO(), sdk, drd, makeLabelsForDruid(drd.Name), emitEvent, func() objectList { return makeStatefulSetListEmptyObj() }, func(listObj runtime.Object) []object {
		items := listObj.(*appsv1.StatefulSetList).Items
		result := make([]object, len(items))
		for i := 0; i < len(items); i++ {
			result[i] = &items[i]
		}
		return result
	})
	if err != nil {
		return nil
	}

	// Don't proceed unless every StatefulSet of this cluster reports all of its
	// replicas ready (original note: proceeding otherwise can cause the
	// goroutine to panic).
	for _, item := range getSTSList {
		status := item.(*appsv1.StatefulSet).Status
		if status.Replicas != status.ReadyReplicas {
			return nil
		}
	}

	// Return nil instead of the error: returning it would halt the reconcile,
	// and we want the operator to go on and create the sts.
	stsObj, err := readers.Get(context.TODO(), sdk, nodeSpecUniqueStr, drd, func() object { return makeStatefulSetEmptyObj() }, emitEvent)
	if err != nil {
		return nil
	}

	pvcLabels := map[string]string{
		"component": nodeSpec.NodeType,
	}

	pvcList, err := readers.List(context.TODO(), sdk, drd, pvcLabels, emitEvent, func() objectList { return makePersistentVolumeClaimListEmptyObj() }, func(listObj runtime.Object) []object {
		items := listObj.(*v1.PersistentVolumeClaimList).Items
		result := make([]object, len(items))
		for i := 0; i < len(items); i++ {
			result[i] = &items[i]
		}
		return result
	})
	if err != nil {
		return nil
	}

	desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize := getVolumeClaimTemplateSizes(stsObj, nodeSpec, pvcList)
	sts := stsObj.(*appsv1.StatefulSet)

	// current number of PVC can't be less than desired number of pvc
	if len(pvcSize) < len(desVolumeClaimTemplateSize) {
		return nil
	}

	// The sts must be deleted at most once per call, even if several templates changed.
	stsOrphaned := false

	// iterate over array for matching each index in desVolumeClaimTemplateSize, currVolumeClaimTemplateSize and pvcSize
	for i := range desVolumeClaimTemplateSize {

		// The CR may declare more templates than the live sts carries; there is
		// nothing to compare against for those, so stop instead of indexing
		// out of range.
		if i >= len(currVolumeClaimTemplateSize) {
			break
		}

		desired := desVolumeClaimTemplateSize[i]
		current := currVolumeClaimTemplateSize[i]

		// Compare as quantities: AsInt64 reports failure (and yields 0) for
		// values it cannot represent, which would make the checks below lie.
		sizeCmp := desired.Cmp(current)

		// Validate Request, shrinking of pvc not supported
		// desired size cant be less than current size
		// in that case re-create sts/pvc which is a user execute manual step
		if sizeCmp < 0 {
			e := fmt.Errorf("Request for Shrinking of sts pvc size [sts:%s] in [namespace:%s] is not Supported", sts.Name, sts.Namespace)
			logger.Error(e, e.Error(), "name", drd.Name, "namespace", drd.Namespace)
			// Emit the shrink error itself; the outer err is nil at this point.
			emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeFail", "", e)
			return e
		}

		// In case desired > current, delete the sts using cascade=false / propagation policy set to orphan.
		// The operator on next reconcile shall create the sts with latest changes.
		if sizeCmp > 0 && !stsOrphaned {
			msg := fmt.Sprintf("Detected Change in VolumeClaimTemplate Sizes for Statefuleset [%s] in Namespace [%s], desVolumeClaimTemplateSize: [%s], currVolumeClaimTemplateSize: [%s]\n, deleteing STS [%s] with casacde=false]", sts.Name, sts.Namespace, desired.String(), current.String(), sts.Name)
			logger.Info(msg)
			emitEvent.EmitEventGeneric(drd, "DruidOperatorPvcReSizeDetected", msg, nil)

			if err := writers.Delete(context.TODO(), sdk, drd, stsObj, emitEvent, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
				return err
			}
			stsOrphaned = true

			deletedMsg := fmt.Sprintf("[StatefuleSet:%s] successfully deleted with casacde=false", sts.Name)
			logger.Info(deletedMsg, "name", drd.Name, "namespace", drd.Namespace)
			emitEvent.EmitEventGeneric(drd, "DruidOperatorStsOrphaned", deletedMsg, nil)
		}

		// In case size dont match, patch the pvc with the desiredsize from druid CR
		for p := range pvcSize {
			if desired.Cmp(pvcSize[p]) == 0 {
				continue
			}

			pvc := pvcList[p].(*v1.PersistentVolumeClaim)
			// Snapshot the unmodified object so the patch only carries the size change.
			patch := client.MergeFrom(pvc.DeepCopy())
			if pvc.Spec.Resources.Requests == nil {
				// Writing to a nil map would panic.
				pvc.Spec.Resources.Requests = v1.ResourceList{}
			}
			pvc.Spec.Resources.Requests[v1.ResourceStorage] = desired
			if err := writers.Patch(context.TODO(), sdk, drd, pvc, false, patch, emitEvent); err != nil {
				return err
			}

			msg := fmt.Sprintf("[PVC:%s] successfully Patched with [Size:%s]", pvc.Name, desired.String())
			logger.Info(msg, "name", drd.Name, "namespace", drd.Namespace)
		}

	}

	return nil
}

// getVolumeClaimTemplateSizes collects the storage requests
// (Spec.Resources.Requests[v1.ResourceStorage]) from three places, preserving
// the order of each input:
//
//	desVolumeClaimTemplateSize:  one entry per volumeClaimTemplate in nodeSpec (the druid CR)
//	currVolumeClaimTemplateSize: one entry per volumeClaimTemplate in sts
//	pvcSize:                     one entry per object in pvc
//
// sts must hold a *appsv1.StatefulSet and every element of pvc a
// *v1.PersistentVolumeClaim; the type assertions panic otherwise.
// All results are resource.Quantity values, ref: https://godoc.org/k8s.io/apimachinery/pkg/api/resource
func getVolumeClaimTemplateSizes(sts object, nodeSpec *v1alpha1.DruidNodeSpec, pvc []object) (desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize []resource.Quantity) {
	// Desired sizes, taken from the node spec.
	desiredTemplates := nodeSpec.VolumeClaimTemplates
	for idx := range desiredTemplates {
		requests := desiredTemplates[idx].Spec.Resources.Requests
		desVolumeClaimTemplateSize = append(desVolumeClaimTemplateSize, requests[v1.ResourceStorage])
	}

	// Current sizes, taken from the StatefulSet.
	currentTemplates := sts.(*appsv1.StatefulSet).Spec.VolumeClaimTemplates
	for idx := range currentTemplates {
		requests := currentTemplates[idx].Spec.Resources.Requests
		currVolumeClaimTemplateSize = append(currVolumeClaimTemplateSize, requests[v1.ResourceStorage])
	}

	// Sizes requested by the PVC objects themselves.
	for _, claimObj := range pvc {
		claim := claimObj.(*v1.PersistentVolumeClaim)
		pvcSize = append(pvcSize, claim.Spec.Resources.Requests[v1.ResourceStorage])
	}

	return desVolumeClaimTemplateSize, currVolumeClaimTemplateSize, pvcSize
}

// isVolumeExpansionEnabled reports whether at least one volumeClaimTemplate of
// nodeSpec references a StorageClass whose AllowVolumeExpansion is set to true.
//
// Templates without an explicit StorageClassName are skipped, since there is
// no name to look up. If reading a StorageClass fails, false is returned
// immediately.
func isVolumeExpansionEnabled(sdk client.Client, m *v1alpha1.Druid, nodeSpec *v1alpha1.DruidNodeSpec, emitEvent EventEmitter) bool {

	for i := range nodeSpec.VolumeClaimTemplates {
		scName := nodeSpec.VolumeClaimTemplates[i].Spec.StorageClassName
		if scName == nil {
			// Dereferencing a nil StorageClassName would panic.
			continue
		}

		sc, err := readers.Get(context.TODO(), sdk, *scName, m, func() object { return makeStorageClassEmptyObj() }, emitEvent)
		if err != nil {
			return false
		}

		// AllowVolumeExpansion is a *bool: compare the pointed-to value. Comparing
		// the pointer against a freshly allocated *bool is always "not equal".
		if allow := sc.(*storage.StorageClass).AllowVolumeExpansion; allow != nil && *allow {
			return true
		}
	}
	return false
}

func stringifyForLogging(obj object, drd *v1alpha1.Druid) string {
if bytes, err := json.Marshal(obj); err != nil {
logger.Error(err, err.Error(), fmt.Sprintf("Failed to serialize [%s:%s]", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName()), "name", drd.Name, "namespace", drd.Namespace)
Expand Down Expand Up @@ -1278,6 +1440,7 @@ func makeLabelsForNodeSpec(nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid,
labels["app"] = "druid"
labels["druid_cr"] = clusterName
labels["nodeSpecUniqueStr"] = nodeSpecUniqueStr
labels["component"] = nodeSpec.NodeType
return labels
}

Expand Down Expand Up @@ -1461,6 +1624,15 @@ func makeServiceEmptyObj() *v1.Service {
}
}

// makeStorageClassEmptyObj returns a pointer to a new StorageClass in which only
// the TypeMeta is populated (APIVersion "storage.k8s.io/v1", Kind "StorageClass").
func makeStorageClassEmptyObj() *storage.StorageClass {
	typeMeta := metav1.TypeMeta{
		APIVersion: "storage.k8s.io/v1",
		Kind:       "StorageClass",
	}
	return &storage.StorageClass{TypeMeta: typeMeta}
}

func makeConfigMapEmptyObj() *v1.ConfigMap {
return &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Expand Down
3 changes: 2 additions & 1 deletion controllers/druid/testdata/broker-config-map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ metadata:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
name: druid-druid-test-brokers-config
namespace: test-namespace
annotations:
druidOpResourceHash: 9kOh5Qyj7xud9ED6REtjhXM6zJU=
druidOpResourceHash: O3jmICgrTjJkMBlGlE05W7dGhA0=
5 changes: 4 additions & 1 deletion controllers/druid/testdata/broker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ metadata:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
annotations:
druidOpResourceHash: UFf9ygahn6p+juXtqdzSghIjDvI=
druidOpResourceHash: rjo/DfrcRi1LUUEhdkBJch19eDs=
spec:
replicas: 2
selector:
matchLabels:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
strategy:
rollingUpdate:
maxSurge: 25
Expand All @@ -27,6 +29,7 @@ spec:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
annotations:
key1: value1
key2: value2
Expand Down
6 changes: 4 additions & 2 deletions controllers/druid/testdata/broker-headless-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ apiVersion: v1
kind: Service
metadata:
annotations:
druidOpResourceHash: DuchVNIByBSJSv7rVvTVlXW6zkE=
druidOpResourceHash: 5zzzIiXTlupyCeyb/P8llEZN1Ag=
creationTimestamp: null
labels:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
name: druid-druid-test-brokers
namespace: test-namespace
spec:
Expand All @@ -20,6 +21,7 @@ spec:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
type: ClusterIP
status:
loadBalancer: {}
loadBalancer: {}
4 changes: 3 additions & 1 deletion controllers/druid/testdata/broker-load-balancer-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ kind: Service
metadata:
annotations:
service.beta.kubernetes.io/aws-load-balancer-internal: 0.0.0.0/0
druidOpResourceHash: IScIBhsS6qaK7VyK+TCCFM/s3VA=
druidOpResourceHash: vQE/6DfWCQnWFVvW3KxfOLBfdlA=
labels:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
name: broker-druid-druid-test-brokers-service
namespace: test-namespace
spec:
Expand All @@ -19,4 +20,5 @@ spec:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
type: LoadBalancer
6 changes: 4 additions & 2 deletions controllers/druid/testdata/broker-pod-disruption-budget.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ metadata:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
name: druid-druid-test-brokers
namespace: test-namespace
annotations:
druidOpResourceHash: iF0BSmXKN+Oi1rBfod127UnVkR0=
druidOpResourceHash: PSUil0VZr4P6YHR8+EGWPiCSiXg=
spec:
maxUnavailable: 1
selector:
matchLabels:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
5 changes: 4 additions & 1 deletion controllers/druid/testdata/broker-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ metadata:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
annotations:
druidOpResourceHash: OnEopGbFnwSCMqd84ePrk0Kvd9o=
druidOpResourceHash: ZzOn5v/f82XpT1su/DEQzCViuKg=
spec:
podManagementPolicy: Parallel
replicas: 2
Expand All @@ -17,13 +18,15 @@ spec:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
serviceName: druid-druid-test-brokers
template:
metadata:
labels:
app: druid
druid_cr: druid-test
nodeSpecUniqueStr: druid-druid-test-brokers
component: broker
annotations:
key1: value1
key2: value2
Expand Down
6 changes: 6 additions & 0 deletions controllers/druid/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,9 @@ func RemoveString(slice []string, s string) (result []string) {
}
return
}

// boolFalse returns a pointer to a newly allocated bool whose value is false.
//
// Every call yields a distinct pointer, so the result must be used by
// dereferencing it; comparing it with == or != against another *bool compares
// addresses, not values.
func boolFalse() *bool {
	// Do not name the local "bool": that shadows the predeclared type.
	v := false
	return &v
}
2 changes: 2 additions & 0 deletions deploy/crds/druid.apache.org_druids.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3438,6 +3438,8 @@ spec:
type: object
rollingDeploy:
type: boolean
scalePvcSts:
type: boolean
securityContext:
properties:
fsGroup:
Expand Down