From fa86b4b384f434ac456ae22d9050ddd23d31812f Mon Sep 17 00:00:00 2001
From: Michael Burman
Date: Fri, 25 Oct 2024 15:48:33 +0300
Subject: [PATCH] Add new annotation cassandra.datastax.com/track-cleanup-tasks
 that enables tracking the cleanup task after a scale-up. The operator waits
 for the cleanup to finish before the scale-up is marked as completed. Also,
 add a unit test for CheckRackLabels()

---
 CHANGELOG.md                               |  1 +
 .../v1beta1/cassandradatacenter_types.go   | 24 ++++++
 pkg/reconciliation/reconcile_racks.go      | 60 +++++++++++++-
 pkg/reconciliation/reconcile_racks_test.go | 78 +++++++++++++++++++
 .../default-two-rack-two-node-dc.yaml      |  2 +-
 5 files changed, 163 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a17178e3..67b90c58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
 ## unreleased
 
 * [FEATURE] [#651](https://github.com/k8ssandra/cass-operator/issues/651) Add tsreload task for DSE deployments and ability to check if sync operation is available on the mgmt-api side
+* [ENHANCEMENT] [#722](https://github.com/k8ssandra/cass-operator/issues/722) Add back the ability to track the cleanup task before marking a scale-up as done. This is controlled by the annotation `cassandra.datastax.com/track-cleanup-tasks`
 * [BUGFIX] [#705](https://github.com/k8ssandra/cass-operator/issues/705) Ensure ConfigSecret has annotations map before trying to set a value
 
 ## v1.22.4
diff --git a/apis/cassandra/v1beta1/cassandradatacenter_types.go b/apis/cassandra/v1beta1/cassandradatacenter_types.go
index 24558a03..b7dd2add 100644
--- a/apis/cassandra/v1beta1/cassandradatacenter_types.go
+++ b/apis/cassandra/v1beta1/cassandradatacenter_types.go
@@ -77,6 +77,10 @@ const (
 	// UseClientBuilderAnnotation enforces the usage of new config builder from k8ssandra-client for versions that would otherwise use the cass-config-builder
 	UseClientBuilderAnnotation = "cassandra.datastax.com/use-new-config-builder"
 
+	// TrackCleanupTasksAnnotation makes the operator track the cleanup task created after a scale-up. This prevents other operations from taking place until the cleanup
+	// task has completed.
+	TrackCleanupTasksAnnotation = "cassandra.datastax.com/track-cleanup-tasks"
+
 	AllowUpdateAlways AllowUpdateType = "always"
 	AllowUpdateOnce   AllowUpdateType = "once"
 
@@ -563,6 +567,26 @@ func (status *CassandraDatacenterStatus) GetConditionStatus(conditionType Datace
 	return corev1.ConditionUnknown
 }
 
+func (status *CassandraDatacenterStatus) AddTaskToTrack(objectMeta metav1.ObjectMeta) {
+	if status.TrackedTasks == nil {
+		status.TrackedTasks = make([]corev1.ObjectReference, 0, 1)
+	}
+
+	status.TrackedTasks = append(status.TrackedTasks, corev1.ObjectReference{
+		Name:      objectMeta.Name,
+		Namespace: objectMeta.Namespace,
+	})
+}
+
+func (status *CassandraDatacenterStatus) RemoveTrackedTask(objectMeta metav1.ObjectMeta) {
+	for index, task := range status.TrackedTasks {
+		if task.Name == objectMeta.Name && task.Namespace == objectMeta.Namespace {
+			status.TrackedTasks = append(status.TrackedTasks[:index], status.TrackedTasks[index+1:]...)
+			// Stop after removing to avoid skipping entries (or slicing past the end) while mutating the slice
+			return
+		}
+	}
+}
+
 func (dc *CassandraDatacenter) GetConditionStatus(conditionType DatacenterConditionType) corev1.ConditionStatus {
 	return (&dc.Status).GetConditionStatus(conditionType)
 }
diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go
index d40012b2..af5f4c36 100644
--- a/pkg/reconciliation/reconcile_racks.go
+++ b/pkg/reconciliation/reconcile_racks.go
@@ -2274,10 +2274,27 @@ func (rc *ReconciliationContext) CheckCassandraNodeStatuses() result.ReconcileRe
 
 func (rc *ReconciliationContext) cleanupAfterScaling() result.ReconcileResult {
 	if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.NoAutomatedCleanupAnnotation) {
+
+		if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
+			// Verify that the cleanup task has completed before marking ScalingUp as finished
+			task, err := rc.findActiveTask(taskapi.CommandCleanup)
+			if err != nil {
+				return result.Error(err)
+			}
+
+			if task != nil {
+				return rc.activeTaskCompleted(task)
+			}
+		}
+
 		// Create the cleanup task
 		if err := rc.createTask(taskapi.CommandCleanup); err != nil {
 			return result.Error(err)
 		}
+
+		if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
+			return result.RequeueSoon(10)
+		}
 	}
 
 	return result.Continue()
@@ -2319,7 +2336,48 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er
 		return err
 	}
 
-	return nil
+	if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
+		return nil
+	}
+
+	dcPatch := client.MergeFrom(dc.DeepCopy())
+
+	rc.Datacenter.Status.AddTaskToTrack(task.ObjectMeta)
+
+	return rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
+}
+
+func (rc *ReconciliationContext) activeTaskCompleted(task *taskapi.CassandraTask) result.ReconcileResult {
+	if task.Status.CompletionTime != nil {
+		// Job was completed, remove it from the tracked tasks
+		dc := rc.Datacenter
+		dcPatch := client.MergeFrom(dc.DeepCopy())
+		rc.Datacenter.Status.RemoveTrackedTask(task.ObjectMeta)
+		if err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch); err != nil {
+			return result.Error(err)
+		}
+		return result.Continue()
+	}
+	return result.RequeueSoon(10)
+}
+
+func (rc *ReconciliationContext) findActiveTask(command taskapi.CassandraCommand) (*taskapi.CassandraTask, error) {
+	if len(rc.Datacenter.Status.TrackedTasks) > 0 {
+		for _, taskMeta := range rc.Datacenter.Status.TrackedTasks {
+			taskKey := types.NamespacedName{Name: taskMeta.Name, Namespace: taskMeta.Namespace}
+			task := &taskapi.CassandraTask{}
+			if err := rc.Client.Get(rc.Ctx, taskKey, task); err != nil {
+				return nil, err
+			}
+
+			for _, job := range task.Spec.Jobs {
+				if job.Command == command {
+					return task, nil
+				}
+			}
+		}
+	}
+	return nil, nil
 }
 
 func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
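A note on the annotation gating used in `cleanupAfterScaling` and `createTask` above: `metav1.HasAnnotation` only tests whether the key exists, so the annotation's value is never inspected and even `"false"` or an empty string enables tracking. A minimal standalone sketch of that semantic (the package layout and constant name are illustrative; the constant mirrors `api.TrackCleanupTasksAnnotation`):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Mirrors api.TrackCleanupTasksAnnotation from this patch.
const trackCleanupTasks = "cassandra.datastax.com/track-cleanup-tasks"

func main() {
	om := metav1.ObjectMeta{}
	fmt.Println(metav1.HasAnnotation(om, trackCleanupTasks)) // false: key absent

	// SetMetaDataAnnotation initializes the annotations map if needed and sets the key.
	// HasAnnotation checks key presence only, so "false" still enables tracking.
	metav1.SetMetaDataAnnotation(&om, trackCleanupTasks, "false")
	fmt.Println(metav1.HasAnnotation(om, trackCleanupTasks)) // true: key present
}
```

If the value should matter, the reconciler would need to compare `rc.Datacenter.Annotations[api.TrackCleanupTasksAnnotation]` against an expected value explicitly.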
diff --git a/pkg/reconciliation/reconcile_racks_test.go b/pkg/reconciliation/reconcile_racks_test.go
index 89cc2eee..aa1ab0b4 100644
--- a/pkg/reconciliation/reconcile_racks_test.go
+++ b/pkg/reconciliation/reconcile_racks_test.go
@@ -1631,6 +1631,49 @@ func TestCleanupAfterScaling(t *testing.T) {
 	r := rc.cleanupAfterScaling()
 	assert.Equal(result.Continue(), r, "expected result of result.Continue()")
 	assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
+	assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
+}
+
+func TestCleanupAfterScalingWithTracker(t *testing.T) {
+	rc, _, cleanupMockScr := setupTest()
+	defer cleanupMockScr()
+	assert := assert.New(t)
+
+	// Set up the mock client and the tracking annotation
+
+	mockClient := mocks.NewClient(t)
+	rc.Client = mockClient
+
+	metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation, "true")
+
+	var task *taskapi.CassandraTask
+	// 1. Create task - return ok
+	k8sMockClientCreate(rc.Client.(*mocks.Client), nil).
+		Run(func(args mock.Arguments) {
+			arg := args.Get(1).(*taskapi.CassandraTask)
+			task = arg
+		}).
+		Times(1)
+
+	k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once() // 2. Status patch stores the tracked task
+
+	r := rc.cleanupAfterScaling()
+	assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
+	assert.Equal(result.RequeueSoon(10), r, "expected result of result.RequeueSoon(10)")
+	assert.Equal(1, len(rc.Datacenter.Status.TrackedTasks))
+	// 3. GET - return completed task
+	k8sMockClientGet(rc.Client.(*mocks.Client), nil).
+		Run(func(args mock.Arguments) {
+			arg := args.Get(2).(*taskapi.CassandraTask)
+			task.DeepCopyInto(arg)
+			timeNow := metav1.Now()
+			arg.Status.CompletionTime = &timeNow
+		}).Once()
+	// 4. Patch to datacenter status removes the tracked task
+	k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once()
+	r = rc.cleanupAfterScaling()
+	assert.Equal(result.Continue(), r, "expected result of result.Continue()")
+	assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
 }
 
 func TestStripPassword(t *testing.T) {
@@ -2874,3 +2917,38 @@ func TestDatacenterPodsOldLabels(t *testing.T) {
 	// We should still find the pods
 	assert.Equal(int(*desiredStatefulSet.Spec.Replicas), len(rc.datacenterPods()))
 }
+
+func TestCheckRackLabels(t *testing.T) {
+	rc, _, cleanupMockScr := setupTest()
+	defer cleanupMockScr()
+	require := require.New(t)
+	err := rc.CalculateRackInformation()
+	require.NoError(err)
+
+	desiredStatefulSet, err := newStatefulSetForCassandraDatacenter(
+		nil,
+		"default",
+		rc.Datacenter,
+		3)
+	require.NoErrorf(err, "error occurred creating statefulset")
+
+	desiredStatefulSet.Status.ReadyReplicas = *desiredStatefulSet.Spec.Replicas
+
+	trackObjects := []runtime.Object{
+		desiredStatefulSet,
+		rc.Datacenter,
+	}
+	rc.Client = fake.NewClientBuilder().WithStatusSubresource(rc.Datacenter).WithRuntimeObjects(trackObjects...).Build()
+
+	rc.statefulSets = []*appsv1.StatefulSet{desiredStatefulSet}
+
+	res := rc.CheckRackLabels()
+	require.Equal(result.Continue(), res, "Label updates should not cause errors")
+	require.Subset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))
+	desiredStatefulSet.Labels[api.RackLabel] = "r1"
+	require.NotSubset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))
+
+	res = rc.CheckRackLabels()
+	require.Equal(result.Continue(), res, "Label updates should not cause errors")
+	require.Subset(desiredStatefulSet.Labels, rc.Datacenter.GetRackLabels("default"))
+}
diff --git a/tests/testdata/default-two-rack-two-node-dc.yaml b/tests/testdata/default-two-rack-two-node-dc.yaml
index 97817e4c..fbc3f628 100644
--- a/tests/testdata/default-two-rack-two-node-dc.yaml
+++ b/tests/testdata/default-two-rack-two-node-dc.yaml
@@ -6,7 +6,7 @@ spec:
   clusterName: cluster1
   datacenterName: My_Super_Dc
   serverType: cassandra
-  serverVersion: "4.0.10"
+  serverVersion: "4.1.7"
   managementApiAuth:
     insecure: {}
   size: 2
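For manual verification, the new behavior is opted into per CassandraDatacenter by adding the annotation to its metadata. An abridged manifest sketch (the name `dc1` is illustrative, and a real manifest also needs the other required fields such as `storageConfig`; the spec fields shown mirror the testdata file above):

```yaml
apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
  name: dc1
  annotations:
    cassandra.datastax.com/track-cleanup-tasks: "true"
spec:
  clusterName: cluster1
  serverType: cassandra
  serverVersion: "4.1.7"
  managementApiAuth:
    insecure: {}
  size: 2
```

On a live resource the same can be done with `kubectl annotate cassandradatacenter dc1 cassandra.datastax.com/track-cleanup-tasks=true`; as noted in the sketch after the reconcile_racks.go diff, any value works because only the key's presence is checked.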