From 8963678038647f6fdad602aa9a6e9f9bc38128aa Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 11 Oct 2023 19:06:19 +0300 Subject: [PATCH 01/12] Add flush and garbagecollect methods, sync and async --- pkg/httphelper/client.go | 72 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index a2821d01..100e7e61 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -138,6 +138,8 @@ const ( FullQuerySupport Feature = "full_query_logging" Rebuild Feature = "rebuild" Move Feature = "async_move_task" + AsyncGarbageCollect Feature = "async_gc_task" + AsyncFlush Feature = "async_flush_task" ) func (f *FeatureSet) UnmarshalJSON(b []byte) error { @@ -593,7 +595,6 @@ type ScrubRequest struct { Tables []string `json:"tables"` } -// TODO This, keyspaceRequest and compactRequest are all pretty much the same.. func createScrubRequest(pod *corev1.Pod, scrubRequest *ScrubRequest, endpoint string) (*nodeMgmtRequest, error) { body, err := json.Marshal(scrubRequest) if err != nil { @@ -1263,5 +1264,74 @@ func (client *NodeMgmtClient) CallSetFullQueryLog(pod *corev1.Pod, enableFullQue ) _, err = callNodeMgmtEndpoint(client, request, "") + + return err +} + +func (client *NodeMgmtClient) CallFlush(pod *corev1.Pod, keyspaceName string, tables []string) (string, error) { + client.Log.Info( + "calling Management API keyspace flush - POST /api/v1/ops/tables/flush", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v1/ops/tables/flush") + if err != nil { + return "", err + } + + req.timeout = 20 * time.Second + + jobId, err := callNodeMgmtEndpoint(client, *req, "application/json") + return string(jobId), err +} + +func (client *NodeMgmtClient) CallFlushEndpoint(pod *corev1.Pod, keyspaceName string, tables []string) error { + client.Log.Info( + "calling Management API keyspace flush - POST /api/v0/ops/tables/flush", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v0/ops/tables/flush") + if err != nil { + return err + } + + req.timeout = 60 * time.Second + + _, err = callNodeMgmtEndpoint(client, *req, "application/json") + return err +} + +func (client *NodeMgmtClient) CallGarbageCollect(pod *corev1.Pod, keyspaceName string, tables []string) (string, error) { + client.Log.Info( + "calling Management API keyspace garbagecollect - POST /api/v1/ops/tables/garbagecollect", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v1/ops/tables/garbagecollect") + if err != nil { + return "", err + } + + req.timeout = 20 * time.Second + + jobId, err := callNodeMgmtEndpoint(client, *req, "application/json") + return string(jobId), err +} + +func (client *NodeMgmtClient) CallGarbageCollectEndpoint(pod *corev1.Pod, keyspaceName string, tables []string) error { + client.Log.Info( + "calling Management API keyspace garbagecollect - POST /api/v0/ops/tables/garbagecollect", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v0/ops/tables/garbagecollect") + if err != nil { + return err + } + + req.timeout = 60 * time.Second + + _, err = callNodeMgmtEndpoint(client, *req, "application/json") return err } From 095bb862f0b821be8679bb62729f2c0159f41c2f Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Wed, 11 Oct 2023 20:41:40 +0300 Subject: [PATCH 02/12] Implement new tasks, flush and garbagecollect ---
apis/control/v1alpha1/cassandratask_types.go | 11 ++- .../control/v1alpha1/zz_generated.deepcopy.go | 5 ++ .../control.k8ssandra.io_cassandratasks.yaml | 4 + .../control/cassandratask_controller.go | 10 ++- .../control/cassandratask_controller_test.go | 82 +++++++++++++++++++ internal/controllers/control/jobs.go | 67 ++++++++++++--- pkg/httphelper/server_test_utils.go | 8 +- 7 files changed, 165 insertions(+), 22 deletions(-) diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index d7a24845..cf669cc1 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -78,6 +78,8 @@ const ( CommandCompaction CassandraCommand = "compact" CommandScrub CassandraCommand = "scrub" CommandMove CassandraCommand = "move" + CommandGarbageCollect CassandraCommand = "garbagecollect" + CommandFlush CassandraCommand = "flush" ) type CassandraJob struct { @@ -91,10 +93,11 @@ type CassandraJob struct { } type JobArguments struct { - KeyspaceName string `json:"keyspace_name,omitempty"` - SourceDatacenter string `json:"source_datacenter,omitempty"` - PodName string `json:"pod_name,omitempty"` - RackName string `json:"rack,omitempty"` + KeyspaceName string `json:"keyspace_name,omitempty"` + SourceDatacenter string `json:"source_datacenter,omitempty"` + PodName string `json:"pod_name,omitempty"` + RackName string `json:"rack,omitempty"` + Tables []string `json:"tables,omitempty"` // NewTokens is a map of pod names to their newly-assigned tokens. Required for the move // command, ignored otherwise. Pods referenced in this map must exist; any existing pod not diff --git a/apis/control/v1alpha1/zz_generated.deepcopy.go b/apis/control/v1alpha1/zz_generated.deepcopy.go index f8bf3403..7c593d05 100644 --- a/apis/control/v1alpha1/zz_generated.deepcopy.go +++ b/apis/control/v1alpha1/zz_generated.deepcopy.go @@ -181,6 +181,11 @@ func (in *CassandraTaskTemplate) DeepCopy() *CassandraTaskTemplate { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *JobArguments) DeepCopyInto(out *JobArguments) { *out = *in + if in.Tables != nil { + in, out := &in.Tables, &out.Tables + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.NewTokens != nil { in, out := &in.NewTokens, &out.NewTokens *out = make(map[string]string, len(*in)) diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index ab5c25c3..1cccff3f 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -124,6 +124,10 @@ spec: type: string source_datacenter: type: string + tables: + items: + type: string + type: array type: object command: description: Command defines what is run against Cassandra pods diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 648a5b3e..60eb2a6d 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -292,16 +292,18 @@ JobDefinition: break JobDefinition case api.CommandReplaceNode: r.replace(taskConfig) - case "forceupgraderacks": - // res, failed, completed, err = r.reconcileDatacenter(ctx, &dc, forceupgrade(taskConfigProto)) case api.CommandUpgradeSSTables: upgradesstables(taskConfig) case api.CommandScrub: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, scrub(taskConfigProto)) + // scrub(taskConfig) case api.CommandCompaction: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, compact(taskConfigProto, job.Arguments)) + // compact(taskConfig) case api.CommandMove: r.move(taskConfig) + case api.CommandFlush: + flush(taskConfig) + case api.CommandGarbageCollect: + gc(taskConfig) default: err = fmt.Errorf("unknown job command: %s", job.Command) return ctrl.Result{}, err diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 02ca3c66..b425c6fd 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -307,6 +307,54 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3)) }) + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + It("Runs a flush task against a pod", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + 
Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + When("Running cleanup twice in the same datacenter", func() { It("Runs a cleanup task against the datacenter pods", func() { By("Creating a task for cleanup") @@ -461,6 +509,40 @@ var _ = Describe("CassandraTask controller tests", func() { // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) + + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) }) Context("Task TTL", func() { var testNamespaceName string diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 72b5d04b..e8f85b0a 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -21,15 +21,15 @@ import ( // Cleanup functionality func callCleanup(nodeMgmtClient 
httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, tables) } func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, tables) } func cleanup(taskConfig *TaskConfiguration) { @@ -109,15 +109,15 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S // UpgradeSSTables functionality func callUpgradeSSTables(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, tables) } func callUpgradeSSTablesSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, tables) } func upgradesstables(taskConfig *TaskConfiguration) { @@ -178,10 +178,13 @@ func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration return fmt.Errorf("valid pod_name to replace is required") } -func replaceFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { +func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { // If pod isn't in the to be replaced pods, return false podName := taskConfig.Arguments.PodName - return pod.Name == podName + if podName != "" { + return pod.Name == podName + } + return true } // replacePreProcess adds enough information to CassandraDatacenter to ensure cass-operator knows this pod is being replaced @@ -207,7 +210,7 @@ func (r *CassandraTaskReconciler) setDatacenterCondition(dc *cassapi.CassandraDa func (r *CassandraTaskReconciler) replace(taskConfig *TaskConfiguration) { taskConfig.SyncFunc = r.replacePod taskConfig.ValidateFunc = r.replaceValidator - taskConfig.PodFilter = replaceFilter + taskConfig.PodFilter = genericPodFilter taskConfig.PreProcessFunc = r.replacePreProcess } @@ -253,6 +256,48 @@ func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) { taskConfig.ValidateFunc = r.moveValidator } +// Flush functionality + +func callFlushSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlushEndpoint(pod, keyspaceName, tables) +} + +func callFlushAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := 
taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlush(pod, keyspaceName, tables) +} + +func flush(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncFlush + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = callFlushSync + taskConfig.AsyncFunc = callFlushAsync +} + +// GarbageCollect functionality + +func callGarbageCollectAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollect(pod, keyspaceName, tables) +} + +func callGarbageCollectSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollectEndpoint(pod, keyspaceName, tables) +} + +func gc(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncGarbageCollect + taskConfig.PodFilter = genericPodFilter + taskConfig.AsyncFunc = callGarbageCollectAsync + taskConfig.SyncFunc = callGarbageCollectSync +} + // Common functions func isCassandraUp(pod *corev1.Pod) bool { diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go index ae839804..8d9cfe32 100644 --- a/pkg/httphelper/server_test_utils.go +++ b/pkg/httphelper/server_test_utils.go @@ -15,7 +15,9 @@ var featuresReply = `{ "async_sstable_tasks", "rebuild", "async_upgrade_sstable_task", - "async_move_task" + "async_move_task", + "async_gc_task", + "async_flush_task" ] }` @@ -66,7 +68,7 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server, w.WriteHeader(http.StatusOK) jobId := query.Get("job_id") _, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId))) - } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move") { + } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move" || r.URL.Path == "/api/v1/ops/tables/compact" || r.URL.Path == "/api/v1/ops/tables/scrub" || r.URL.Path == "/api/v1/ops/tables/flush" || r.URL.Path == "/api/v1/ops/tables/garbagecollect") { w.WriteHeader(http.StatusOK) // Write jobId jobId++ @@ -117,7 +119,7 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) { return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain") { + if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusNotFound) From d8683239251ec7c0bb13832ca64aee8056b6d2c1 Mon Sep 17 00:00:00 2001 From: Michael Burman 
Date: Thu, 12 Oct 2023 19:32:42 +0300 Subject: [PATCH 03/12] Modify Conditions to be metav1.Condition as it was close enough to JobCondition. Also, fix sync pod annotations to patch the correct instance. Modify the task Validator to mark the task as Failed instead of retrying until the task is fixed (and only logging to cass-operator logs why it is failing). --- CHANGELOG.md | 1 + apis/control/v1alpha1/cassandratask_types.go | 24 +----- .../control/v1alpha1/zz_generated.deepcopy.go | 20 +---- .../control.k8ssandra.io_cassandratasks.yaml | 60 ++++++++++--- .../control/cassandratask_controller.go | 86 +++++++++++++------ .../control/cassandratask_controller_test.go | 26 ++++-- internal/controllers/control/jobs.go | 41 ++++++--- pkg/httphelper/client.go | 2 +- 8 files changed, 162 insertions(+), 98 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44d97332..e7509472 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti ## unreleased * [CHANGE] [#573](https://github.com/k8ssandra/cass-operator/issues/573) Add the namespace as env variable in the server-system-logger container to label metrics with. +* [BUGFIX] [#585](https://github.com/k8ssandra/cass-operator/issues/585) If task validation fails, stop processing the task and mark the validation error to Failed condition ## v1.17.2 diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index cf669cc1..3a852563 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -107,9 +107,6 @@ type JobArguments struct { // CassandraTaskStatus defines the observed state of CassandraJob type CassandraTaskStatus struct { - - // TODO Status and Conditions is almost 1:1 to Kubernetes Job's definitions. - // The latest available observations of an object's current state. When a Job // fails, one of the conditions will have type "Failed" and status true. When // a Job is suspended, one of the conditions will have type "Suspended" and @@ -121,7 +118,7 @@ type CassandraTaskStatus struct { // +patchMergeKey=type // +patchStrategy=merge // +listType=atomic - Conditions []JobCondition `json:"conditions,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` // Represents time when the job controller started processing a job. When a // Job is created in the suspended state, this field is not set until the @@ -161,25 +158,6 @@ const ( JobRunning JobConditionType = "Running" ) -type JobCondition struct { - // Type of job condition, Complete or Failed. - Type JobConditionType `json:"type"` - // Status of the condition, one of True, False, Unknown. - Status corev1.ConditionStatus `json:"status"` - // Last time the condition was checked. - // +optional - LastProbeTime metav1.Time `json:"lastProbeTime,omitempty"` - // Last time the condition transit from one status to another. - // +optional - LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` - // (brief) reason for the condition's last transition. - // +optional - Reason string `json:"reason,omitempty"` - // Human readable message indicating details about last transition. 
- // +optional - Message string `json:"message,omitempty"` -} - //+kubebuilder:object:root=true //+kubebuilder:subresource:status diff --git a/apis/control/v1alpha1/zz_generated.deepcopy.go b/apis/control/v1alpha1/zz_generated.deepcopy.go index 7c593d05..45f5748f 100644 --- a/apis/control/v1alpha1/zz_generated.deepcopy.go +++ b/apis/control/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ limitations under the License. package v1alpha1 import ( + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -122,7 +123,7 @@ func (in *CassandraTaskStatus) DeepCopyInto(out *CassandraTaskStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]JobCondition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -204,20 +205,3 @@ func (in *JobArguments) DeepCopy() *JobArguments { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *JobCondition) DeepCopyInto(out *JobCondition) { - *out = *in - in.LastProbeTime.DeepCopyInto(&out.LastProbeTime) - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobCondition. -func (in *JobCondition) DeepCopy() *JobCondition { - if in == nil { - return nil - } - out := new(JobCondition) - in.DeepCopyInto(out) - return out -} diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index 1cccff3f..44397b33 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -180,30 +180,68 @@ spec: one of the conditions will have type "Complete" and status true. More info: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/' items: + description: "Condition contains details for one aspect of the current + state of this API Resource. --- This struct is intended for direct + use as an array at the field path .status.conditions. For example, + \n type FooStatus struct{ // Represents the observations of a + foo's current state. // Known .status.conditions.type are: \"Available\", + \"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge + // +listType=map // +listMapKey=type Conditions []metav1.Condition + `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\" + protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }" properties: - lastProbeTime: - description: Last time the condition was checked. - format: date-time - type: string lastTransitionTime: - description: Last time the condition transit from one status - to another. + description: lastTransitionTime is the last time the condition + transitioned from one status to another. This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. format: date-time type: string message: - description: Human readable message indicating details about - last transition. + description: message is a human readable message indicating + details about the transition. This may be an empty string. + maxLength: 32768 type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. 
For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer reason: - description: (brief) reason for the condition's last transition. + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ type: string status: - description: Status of the condition, one of True, False, Unknown. + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown type: string type: - description: Type of job condition, Complete or Failed. + description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string required: + - lastTransitionTime + - message + - reason - status - type type: object diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 60eb2a6d..74913adc 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -70,8 +70,10 @@ type AsyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskCon // SyncTaskExecutorFunc is called as a backup if async one isn't supported type SyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskConfiguration) error -// ValidatorFunc validates that necessary parameters are set for the task -type ValidatorFunc func(*TaskConfiguration) error +// ValidatorFunc validates that necessary parameters are set for the task. If false is returned, the task +// has failed the validation and the error has the details. If true is returned, the error is transient and +// should be retried. +type ValidatorFunc func(*TaskConfiguration) (bool, error) // ProcessFunc is a function that's run before the pods are being processed individually, or after // the pods have been processed. 
@@ -106,11 +108,11 @@ type TaskConfiguration struct { Completed int } -func (t *TaskConfiguration) Validate() error { +func (t *TaskConfiguration) Validate() (bool, error) { if t.ValidateFunc != nil { return t.ValidateFunc(t) } - return nil + return true, nil } func (t *TaskConfiguration) Filter(pod *corev1.Pod) bool { @@ -295,9 +297,9 @@ JobDefinition: case api.CommandUpgradeSSTables: upgradesstables(taskConfig) case api.CommandScrub: - // scrub(taskConfig) + scrub(taskConfig) case api.CommandCompaction: - // compact(taskConfig) + compact(taskConfig) case api.CommandMove: r.move(taskConfig) case api.CommandFlush: @@ -309,17 +311,26 @@ JobDefinition: return ctrl.Result{}, err } - if err := taskConfig.Validate(); err != nil { - return ctrl.Result{}, err + valid, errValidate := taskConfig.Validate() + if errValidate != nil && valid { + // Retry, this is a transient error + return ctrl.Result{}, errValidate + } + + if !valid { + failed++ + err = errValidate + res = ctrl.Result{} + break } - if !r.HasCondition(cassTask, api.JobRunning, corev1.ConditionTrue) { + if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) { if err := taskConfig.PreProcess(); err != nil { return ctrl.Result{}, err } } - if modified := SetCondition(&cassTask, api.JobRunning, corev1.ConditionTrue); modified { + if modified := SetCondition(&cassTask, api.JobRunning, metav1.ConditionTrue, ""); modified { if err = r.Client.Status().Update(ctx, &cassTask); err != nil { return ctrl.Result{}, err } @@ -341,20 +352,27 @@ JobDefinition: if res.RequeueAfter == 0 && !res.Requeue { // Job has been completed cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue - if err = r.Client.Update(ctx, &cassTask); err != nil { - return res, err + if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil { + return res, errUpdate } - err = r.cleanupJobAnnotations(ctx, dc, taskId) - if err != nil { + if errCleanup := r.cleanupJobAnnotations(ctx, dc, taskId); errCleanup != nil { // Not the end of the world - logger.Error(err, "Failed to cleanup job annotations from pods") + logger.Error(errCleanup, "Failed to cleanup job annotations from pods") } cassTask.Status.Active = 0 cassTask.Status.CompletionTime = &timeNow - SetCondition(&cassTask, api.JobComplete, corev1.ConditionTrue) - SetCondition(&cassTask, api.JobRunning, corev1.ConditionFalse) + SetCondition(&cassTask, api.JobComplete, metav1.ConditionTrue, "") + SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "") + + if failed > 0 { + errMsg := "" + if err != nil { + errMsg = err.Error() + } + SetCondition(&cassTask, api.JobFailed, metav1.ConditionTrue, errMsg) + } // Requeue for deletion later deletionTime := calculateDeletionTime(&cassTask) @@ -380,26 +398,29 @@ func (r *CassandraTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool { +func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool { for _, cond := range task.Status.Conditions { - if cond.Type == condition { + if cond.Type == string(condition) { return cond.Status == status } } return false } -func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool { +func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus, message string) bool { existing := false for 
i := 0; i < len(task.Status.Conditions); i++ { cond := task.Status.Conditions[i] - if cond.Type == condition { + if cond.Type == string(condition) { if cond.Status == status { // Already correct status return false } cond.Status = status cond.LastTransitionTime = metav1.Now() + if message != "" { + cond.Message = message + } existing = true task.Status.Conditions[i] = cond break @@ -407,11 +428,15 @@ func SetCondition(task *api.CassandraTask, condition api.JobConditionType, statu } if !existing { - cond := api.JobCondition{ - Type: condition, + cond := metav1.Condition{ + Type: string(condition), + Reason: string(condition), Status: status, LastTransitionTime: metav1.Now(), } + if message != "" { + cond.Message = message + } task.Status.Conditions = append(task.Status.Conditions, cond) } @@ -713,6 +738,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc // This feature is not supported in sync mode, mark everything as done err := fmt.Errorf("this job isn't supported by the target pod") logger.Error(err, "unable to execute requested job against pod", "Pod", pod) + failed++ return ctrl.Result{}, failed, completed, err } @@ -736,6 +762,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc go func(targetPod *corev1.Pod) { // Write value to the jobRunner to indicate we're running + podKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} logger.V(1).Info("starting execution of sync blocking job", "Pod", targetPod) jobRunner <- idx defer func() { @@ -751,12 +778,19 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc jobStatus.Status = podJobCompleted } - if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil { + if err := r.Client.Get(context.Background(), podKey, &pod); err != nil { + logger.Error(err, "Failed to get pod for annotation update", "Pod", targetPod) + return + } + + podPatch := client.MergeFrom(targetPod.DeepCopy()) + + if err = JobStatusToPodAnnotations(taskConfig.Id, targetPod.Annotations, jobStatus); err != nil { logger.Error(err, "Failed to update local job's status", "Pod", targetPod) } - err = r.Client.Update(ctx, &pod) - if err != nil { + if err = r.Client.Patch(ctx, targetPod, podPatch); err != nil { + // err = r.Client.Update(ctx, &pod) logger.Error(err, "Failed to update local job's status", "Pod", targetPod) } }(&pod) diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index b425c6fd..8b9f9d9e 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -264,10 +264,10 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(len(completedTask.Status.Conditions)).To(Equal(2)) for _, cond := range completedTask.Status.Conditions { switch cond.Type { - case api.JobComplete: - Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - case api.JobRunning: - Expect(cond.Status).To(Equal(corev1.ConditionFalse)) + case string(api.JobComplete): + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + case string(api.JobRunning): + Expect(cond.Status).To(Equal(metav1.ConditionFalse)) } } }) @@ -432,6 +432,20 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount)) }) + It("Replace a node in the datacenter without specifying the pod", func() { + testFailedNamespaceName := 
fmt.Sprintf("test-task-failed-%d", rand.Int31()) + By("creating a datacenter", createDatacenter("dc1", testFailedNamespaceName)) + By("Creating a task for replacenode") + taskKey, task := buildTask(api.CommandReplaceNode, testFailedNamespaceName) + + Expect(k8sClient.Create(context.TODO(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(completedTask.Status.Failed).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed))) + Expect(completedTask.Status.Conditions[2].Message).To(Equal("valid pod_name to replace is required")) + }) }) }) Context("Sync jobs", func() { @@ -468,7 +482,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Runs a upgradesstables task against the datacenter pods", func() { By("Creating a task for upgradesstables") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey := createTask(api.CommandUpgradeSSTables, testNamespaceName) completedTask := waitForTaskCompletion(taskKey) @@ -483,7 +496,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Replaces a node in the datacenter", func() { By("Creating a task for replacenode") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey, task := buildTask(api.CommandReplaceNode, testNamespaceName) podKey := types.NamespacedName{ @@ -512,7 +524,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Runs a flush task against the datacenter pods", func() { By("Creating a task for flush") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey, task := buildTask(api.CommandFlush, testNamespaceName) task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) @@ -529,7 +540,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Runs a garbagecollect task against the datacenter pods", func() { By("Creating a task for garbagecollect") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index e8f85b0a..fe2c739c 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -161,25 +161,30 @@ func (r *CassandraTaskReconciler) replacePod(nodeMgmtClient httphelper.NodeMgmtC return nil } -func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration) error { +func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration) (bool, error) { // Check that arguments has replaceable pods and that those pods are actually existing pods if taskConfig.Arguments.PodName != "" { pods, err := r.getDatacenterPods(taskConfig.Context, taskConfig.Datacenter) if err != nil { - return err + return true, err } for _, pod := range pods { if pod.Name == taskConfig.Arguments.PodName { - return nil + return true, nil } } } - return fmt.Errorf("valid pod_name to replace is required") + return false, fmt.Errorf("valid pod_name to replace is required") } -func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { +func requiredPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { // If pod isn't in the to be replaced pods, return false + podName := 
taskConfig.Arguments.PodName + return pod.Name == podName +} + +func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { podName := taskConfig.Arguments.PodName if podName != "" { return pod.Name == podName @@ -210,7 +215,7 @@ func (r *CassandraTaskReconciler) setDatacenterCondition(dc *cassapi.CassandraDa func (r *CassandraTaskReconciler) replace(taskConfig *TaskConfiguration) { taskConfig.SyncFunc = r.replacePod taskConfig.ValidateFunc = r.replaceValidator - taskConfig.PodFilter = genericPodFilter + taskConfig.PodFilter = requiredPodFilter taskConfig.PreProcessFunc = r.replacePreProcess } @@ -226,13 +231,13 @@ func moveFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { return found } -func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) error { +func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) (bool, error) { if len(taskConfig.Arguments.NewTokens) == 0 { - return fmt.Errorf("missing required new_tokens argument") + return false, fmt.Errorf("missing required new_tokens argument") } pods, err := r.getDatacenterPods(taskConfig.Context, taskConfig.Datacenter) if err != nil { - return err + return true, err } for podName := range taskConfig.Arguments.NewTokens { found := false @@ -243,10 +248,10 @@ func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) e } } if !found { - return fmt.Errorf("invalid new_tokens argument: pod doesn't exist: %s", podName) + return false, fmt.Errorf("invalid new_tokens argument: pod doesn't exist: %s", podName) } } - return nil + return true, nil } func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) { @@ -298,6 +303,20 @@ func gc(taskConfig *TaskConfiguration) { taskConfig.SyncFunc = callGarbageCollectSync } +// Scrub functionality + +func scrub(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncScrubTask + taskConfig.PodFilter = genericPodFilter +} + +// Compaction functionality + +func compact(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncCompactionTask + taskConfig.PodFilter = genericPodFilter +} + // Common functions func isCassandraUp(pod *corev1.Pod) bool { diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index 100e7e61..f2d75bd8 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -133,7 +133,7 @@ const ( // AsyncSSTableTasks includes "cleanup" and "decommission" AsyncSSTableTasks Feature = "async_sstable_tasks" AsyncUpgradeSSTableTask Feature = "async_upgrade_sstable_task" - AsyncCompactionTasks Feature = "async_compaction_task" + AsyncCompactionTask Feature = "async_compaction_task" AsyncScrubTask Feature = "async_scrub_task" FullQuerySupport Feature = "full_query_logging" Rebuild Feature = "rebuild" From 277d9b1098d03747c63202fcc0cab259213d9c87 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 13 Oct 2023 15:47:22 +0300 Subject: [PATCH 04/12] Fix a flake in replacenode tests that was due to the speed of the requeue causing a matching timing with the pod delete (before recreate). Also, implement scrub and compaction calls. 
--- CHANGELOG.md | 4 + apis/control/v1alpha1/cassandratask_types.go | 8 ++ .../control/v1alpha1/zz_generated.deepcopy.go | 5 ++ .../control.k8ssandra.io_cassandratasks.yaml | 9 ++ .../control/cassandratask_controller.go | 65 +++++++++------ .../control/cassandratask_controller_test.go | 2 +- internal/controllers/control/jobs.go | 83 ++++++++++++++++++- 7 files changed, 147 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7509472..313d6fe6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti ## unreleased * [CHANGE] [#573](https://github.com/k8ssandra/cass-operator/issues/573) Add the namespace as env variable in the server-system-logger container to label metrics with. +* [ENHANCEMENT] [#580](https://github.com/k8ssandra/cass-operator/issues/580) Add garbageCollect CassandraTask that removes deleted data +* [ENHANCEMENT] [#578](https://github.com/k8ssandra/cass-operator/issues/578) Add flush CassandraTask that flushes memtables to disk +* [ENHANCEMENT] [#586](https://github.com/k8ssandra/cass-operator/issues/586) Add scrub CassandraTask that allows rebuilding SSTables +* [ENHANCEMENT] [#582](https://github.com/k8ssandra/cass-operator/issues/582) Add compaction CassandraTask to force a compaction * [BUGFIX] [#585](https://github.com/k8ssandra/cass-operator/issues/585) If task validation fails, stop processing the task and mark the validation error to Failed condition ## v1.17.2 diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index 3a852563..4dc67bc6 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -98,6 +98,14 @@ type JobArguments struct { PodName string `json:"pod_name,omitempty"` RackName string `json:"rack,omitempty"` Tables []string `json:"tables,omitempty"` + JobsCount *int `json:"jobs,omitempty"` + + // Scrub arguments + NoValidate bool `json:"no_validate,omitempty"` + NoSnapshot bool `json:"no_snapshot,omitempty"` + SkipCorrupted bool `json:"skip_corrupted,omitempty"` + + // Compaction arguments // NewTokens is a map of pod names to their newly-assigned tokens. Required for the move // command, ignored otherwise. Pods referenced in this map must exist; any existing pod not diff --git a/apis/control/v1alpha1/zz_generated.deepcopy.go b/apis/control/v1alpha1/zz_generated.deepcopy.go index 45f5748f..096ead4c 100644 --- a/apis/control/v1alpha1/zz_generated.deepcopy.go +++ b/apis/control/v1alpha1/zz_generated.deepcopy.go @@ -187,6 +187,11 @@ func (in *JobArguments) DeepCopyInto(out *JobArguments) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.JobsCount != nil { + in, out := &in.JobsCount, &out.JobsCount + *out = new(int) + **out = **in + } if in.NewTokens != nil { in, out := &in.NewTokens, &out.NewTokens *out = make(map[string]string, len(*in)) diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index 44397b33..15f07a38 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -108,6 +108,8 @@ spec: args: description: Arguments are additional parameters for the command properties: + jobs: + type: integer keyspace_name: type: string new_tokens: @@ -118,10 +120,17 @@ spec: Pods referenced in this map must exist; any existing pod not referenced in this map will not be moved.
type: object + no_snapshot: + type: boolean + no_validate: + description: Scrub arguments + type: boolean pod_name: type: string rack: type: string + skip_corrupted: + type: boolean source_datacenter: type: string tables: diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 74913adc..ba71df4e 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -311,20 +311,20 @@ JobDefinition: return ctrl.Result{}, err } - valid, errValidate := taskConfig.Validate() - if errValidate != nil && valid { - // Retry, this is a transient error - return ctrl.Result{}, errValidate - } + if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) { + valid, errValidate := taskConfig.Validate() + if errValidate != nil && valid { + // Retry, this is a transient error + return ctrl.Result{}, errValidate + } - if !valid { - failed++ - err = errValidate - res = ctrl.Result{} - break - } + if !valid { + failed++ + err = errValidate + res = ctrl.Result{} + break + } - if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) { if err := taskConfig.PreProcess(); err != nil { return ctrl.Result{}, err } @@ -344,12 +344,15 @@ JobDefinition: if res.RequeueAfter > 0 { // This job isn't complete yet or there's an error, do not continue + logger.V(1).Info("This job isn't complete yet or there's an error, requeueing", "requeueAfter", res.RequeueAfter) break } // completedCount++ + logger.V(1).Info("We're returning from this", "completed", completed, "failed", failed) } if res.RequeueAfter == 0 && !res.Requeue { + logger.V(1).Info("No requeues queued..", "completed", completed, "failed", failed, "jobCount", len(cassTask.Spec.Jobs)) // Job has been completed cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil { @@ -734,6 +737,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc return ctrl.Result{}, failed, completed, err } } else { + if len(jobRunner) > 0 { + // Something is still holding the worker + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil + } + if taskConfig.SyncFunc == nil { // This feature is not supported in sync mode, mark everything as done err := fmt.Errorf("this job isn't supported by the target pod") @@ -760,44 +768,53 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc pod := pod - go func(targetPod *corev1.Pod) { + go func() { + // go func(targetPod *corev1.Pod) { // Write value to the jobRunner to indicate we're running podKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} - logger.V(1).Info("starting execution of sync blocking job", "Pod", targetPod) + logger.V(1).Info("starting execution of sync blocking job", "Pod", pod) jobRunner <- idx defer func() { // Remove the value from the jobRunner <-jobRunner }() - if err = taskConfig.SyncFunc(nodeMgmtClient, targetPod, taskConfig); err != nil { + if err = taskConfig.SyncFunc(nodeMgmtClient, &pod, taskConfig); err != nil { // We only log, nothing else to do - we won't even retry this pod - logger.Error(err, "executing the sync task failed", "Pod", targetPod) + logger.Error(err, "executing the sync task failed", "Pod", pod) jobStatus.Status = podJobError } else { jobStatus.Status = podJobCompleted } if err := r.Client.Get(context.Background(), podKey, &pod); err != nil { - logger.Error(err, "Failed to 
get pod for annotation update", "Pod", targetPod) - return + logger.Error(err, "Failed to get pod for annotation update", "Pod", pod) } - podPatch := client.MergeFrom(targetPod.DeepCopy()) + podPatch := client.MergeFrom(pod.DeepCopy()) - if err = JobStatusToPodAnnotations(taskConfig.Id, targetPod.Annotations, jobStatus); err != nil { - logger.Error(err, "Failed to update local job's status", "Pod", targetPod) + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil { + logger.Error(err, "Failed to update local job's status", "Pod", pod) } - if err = r.Client.Patch(ctx, targetPod, podPatch); err != nil { + if err = r.Client.Patch(ctx, &pod, podPatch); err != nil { // err = r.Client.Update(ctx, &pod) - logger.Error(err, "Failed to update local job's status", "Pod", targetPod) + logger.Error(err, "Failed to update local job's status", "Pod", pod) } - }(&pod) + }() } // We have a job going on, return back later to check the status return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil } + + if len(jobRunner) > 0 { + // Something is still holding the worker while none of the existing pods are, probably the replace job. + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil + } + return ctrl.Result{}, failed, completed, nil } diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 8b9f9d9e..4687d8af 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -448,7 +448,7 @@ var _ = Describe("CassandraTask controller tests", func() { }) }) }) - Context("Sync jobs", func() { + FContext("Sync jobs", func() { var testNamespaceName string BeforeEach(func() { By("Creating fake synchronous mgmt-api server") diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index fe2c739c..be2cb25c 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -23,13 +23,21 @@ import ( func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { keyspaceName := taskConfig.Arguments.KeyspaceName tables := taskConfig.Arguments.Tables - return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, tables) + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallKeyspaceCleanup(pod, jobCount, keyspaceName, tables) } func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { keyspaceName := taskConfig.Arguments.KeyspaceName tables := taskConfig.Arguments.Tables - return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, tables) + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, jobCount, keyspaceName, tables) } func cleanup(taskConfig *TaskConfiguration) { @@ -111,13 +119,22 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S func callUpgradeSSTables(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { keyspaceName := taskConfig.Arguments.KeyspaceName tables := taskConfig.Arguments.Tables - return 
nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, tables) + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + + return nodeMgmtClient.CallUpgradeSSTables(pod, jobCount, keyspaceName, tables) } func callUpgradeSSTablesSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { keyspaceName := taskConfig.Arguments.KeyspaceName tables := taskConfig.Arguments.Tables - return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, tables) + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, jobCount, keyspaceName, tables) } func upgradesstables(taskConfig *TaskConfiguration) { @@ -132,6 +149,8 @@ func upgradesstables(taskConfig *TaskConfiguration) { func (r *CassandraTaskReconciler) replacePod(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { // We check the podStartTime to prevent replacing the pod multiple times since annotations are removed when we delete the pod podStartTime := pod.GetCreationTimestamp() + uid := pod.UID + podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} if podStartTime.Before(taskConfig.TaskStartTime) { if isCassandraUp(pod) { // Verify the cassandra pod is healthy before trying the drain @@ -158,6 +177,18 @@ func (r *CassandraTaskReconciler) replacePod(nodeMgmtClient httphelper.NodeMgmtC return err } } + + for i := 0; i < 10; i++ { + time.Sleep(1 * time.Second) + newPod := &corev1.Pod{} + if err := r.Client.Get(taskConfig.Context, podKey, newPod); err != nil { + continue + } + if uid != newPod.UID { + break + } + } + return nil } @@ -305,16 +336,60 @@ func gc(taskConfig *TaskConfiguration) { // Scrub functionality +func createScrubRequest(taskConfig *TaskConfiguration) *httphelper.ScrubRequest { + sr := &httphelper.ScrubRequest{ + DisableSnapshot: taskConfig.Arguments.NoSnapshot, + SkipCorrupted: taskConfig.Arguments.SkipCorrupted, + CheckData: !taskConfig.Arguments.NoValidate, + Jobs: -1, + KeyspaceName: taskConfig.Arguments.KeyspaceName, + Tables: taskConfig.Arguments.Tables, + } + + if taskConfig.Arguments.JobsCount != nil { + sr.Jobs = *taskConfig.Arguments.JobsCount + } + + return sr +} + +func scrubSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + return nodeMgmtClient.CallScrubEndpoint(pod, createScrubRequest(taskConfig)) +} + +func scrubAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + return nodeMgmtClient.CallScrub(pod, createScrubRequest(taskConfig)) +} + func scrub(taskConfig *TaskConfiguration) { taskConfig.AsyncFeature = httphelper.AsyncScrubTask taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = scrubSync + taskConfig.AsyncFunc = scrubAsync } // Compaction functionality +func createCompactRequest(taskConfig *TaskConfiguration) *httphelper.CompactRequest { + return &httphelper.CompactRequest{ + KeyspaceName: taskConfig.Arguments.KeyspaceName, + Tables: taskConfig.Arguments.Tables, + } +} + +func compactSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + return nodeMgmtClient.CallCompactionEndpoint(pod, createCompactRequest(taskConfig)) +} + +func compactAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + 
return nodeMgmtClient.CallCompaction(pod, createCompactRequest(taskConfig)) +} + func compact(taskConfig *TaskConfiguration) { taskConfig.AsyncFeature = httphelper.AsyncCompactionTask taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = compactSync + taskConfig.AsyncFunc = compactAsync } // Common functions From f9388c99d0e0620520477b14a86eb7dc048a4ffc Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 16 Oct 2023 14:02:43 +0300 Subject: [PATCH 05/12] Add compaction and scrub tasks with tests. Improve testing in control tasks envtests by allowing to verify the payload sent to the management-api --- apis/control/v1alpha1/cassandratask_types.go | 3 + .../control.k8ssandra.io_cassandratasks.yaml | 7 ++ .../control/cassandratask_controller.go | 3 - .../control/cassandratask_controller_test.go | 68 ++++++++++++++++--- internal/controllers/control/jobs.go | 3 + pkg/httphelper/server_test_utils.go | 28 ++++++-- 6 files changed, 96 insertions(+), 16 deletions(-) diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index 4dc67bc6..b54eb213 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -106,6 +106,9 @@ type JobArguments struct { SkipCorrupted bool `json:"skip_corrupted,omitempty"` // Compaction arguments + SplitOutput bool `json:"split_output,omitempty"` + StartToken string `json:"start_token,omitempty"` + EndToken string `json:"end_token,omitempty"` // NewTokens is a map of pod names to their newly-assigned tokens. Required for the move // command, ignored otherwise. Pods referenced in this map must exist; any existing pod not diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index 15f07a38..2d88342b 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -108,6 +108,8 @@ spec: args: description: Arguments are additional parameters for the command properties: + end_token: + type: string jobs: type: integer keyspace_name: @@ -133,6 +135,11 @@ spec: type: boolean source_datacenter: type: string + split_output: + description: Compaction arguments + type: boolean + start_token: + type: string tables: items: type: string diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index ba71df4e..c4b0125c 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -347,12 +347,9 @@ JobDefinition: logger.V(1).Info("This job isn't complete yet or there's an error, requeueing", "requeueAfter", res.RequeueAfter) break } - // completedCount++ - logger.V(1).Info("We're returning from this", "completed", completed, "failed", failed) } if res.RequeueAfter == 0 && !res.Requeue { - logger.V(1).Info("No requeues queued..", "completed", completed, "failed", failed, "jobCount", len(cassTask.Spec.Jobs)) // Job has been completed cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil { diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 4687d8af..157e86b0 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -2,6 +2,7 @@ package 
control import ( "context" + "encoding/json" "fmt" "math/rand" "net/http/httptest" @@ -259,7 +260,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) Expect(len(completedTask.Status.Conditions)).To(Equal(2)) for _, cond := range completedTask.Status.Conditions { @@ -282,8 +283,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) }) It("Runs a node move task against the datacenter pods", func() { By("Creating a task for move") @@ -305,7 +305,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 3)) }) It("Runs a flush task against the datacenter pods", func() { By("Creating a task for flush") @@ -320,7 +320,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) }) It("Runs a flush task against a pod", func() { By("Creating a task for flush") @@ -336,7 +336,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) }) It("Runs a garbagecollect task against the datacenter pods", func() { @@ -351,8 +351,58 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) + }) + + It("Runs a scrub task against a pod", func() { + By("Creating a task for scrub") + + taskKey, task := buildTask(api.CommandScrub, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + task.Spec.Jobs[0].Arguments.NoValidate = false + Expect(k8sClient.Create(context.Background(), 
task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/scrub"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) + + // Payloads should be of type ScrubRequest + var sreq httphelper.ScrubRequest + Expect(json.Unmarshal(callDetails.Payloads[0], &sreq)).Should(Succeed()) + Expect(sreq.CheckData).To(BeTrue()) + Expect(sreq.KeyspaceName).To(Equal("ks1")) + }) + + FIt("Runs a compaction task against a pod", func() { + By("Creating a task for compaction") + + taskKey, task := buildTask(api.CommandCompaction, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + task.Spec.Jobs[0].Arguments.Tables = []string{"table1"} + task.Spec.Jobs[0].Arguments.SplitOutput = true + + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/compact"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) + + // Payloads should be of type CompactRequest + var req httphelper.CompactRequest + Expect(json.Unmarshal(callDetails.Payloads[0], &req)).Should(Succeed()) + Expect(req.KeyspaceName).To(Equal("ks1")) + Expect(req.SplitOutput).To(BeTrue()) + Expect(len(req.Tables)).To(BeNumerically("==", 1)) }) When("Running cleanup twice in the same datacenter", func() { @@ -448,7 +498,7 @@ var _ = Describe("CassandraTask controller tests", func() { }) }) }) - FContext("Sync jobs", func() { + Context("Sync jobs", func() { var testNamespaceName string BeforeEach(func() { By("Creating fake synchronous mgmt-api server") diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index be2cb25c..652dac15 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -374,6 +374,9 @@ func createCompactRequest(taskConfig *TaskConfiguration) *httphelper.CompactRequ return &httphelper.CompactRequest{ KeyspaceName: taskConfig.Arguments.KeyspaceName, Tables: taskConfig.Arguments.Tables, + SplitOutput: taskConfig.Arguments.SplitOutput, + StartToken: taskConfig.Arguments.StartToken, + EndToken: taskConfig.Arguments.EndToken, } } diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go index 8d9cfe32..7b555aae 100644 --- a/pkg/httphelper/server_test_utils.go +++ b/pkg/httphelper/server_test_utils.go @@ -2,6 +2,7 @@ package httphelper import ( "fmt" + "io" "net" "net/http" "net/http/httptest" @@ -17,7 +18,9 @@ var featuresReply = `{ "async_upgrade_sstable_task", "async_move_task", "async_gc_task", - "async_flush_task" + "async_flush_task", + "async_scrub_task", + "async_compaction_task" ] }` @@ -36,11 +39,13 @@ func mgmtApiListener() (net.Listener, error) { type CallDetails struct { URLCounts map[string]int + Payloads [][]byte } func NewCallDetails() *CallDetails { return &CallDetails{ URLCounts: make(map[string]int), + Payloads: make([][]byte, 0), } } @@ -68,7 +73,15 @@ func FakeExecutorServerWithDetails(callDetails 
*CallDetails) (*httptest.Server, w.WriteHeader(http.StatusOK) jobId := query.Get("job_id") _, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId))) - } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move" || r.URL.Path == "/api/v1/ops/tables/compact" || r.URL.Path == "/api/v1/ops/tables/scrub" || r.URL.Path == "/api/v1/ops/tables/flush" || r.URL.Path == "/api/v1/ops/tables/garbagecollect") { + } else if r.Method == http.MethodPost && + (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || + r.URL.Path == "/api/v1/ops/node/rebuild" || + r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || + r.URL.Path == "/api/v0/ops/node/move" || + r.URL.Path == "/api/v1/ops/tables/compact" || + r.URL.Path == "/api/v1/ops/tables/scrub" || + r.URL.Path == "/api/v1/ops/tables/flush" || + r.URL.Path == "/api/v1/ops/tables/garbagecollect") { w.WriteHeader(http.StatusOK) // Write jobId jobId++ @@ -87,8 +100,6 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server, func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Server, error) { jobId := 0 - // TODO Repeated code from above.. refactor - return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { query, err := url.ParseQuery(r.URL.RawQuery) if err != nil { @@ -135,6 +146,15 @@ func FakeMgmtApiServer(callDetails *CallDetails, handlerFunc http.HandlerFunc) ( callerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if callDetails != nil { callDetails.incr(r.URL.Path) + + if r.ContentLength > 0 { + payload, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + callDetails.Payloads = append(callDetails.Payloads, payload) + } } handlerFunc(w, r) }) From 1b4961e4dba589d003cc149580ce6e8ebbf10c8a Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 16 Oct 2023 14:14:02 +0300 Subject: [PATCH 06/12] Add rebuild task to the decommission_dc e2e test --- tests/decommission_dc/decommission_dc_suite_test.go | 10 ++++++++++ tests/node_move/node_move_suite_test.go | 5 +++-- tests/testdata/tasks/rebuild_task.yaml | 13 +++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 tests/testdata/tasks/rebuild_task.yaml diff --git a/tests/decommission_dc/decommission_dc_suite_test.go b/tests/decommission_dc/decommission_dc_suite_test.go index 2d2db0c5..a3be6fc6 100644 --- a/tests/decommission_dc/decommission_dc_suite_test.go +++ b/tests/decommission_dc/decommission_dc_suite_test.go @@ -27,6 +27,7 @@ var ( dc1Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", api.CleanupForKubernetes(dc1OverrideName)) dc2Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dc2Name) seedLabel = "cassandra.datastax.com/seed-node=true" + taskYaml = "../testdata/tasks/rebuild_dc_task.yaml" // dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) ns = ginkgo_util.NewWrapper(testName, namespace) ) @@ -127,6 +128,15 @@ var _ = Describe(testName, func() { ns.ExpectDoneReconciling(dc1Name) ns.ExpectDoneReconciling(dc2Name) + // Create a CassandraTask to rebuild dc2 from dc1 + // Create CassandraTask that should replace a node + step = "creating a cassandra rebuild dc2" + k = kubectl.ApplyFiles(taskYaml) + ns.ExecAndLog(step, k) + + // Wait for the task to be completed + ns.WaitForCompleteTask("rebuild-dc") + 
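	// Illustrative sketch (not part of this patch): the rebuild task applied above
	// via rebuild_task.yaml could also be constructed in Go with the control/v1alpha1
	// types touched in this series. The field and constant names used below
	// (CassandraTaskSpec.Datacenter, CassandraJob, JobArguments.SourceDatacenter,
	// CommandRebuild) are assumptions drawn from the surrounding patches, not a
	// verified API reference; the snippet assumes the test file's existing imports
	// plus metav1, corev1 and the control/v1alpha1 package aliased as api.
	rebuildTask := &api.CassandraTask{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rebuild-dc",
			Namespace: "test-decommission-dc",
		},
		Spec: api.CassandraTaskSpec{
			Datacenter: corev1.ObjectReference{Name: "dc2", Namespace: "test-decommission-dc"},
			Jobs: []api.CassandraJob{{
				Name:    "rebuild-dc2",
				Command: api.CommandRebuild,
				// Equivalent of "args: source_datacenter: dc1" in the YAML task.
				Arguments: api.JobArguments{SourceDatacenter: "dc1"},
			}},
		},
	}
	// Creating it through the controller-runtime client would have the same effect
	// as kubectl.ApplyFiles(taskYaml); ns.WaitForCompleteTask("rebuild-dc") then
	// polls until the task reports completion.
	// _ = k8sClient.Create(context.TODO(), rebuildTask)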
podNames := ns.GetDatacenterReadyPodNames(dc1OverrideName) Expect(len(podNames)).To(Equal(2)) dcs := findDatacenters(podNames[0]) diff --git a/tests/node_move/node_move_suite_test.go b/tests/node_move/node_move_suite_test.go index 2d79627d..3ad10916 100644 --- a/tests/node_move/node_move_suite_test.go +++ b/tests/node_move/node_move_suite_test.go @@ -5,9 +5,10 @@ package node_move import ( "fmt" + "testing" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "testing" "github.com/k8ssandra/cass-operator/tests/kustomize" ginkgo_util "github.com/k8ssandra/cass-operator/tests/util/ginkgo" @@ -60,7 +61,7 @@ var _ = Describe(testName, func() { }) Specify("the operator can move a cassandra node to a new token", func() { - // Create CassandraTask that should replace a node + // Create CassandraTask that should move tokens step := "creating a cassandra task to move a node" k := kubectl.ApplyFiles(taskYaml) ns.ExecAndLog(step, k) diff --git a/tests/testdata/tasks/rebuild_task.yaml b/tests/testdata/tasks/rebuild_task.yaml new file mode 100644 index 00000000..3ef55b5a --- /dev/null +++ b/tests/testdata/tasks/rebuild_task.yaml @@ -0,0 +1,13 @@ +apiVersion: control.k8ssandra.io/v1alpha1 +kind: CassandraTask +metadata: + name: rebuild-dc +spec: + datacenter: + name: dc2 + namespace: test-decommission-dc + jobs: + - name: rebuild-dc2 + command: rebuild + args: + datacenter: dc1 From 9a52376b953e489745f589434230f91eace1292e Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 16 Oct 2023 14:36:24 +0300 Subject: [PATCH 07/12] Remove unintentional focused test --- internal/controllers/control/cassandratask_controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 157e86b0..55f3a60f 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -378,7 +378,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(sreq.KeyspaceName).To(Equal("ks1")) }) - FIt("Runs a compaction task against a pod", func() { + It("Runs a compaction task against a pod", func() { By("Creating a task for compaction") taskKey, task := buildTask(api.CommandCompaction, testNamespaceName) From 99133a0589782c795a1c55c137fcb88588008fc0 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 16 Oct 2023 19:12:49 +0300 Subject: [PATCH 08/12] Fix filename typo --- tests/decommission_dc/decommission_dc_suite_test.go | 2 +- tests/testdata/tasks/rebuild_task.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/decommission_dc/decommission_dc_suite_test.go b/tests/decommission_dc/decommission_dc_suite_test.go index a3be6fc6..4958858d 100644 --- a/tests/decommission_dc/decommission_dc_suite_test.go +++ b/tests/decommission_dc/decommission_dc_suite_test.go @@ -27,7 +27,7 @@ var ( dc1Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", api.CleanupForKubernetes(dc1OverrideName)) dc2Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dc2Name) seedLabel = "cassandra.datastax.com/seed-node=true" - taskYaml = "../testdata/tasks/rebuild_dc_task.yaml" + taskYaml = "../testdata/tasks/rebuild_task.yaml" // dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) ns = ginkgo_util.NewWrapper(testName, namespace) ) diff --git a/tests/testdata/tasks/rebuild_task.yaml b/tests/testdata/tasks/rebuild_task.yaml index 3ef55b5a..a73d5fa7 100644 
--- a/tests/testdata/tasks/rebuild_task.yaml +++ b/tests/testdata/tasks/rebuild_task.yaml @@ -10,4 +10,4 @@ spec: - name: rebuild-dc2 command: rebuild args: - datacenter: dc1 + datacenter: My_Super_Dc From 513669f05990a1310d4d4ecbf849dc64841effac Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 16 Oct 2023 20:16:37 +0300 Subject: [PATCH 09/12] rebuild_task datacenter -> source_datacenter --- tests/testdata/tasks/rebuild_task.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testdata/tasks/rebuild_task.yaml b/tests/testdata/tasks/rebuild_task.yaml index a73d5fa7..97ca4ac1 100644 --- a/tests/testdata/tasks/rebuild_task.yaml +++ b/tests/testdata/tasks/rebuild_task.yaml @@ -10,4 +10,4 @@ spec: - name: rebuild-dc2 command: rebuild args: - datacenter: My_Super_Dc + source_datacenter: My_Super_Dc From ce6419045b71c1d4c12ee462f6820d5b6b62055a Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 17 Oct 2023 13:23:36 +0300 Subject: [PATCH 10/12] Add logging to show the jobId that we fetch --- pkg/httphelper/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index f2d75bd8..bfece9f2 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -1059,8 +1059,9 @@ func (client *NodeMgmtClient) FeatureSet(pod *corev1.Pod) (*FeatureSet, error) { func (client *NodeMgmtClient) JobDetails(pod *corev1.Pod, jobId string) (*JobDetails, error) { client.Log.Info( - "calling Management API features - GET /api/v0/ops/executor/job", + "calling Management API jobDetails - GET /api/v0/ops/executor/job", "pod", pod.Name, + "jobId", jobId, ) podHost, podPort, err := BuildPodHostFromPod(pod) From b0e3827ed4cb952a7e82564af460123c9d79f5ba Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 17 Oct 2023 13:24:04 +0300 Subject: [PATCH 11/12] Verify in the test_all_the_things that the cleanup task has completed --- tests/test_all_the_things/test_all_the_things_suite_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_all_the_things/test_all_the_things_suite_test.go b/tests/test_all_the_things/test_all_the_things_suite_test.go index 79eb86c6..68946e5a 100644 --- a/tests/test_all_the_things/test_all_the_things_suite_test.go +++ b/tests/test_all_the_things/test_all_the_things_suite_test.go @@ -96,6 +96,9 @@ var _ = Describe(testName, func() { ns.ExpectDatacenterNameStatusUpdated(dcName, dcNameOverride) + // Ensure we have a single CassandraTask created which is a cleanup (and it succeeded) + ns.WaitForCompletedCassandraTasks(dcName, "cleanup", 1) + step = "stopping the dc" json = "{\"spec\": {\"stopped\": true}}" k = kubectl.PatchMerge(dcResource, json) From c1da7c4374e8decfb0cefe164ca3521c2710c730 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Tue, 17 Oct 2023 13:41:54 +0300 Subject: [PATCH 12/12] Add rack filtering to all genericPodFilter jobs --- .../control/cassandratask_controller_test.go | 17 +++++++++++++++++ internal/controllers/control/jobs.go | 10 ++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 55f3a60f..75912fab 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -143,6 +143,7 @@ func createPod(namespace, clusterName, dcName, rackName string, ordinal int) { Labels: map[string]string{ cassdcapi.ClusterLabel: 
clusterName, cassdcapi.DatacenterLabel: dcName, + cassdcapi.RackLabel: rackName, }, }, Spec: corev1.PodSpec{ @@ -354,6 +355,22 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) }) + It("Runs a garbagecollect task against rack's pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.RackName = "r2" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount / rackCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount/rackCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount/rackCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount/rackCount)) + }) + It("Runs a scrub task against a pod", func() { By("Creating a task for scrub") diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 652dac15..0cc5a039 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -216,11 +216,17 @@ func requiredPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { } func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { + accept := true + rackName := taskConfig.Arguments.RackName + if rackName != "" { + accept = accept && pod.Labels[cassapi.RackLabel] == rackName + } + podName := taskConfig.Arguments.PodName if podName != "" { - return pod.Name == podName + accept = accept && pod.Name == podName } - return true + return accept } // replacePreProcess adds enough information to CassandraDatacenter to ensure cass-operator knows this pod is being replaced