diff --git a/CHANGELOG.md b/CHANGELOG.md index 44d97332..313d6fe6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti ## unreleased * [CHANGE] [#573](https://github.com/k8ssandra/cass-operator/issues/573) Add the namespace as env variable in the server-system-logger container to label metrics with. +* [ENHANCEMENT] [#580](https://github.com/k8ssandra/cass-operator/issues/580) Add garbageCollect CassandraTask that removes deleted data +* [ENHANCEMENT] [#578](https://github.com/k8ssandra/cass-operator/issues/578) Add flush CassandraTask that flushes memtables to disk +* [ENHANCEMENT] [#586](https://github.com/k8ssandra/cass-operator/issues/586) Add scrub CassandraTask that allows rebuilding SSTables +* [ENHANCEMENT] [#582](https://github.com/k8ssandra/cass-operator/issues/582) Add compaction CassandraTask to force a compaction +* [BUGFIX] [#585](https://github.com/k8ssandra/cass-operator/issues/585) If task validation fails, stop processing the task and record the validation error in the Failed condition ## v1.17.2 diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index d7a24845..b54eb213 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -78,6 +78,8 @@ const ( CommandCompaction CassandraCommand = "compact" CommandScrub CassandraCommand = "scrub" CommandMove CassandraCommand = "move" + CommandGarbageCollect CassandraCommand = "garbagecollect" + CommandFlush CassandraCommand = "flush" ) type CassandraJob struct { @@ -91,10 +93,22 @@ type CassandraJob struct { } type JobArguments struct { - KeyspaceName string `json:"keyspace_name,omitempty"` - SourceDatacenter string `json:"source_datacenter,omitempty"` - PodName string `json:"pod_name,omitempty"` - RackName string `json:"rack,omitempty"` + KeyspaceName string `json:"keyspace_name,omitempty"` + SourceDatacenter string `json:"source_datacenter,omitempty"` + PodName string `json:"pod_name,omitempty"` + RackName string `json:"rack,omitempty"` + Tables []string `json:"tables,omitempty"` + JobsCount *int `json:"jobs,omitempty"` + + // Scrub arguments + NoValidate bool `json:"no_validate,omitempty"` + NoSnapshot bool `json:"no_snapshot,omitempty"` + SkipCorrupted bool `json:"skip_corrupted,omitempty"` + + // Compaction arguments + SplitOutput bool `json:"split_output,omitempty"` + StartToken string `json:"start_token,omitempty"` + EndToken string `json:"end_token,omitempty"` // NewTokens is a map of pod names to their newly-assigned tokens. Required for the move // command, ignored otherwise. Pods referenced in this map must exist; any existing pod not @@ -104,9 +118,6 @@ type JobArguments struct { // CassandraTaskStatus defines the observed state of CassandraJob type CassandraTaskStatus struct { - - // TODO Status and Conditions is almost 1:1 to Kubernetes Job's definitions. - // The latest available observations of an object's current state. When a Job // fails, one of the conditions will have type "Failed" and status true.
When // a Job is suspended, one of the conditions will have type "Suspended" and @@ -118,7 +129,7 @@ type CassandraTaskStatus struct { // +patchMergeKey=type // +patchStrategy=merge // +listType=atomic - Conditions []JobCondition `json:"conditions,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` // Represents time when the job controller started processing a job. When a // Job is created in the suspended state, this field is not set until the @@ -158,25 +169,6 @@ const ( JobRunning JobConditionType = "Running" ) -type JobCondition struct { - // Type of job condition, Complete or Failed. - Type JobConditionType `json:"type"` - // Status of the condition, one of True, False, Unknown. - Status corev1.ConditionStatus `json:"status"` - // Last time the condition was checked. - // +optional - LastProbeTime metav1.Time `json:"lastProbeTime,omitempty"` - // Last time the condition transit from one status to another. - // +optional - LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` - // (brief) reason for the condition's last transition. - // +optional - Reason string `json:"reason,omitempty"` - // Human readable message indicating details about last transition. - // +optional - Message string `json:"message,omitempty"` -} - //+kubebuilder:object:root=true //+kubebuilder:subresource:status diff --git a/apis/control/v1alpha1/zz_generated.deepcopy.go b/apis/control/v1alpha1/zz_generated.deepcopy.go index f8bf3403..096ead4c 100644 --- a/apis/control/v1alpha1/zz_generated.deepcopy.go +++ b/apis/control/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ limitations under the License. package v1alpha1 import ( + "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -122,7 +123,7 @@ func (in *CassandraTaskStatus) DeepCopyInto(out *CassandraTaskStatus) { *out = *in if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions - *out = make([]JobCondition, len(*in)) + *out = make([]v1.Condition, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -181,6 +182,16 @@ func (in *CassandraTaskTemplate) DeepCopy() *CassandraTaskTemplate { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobArguments) DeepCopyInto(out *JobArguments) { *out = *in + if in.Tables != nil { + in, out := &in.Tables, &out.Tables + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.JobsCount != nil { + in, out := &in.JobsCount, &out.JobsCount + *out = new(int) + **out = **in + } if in.NewTokens != nil { in, out := &in.NewTokens, &out.NewTokens *out = make(map[string]string, len(*in)) @@ -199,20 +210,3 @@ func (in *JobArguments) DeepCopy() *JobArguments { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *JobCondition) DeepCopyInto(out *JobCondition) { - *out = *in - in.LastProbeTime.DeepCopyInto(&out.LastProbeTime) - in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobCondition. 
-func (in *JobCondition) DeepCopy() *JobCondition { - if in == nil { - return nil - } - out := new(JobCondition) - in.DeepCopyInto(out) - return out -} diff --git a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml index ab5c25c3..2d88342b 100644 --- a/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml +++ b/config/crd/bases/control.k8ssandra.io_cassandratasks.yaml @@ -108,6 +108,10 @@ spec: args: description: Arguments are additional parameters for the command properties: + end_token: + type: string + jobs: + type: integer keyspace_name: type: string new_tokens: @@ -118,12 +122,28 @@ spec: Pods referenced in this map must exist; any existing pod not referenced in this map will not be moved. type: object + no_snapshot: + type: boolean + no_validate: + description: Scrub arguments + type: boolean pod_name: type: string rack: type: string + skip_corrupted: + type: boolean source_datacenter: type: string + split_output: + description: Compaction arguments + type: boolean + start_token: + type: string + tables: + items: + type: string + type: array type: object command: description: Command defines what is run against Cassandra pods @@ -176,30 +196,68 @@ spec: one of the conditions will have type "Complete" and status true. More info: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/' items: + description: "Condition contains details for one aspect of the current + state of this API Resource. --- This struct is intended for direct + use as an array at the field path .status.conditions. For example, + \n type FooStatus struct{ // Represents the observations of a + foo's current state. // Known .status.conditions.type are: \"Available\", + \"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge + // +listType=map // +listMapKey=type Conditions []metav1.Condition + `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\" + protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }" properties: - lastProbeTime: - description: Last time the condition was checked. - format: date-time - type: string lastTransitionTime: - description: Last time the condition transit from one status - to another. + description: lastTransitionTime is the last time the condition + transitioned from one status to another. This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. format: date-time type: string message: - description: Human readable message indicating details about - last transition. + description: message is a human readable message indicating + details about the transition. This may be an empty string. + maxLength: 32768 type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer reason: - description: (brief) reason for the condition's last transition. + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. 
The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ type: string status: - description: Status of the condition, one of True, False, Unknown. + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown type: string type: - description: Type of job condition, Complete or Failed. + description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string required: + - lastTransitionTime + - message + - reason - status - type type: object diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index 648a5b3e..c4b0125c 100644 --- a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -70,8 +70,10 @@ type AsyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskCon // SyncTaskExecutorFunc is called as a backup if async one isn't supported type SyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskConfiguration) error -// ValidatorFunc validates that necessary parameters are set for the task -type ValidatorFunc func(*TaskConfiguration) error +// ValidatorFunc validates that necessary parameters are set for the task. If false is returned, the task +// has failed the validation and the error has the details. If true is returned, the error is transient and +// should be retried. +type ValidatorFunc func(*TaskConfiguration) (bool, error) // ProcessFunc is a function that's run before the pods are being processed individually, or after // the pods have been processed. 
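// Illustrative sketch (not part of this patch): what a ValidatorFunc looks like under the
// new (bool, error) contract described above. Returning (false, err) means validation has
// failed permanently and the reconciler marks the task Failed instead of retrying; returning
// (true, err) signals a transient problem that should be requeued; (true, nil) means the
// arguments are valid. The keyspace requirement below is hypothetical and only shows the shape;
// it is not a validator added by this PR. It assumes the controllers/control package context
// (TaskConfiguration, JobArguments and fmt are already available there).
func exampleKeyspaceValidator(taskConfig *TaskConfiguration) (bool, error) {
	if taskConfig.Arguments.KeyspaceName == "" {
		// Permanent validation failure: the task can never succeed without a keyspace
		return false, fmt.Errorf("keyspace_name argument is required")
	}
	// Transient checks (for example, listing datacenter pods) would return (true, err)
	// so the reconcile is retried rather than failing the task.
	return true, nil
}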
@@ -106,11 +108,11 @@ type TaskConfiguration struct { Completed int } -func (t *TaskConfiguration) Validate() error { +func (t *TaskConfiguration) Validate() (bool, error) { if t.ValidateFunc != nil { return t.ValidateFunc(t) } - return nil + return true, nil } func (t *TaskConfiguration) Filter(pod *corev1.Pod) bool { @@ -292,32 +294,43 @@ JobDefinition: break JobDefinition case api.CommandReplaceNode: r.replace(taskConfig) - case "forceupgraderacks": - // res, failed, completed, err = r.reconcileDatacenter(ctx, &dc, forceupgrade(taskConfigProto)) case api.CommandUpgradeSSTables: upgradesstables(taskConfig) case api.CommandScrub: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, scrub(taskConfigProto)) + scrub(taskConfig) case api.CommandCompaction: - // res, failed, completed, err = r.reconcileEveryPodTask(ctx, &dc, compact(taskConfigProto, job.Arguments)) + compact(taskConfig) case api.CommandMove: r.move(taskConfig) + case api.CommandFlush: + flush(taskConfig) + case api.CommandGarbageCollect: + gc(taskConfig) default: err = fmt.Errorf("unknown job command: %s", job.Command) return ctrl.Result{}, err } - if err := taskConfig.Validate(); err != nil { - return ctrl.Result{}, err - } + if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) { + valid, errValidate := taskConfig.Validate() + if errValidate != nil && valid { + // Retry, this is a transient error + return ctrl.Result{}, errValidate + } + + if !valid { + failed++ + err = errValidate + res = ctrl.Result{} + break + } - if !r.HasCondition(cassTask, api.JobRunning, corev1.ConditionTrue) { if err := taskConfig.PreProcess(); err != nil { return ctrl.Result{}, err } } - if modified := SetCondition(&cassTask, api.JobRunning, corev1.ConditionTrue); modified { + if modified := SetCondition(&cassTask, api.JobRunning, metav1.ConditionTrue, ""); modified { if err = r.Client.Status().Update(ctx, &cassTask); err != nil { return ctrl.Result{}, err } @@ -331,28 +344,35 @@ JobDefinition: if res.RequeueAfter > 0 { // This job isn't complete yet or there's an error, do not continue + logger.V(1).Info("This job isn't complete yet or there's an error, requeueing", "requeueAfter", res.RequeueAfter) break } - // completedCount++ } if res.RequeueAfter == 0 && !res.Requeue { // Job has been completed cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue - if err = r.Client.Update(ctx, &cassTask); err != nil { - return res, err + if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil { + return res, errUpdate } - err = r.cleanupJobAnnotations(ctx, dc, taskId) - if err != nil { + if errCleanup := r.cleanupJobAnnotations(ctx, dc, taskId); errCleanup != nil { // Not the end of the world - logger.Error(err, "Failed to cleanup job annotations from pods") + logger.Error(errCleanup, "Failed to cleanup job annotations from pods") } cassTask.Status.Active = 0 cassTask.Status.CompletionTime = &timeNow - SetCondition(&cassTask, api.JobComplete, corev1.ConditionTrue) - SetCondition(&cassTask, api.JobRunning, corev1.ConditionFalse) + SetCondition(&cassTask, api.JobComplete, metav1.ConditionTrue, "") + SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "") + + if failed > 0 { + errMsg := "" + if err != nil { + errMsg = err.Error() + } + SetCondition(&cassTask, api.JobFailed, metav1.ConditionTrue, errMsg) + } // Requeue for deletion later deletionTime := calculateDeletionTime(&cassTask) @@ -378,26 +398,29 @@ func (r *CassandraTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func 
(r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool { +func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool { for _, cond := range task.Status.Conditions { - if cond.Type == condition { + if cond.Type == string(condition) { return cond.Status == status } } return false } -func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool { +func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus, message string) bool { existing := false for i := 0; i < len(task.Status.Conditions); i++ { cond := task.Status.Conditions[i] - if cond.Type == condition { + if cond.Type == string(condition) { if cond.Status == status { // Already correct status return false } cond.Status = status cond.LastTransitionTime = metav1.Now() + if message != "" { + cond.Message = message + } existing = true task.Status.Conditions[i] = cond break @@ -405,11 +428,15 @@ func SetCondition(task *api.CassandraTask, condition api.JobConditionType, statu } if !existing { - cond := api.JobCondition{ - Type: condition, + cond := metav1.Condition{ + Type: string(condition), + Reason: string(condition), Status: status, LastTransitionTime: metav1.Now(), } + if message != "" { + cond.Message = message + } task.Status.Conditions = append(task.Status.Conditions, cond) } @@ -707,10 +734,16 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc return ctrl.Result{}, failed, completed, err } } else { + if len(jobRunner) > 0 { + // Something is still holding the worker + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil + } + if taskConfig.SyncFunc == nil { // This feature is not supported in sync mode, mark everything as done err := fmt.Errorf("this job isn't supported by the target pod") logger.Error(err, "unable to execute requested job against pod", "Pod", pod) + failed++ return ctrl.Result{}, failed, completed, err } @@ -732,36 +765,53 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc pod := pod - go func(targetPod *corev1.Pod) { + go func() { + // go func(targetPod *corev1.Pod) { // Write value to the jobRunner to indicate we're running - logger.V(1).Info("starting execution of sync blocking job", "Pod", targetPod) + podKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + logger.V(1).Info("starting execution of sync blocking job", "Pod", pod) jobRunner <- idx defer func() { // Remove the value from the jobRunner <-jobRunner }() - if err = taskConfig.SyncFunc(nodeMgmtClient, targetPod, taskConfig); err != nil { + if err = taskConfig.SyncFunc(nodeMgmtClient, &pod, taskConfig); err != nil { // We only log, nothing else to do - we won't even retry this pod - logger.Error(err, "executing the sync task failed", "Pod", targetPod) + logger.Error(err, "executing the sync task failed", "Pod", pod) jobStatus.Status = podJobError } else { jobStatus.Status = podJobCompleted } + if err := r.Client.Get(context.Background(), podKey, &pod); err != nil { + logger.Error(err, "Failed to get pod for annotation update", "Pod", pod) + } + + podPatch := client.MergeFrom(pod.DeepCopy()) + + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil { - logger.Error(err, "Failed to update local 
job's status", "Pod", targetPod) + logger.Error(err, "Failed to update local job's status", "Pod", pod) } - err = r.Client.Update(ctx, &pod) - if err != nil { - logger.Error(err, "Failed to update local job's status", "Pod", targetPod) + if err = r.Client.Patch(ctx, &pod, podPatch); err != nil { + // err = r.Client.Update(ctx, &pod) + logger.Error(err, "Failed to update local job's status", "Pod", pod) } - }(&pod) + }() } // We have a job going on, return back later to check the status return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil } + + if len(jobRunner) > 0 { + // Something is still holding the worker while none of the existing pods are, probably the replace job. + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil + } + return ctrl.Result{}, failed, completed, nil } diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index 02ca3c66..75912fab 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -2,6 +2,7 @@ package control import ( "context" + "encoding/json" "fmt" "math/rand" "net/http/httptest" @@ -142,6 +143,7 @@ func createPod(namespace, clusterName, dcName, rackName string, ordinal int) { Labels: map[string]string{ cassdcapi.ClusterLabel: clusterName, cassdcapi.DatacenterLabel: dcName, + cassdcapi.RackLabel: rackName, }, }, Spec: corev1.PodSpec{ @@ -259,15 +261,15 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) Expect(len(completedTask.Status.Conditions)).To(Equal(2)) for _, cond := range completedTask.Status.Conditions { switch cond.Type { - case api.JobComplete: - Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - case api.JobRunning: - Expect(cond.Status).To(Equal(corev1.ConditionFalse)) + case string(api.JobComplete): + Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + case string(api.JobRunning): + Expect(cond.Status).To(Equal(metav1.ConditionFalse)) } } }) @@ -282,8 +284,7 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) }) It("Runs a node move task against the datacenter pods", func() { By("Creating a task for move") @@ -305,8 +306,122 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 3)) + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 3)) + }) + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + 
task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) + }) + It("Runs a flush task against a pod", func() { + By("Creating a task for flush") + + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/flush"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount)) + }) + + It("Runs a garbagecollect task against rack's pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.RackName = "r2" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/garbagecollect"]).To(Equal(nodeCount / rackCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount/rackCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount/rackCount)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", nodeCount/rackCount)) + }) + + It("Runs a scrub task against a pod", func() { + By("Creating a task for scrub") + + taskKey, task := buildTask(api.CommandScrub, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + task.Spec.Jobs[0].Arguments.NoValidate = false + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/scrub"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + 
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) + + // Payloads should be of type ScrubRequest + var sreq httphelper.ScrubRequest + Expect(json.Unmarshal(callDetails.Payloads[0], &sreq)).Should(Succeed()) + Expect(sreq.CheckData).To(BeTrue()) + Expect(sreq.KeyspaceName).To(Equal("ks1")) + }) + + It("Runs a compaction task against a pod", func() { + By("Creating a task for compaction") + + taskKey, task := buildTask(api.CommandCompaction, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName) + task.Spec.Jobs[0].Arguments.Tables = []string{"table1"} + task.Spec.Jobs[0].Arguments.SplitOutput = true + + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/tables/compact"]).To(Equal(1)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 1)) + + // Payloads should be of type CompactRequest + var req httphelper.CompactRequest + Expect(json.Unmarshal(callDetails.Payloads[0], &req)).Should(Succeed()) + Expect(req.KeyspaceName).To(Equal("ks1")) + Expect(req.SplitOutput).To(BeTrue()) + Expect(len(req.Tables)).To(BeNumerically("==", 1)) }) + When("Running cleanup twice in the same datacenter", func() { It("Runs a cleanup task against the datacenter pods", func() { By("Creating a task for cleanup") @@ -384,6 +499,20 @@ var _ = Describe("CassandraTask controller tests", func() { Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount)) }) + It("Replace a node in the datacenter without specifying the pod", func() { + testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31()) + By("creating a datacenter", createDatacenter("dc1", testFailedNamespaceName)) + By("Creating a task for replacenode") + taskKey, task := buildTask(api.CommandReplaceNode, testFailedNamespaceName) + + Expect(k8sClient.Create(context.TODO(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(completedTask.Status.Failed).To(BeNumerically(">=", 1)) + Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed))) + Expect(completedTask.Status.Conditions[2].Message).To(Equal("valid pod_name to replace is required")) + }) }) }) Context("Sync jobs", func() { @@ -420,7 +549,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Runs a upgradesstables task against the datacenter pods", func() { By("Creating a task for upgradesstables") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey := createTask(api.CommandUpgradeSSTables, testNamespaceName) completedTask := waitForTaskCompletion(taskKey) @@ -435,7 +563,6 @@ var _ = Describe("CassandraTask controller tests", func() { It("Replaces a node in the datacenter", func() { By("Creating a task for replacenode") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new taskKey, task := buildTask(api.CommandReplaceNode, testNamespaceName) podKey := types.NamespacedName{ @@ -461,6 +588,38 @@ var _ = Describe("CassandraTask controller tests", func() { // verifyPodsHaveAnnotations(testNamespaceName, 
string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) + + It("Runs a flush task against the datacenter pods", func() { + By("Creating a task for flush") + taskKey, task := buildTask(api.CommandFlush, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/flush"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + + It("Runs a garbagecollect task against the datacenter pods", func() { + By("Creating a task for garbagecollect") + taskKey, task := buildTask(api.CommandGarbageCollect, testNamespaceName) + task.Spec.Jobs[0].Arguments.KeyspaceName = "ks1" + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v0/ops/tables/garbagecollect"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) }) Context("Task TTL", func() { var testNamespaceName string diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index 72b5d04b..0cc5a039 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -21,15 +21,23 @@ import ( // Cleanup functionality func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallKeyspaceCleanup(pod, jobCount, keyspaceName, tables) } func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, jobCount, keyspaceName, tables) } func cleanup(taskConfig *TaskConfiguration) { @@ -109,15 +117,24 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S // UpgradeSSTables functionality func callUpgradeSSTables(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTables(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + jobCount := -1 + if taskConfig.Arguments.JobsCount != 
nil { + jobCount = *taskConfig.Arguments.JobsCount + } + + return nodeMgmtClient.CallUpgradeSSTables(pod, jobCount, keyspaceName, tables) } func callUpgradeSSTablesSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - // TODO Add more arguments configurations keyspaceName := taskConfig.Arguments.KeyspaceName - return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, -1, keyspaceName, nil) + tables := taskConfig.Arguments.Tables + jobCount := -1 + if taskConfig.Arguments.JobsCount != nil { + jobCount = *taskConfig.Arguments.JobsCount + } + return nodeMgmtClient.CallUpgradeSSTablesEndpoint(pod, jobCount, keyspaceName, tables) } func upgradesstables(taskConfig *TaskConfiguration) { @@ -132,6 +149,8 @@ func upgradesstables(taskConfig *TaskConfiguration) { func (r *CassandraTaskReconciler) replacePod(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { // We check the podStartTime to prevent replacing the pod multiple times since annotations are removed when we delete the pod podStartTime := pod.GetCreationTimestamp() + uid := pod.UID + podKey := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} if podStartTime.Before(taskConfig.TaskStartTime) { if isCassandraUp(pod) { // Verify the cassandra pod is healthy before trying the drain @@ -158,32 +177,58 @@ func (r *CassandraTaskReconciler) replacePod(nodeMgmtClient httphelper.NodeMgmtC return err } } + + for i := 0; i < 10; i++ { + time.Sleep(1 * time.Second) + newPod := &corev1.Pod{} + if err := r.Client.Get(taskConfig.Context, podKey, newPod); err != nil { + continue + } + if uid != newPod.UID { + break + } + } + return nil } -func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration) error { +func (r *CassandraTaskReconciler) replaceValidator(taskConfig *TaskConfiguration) (bool, error) { // Check that arguments has replaceable pods and that those pods are actually existing pods if taskConfig.Arguments.PodName != "" { pods, err := r.getDatacenterPods(taskConfig.Context, taskConfig.Datacenter) if err != nil { - return err + return true, err } for _, pod := range pods { if pod.Name == taskConfig.Arguments.PodName { - return nil + return true, nil } } } - return fmt.Errorf("valid pod_name to replace is required") + return false, fmt.Errorf("valid pod_name to replace is required") } -func replaceFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { +func requiredPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { // If pod isn't in the to be replaced pods, return false podName := taskConfig.Arguments.PodName return pod.Name == podName } +func genericPodFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { + accept := true + rackName := taskConfig.Arguments.RackName + if rackName != "" { + accept = accept && pod.Labels[cassapi.RackLabel] == rackName + } + + podName := taskConfig.Arguments.PodName + if podName != "" { + accept = accept && pod.Name == podName + } + return accept +} + // replacePreProcess adds enough information to CassandraDatacenter to ensure cass-operator knows this pod is being replaced func (r *CassandraTaskReconciler) replacePreProcess(taskConfig *TaskConfiguration) error { dc := taskConfig.Datacenter @@ -207,7 +252,7 @@ func (r *CassandraTaskReconciler) setDatacenterCondition(dc *cassapi.CassandraDa func (r *CassandraTaskReconciler) replace(taskConfig *TaskConfiguration) { taskConfig.SyncFunc = r.replacePod taskConfig.ValidateFunc = r.replaceValidator - taskConfig.PodFilter 
= replaceFilter + taskConfig.PodFilter = requiredPodFilter taskConfig.PreProcessFunc = r.replacePreProcess } @@ -223,13 +268,13 @@ func moveFilter(pod *corev1.Pod, taskConfig *TaskConfiguration) bool { return found } -func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) error { +func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) (bool, error) { if len(taskConfig.Arguments.NewTokens) == 0 { - return fmt.Errorf("missing required new_tokens argument") + return false, fmt.Errorf("missing required new_tokens argument") } pods, err := r.getDatacenterPods(taskConfig.Context, taskConfig.Datacenter) if err != nil { - return err + return true, err } for podName := range taskConfig.Arguments.NewTokens { found := false @@ -240,10 +285,10 @@ func (r *CassandraTaskReconciler) moveValidator(taskConfig *TaskConfiguration) e } } if !found { - return fmt.Errorf("invalid new_tokens argument: pod doesn't exist: %s", podName) + return false, fmt.Errorf("invalid new_tokens argument: pod doesn't exist: %s", podName) } } - return nil + return true, nil } func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) { @@ -253,6 +298,109 @@ func (r *CassandraTaskReconciler) move(taskConfig *TaskConfiguration) { taskConfig.ValidateFunc = r.moveValidator } +// Flush functionality + +func callFlushSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlushEndpoint(pod, keyspaceName, tables) +} + +func callFlushAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallFlush(pod, keyspaceName, tables) +} + +func flush(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncFlush + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = callFlushSync + taskConfig.AsyncFunc = callFlushAsync +} + +// GarbageCollect functionality + +func callGarbageCollectAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollect(pod, keyspaceName, tables) +} + +func callGarbageCollectSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + keyspaceName := taskConfig.Arguments.KeyspaceName + tables := taskConfig.Arguments.Tables + return nodeMgmtClient.CallGarbageCollectEndpoint(pod, keyspaceName, tables) +} + +func gc(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncGarbageCollect + taskConfig.PodFilter = genericPodFilter + taskConfig.AsyncFunc = callGarbageCollectAsync + taskConfig.SyncFunc = callGarbageCollectSync +} + +// Scrub functionality + +func createScrubRequest(taskConfig *TaskConfiguration) *httphelper.ScrubRequest { + sr := &httphelper.ScrubRequest{ + DisableSnapshot: taskConfig.Arguments.NoSnapshot, + SkipCorrupted: taskConfig.Arguments.SkipCorrupted, + CheckData: !taskConfig.Arguments.NoValidate, + Jobs: -1, + KeyspaceName: taskConfig.Arguments.KeyspaceName, + Tables: taskConfig.Arguments.Tables, + } + + if taskConfig.Arguments.JobsCount != nil { + sr.Jobs = *taskConfig.Arguments.JobsCount + } + + return sr +} + +func 
scrubSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + return nodeMgmtClient.CallScrubEndpoint(pod, createScrubRequest(taskConfig)) +} + +func scrubAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + return nodeMgmtClient.CallScrub(pod, createScrubRequest(taskConfig)) +} + +func scrub(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncScrubTask + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = scrubSync + taskConfig.AsyncFunc = scrubAsync +} + +// Compaction functionality + +func createCompactRequest(taskConfig *TaskConfiguration) *httphelper.CompactRequest { + return &httphelper.CompactRequest{ + KeyspaceName: taskConfig.Arguments.KeyspaceName, + Tables: taskConfig.Arguments.Tables, + SplitOutput: taskConfig.Arguments.SplitOutput, + StartToken: taskConfig.Arguments.StartToken, + EndToken: taskConfig.Arguments.EndToken, + } +} + +func compactSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { + return nodeMgmtClient.CallCompactionEndpoint(pod, createCompactRequest(taskConfig)) +} + +func compactAsync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { + return nodeMgmtClient.CallCompaction(pod, createCompactRequest(taskConfig)) +} + +func compact(taskConfig *TaskConfiguration) { + taskConfig.AsyncFeature = httphelper.AsyncCompactionTask + taskConfig.PodFilter = genericPodFilter + taskConfig.SyncFunc = compactSync + taskConfig.AsyncFunc = compactAsync +} + // Common functions func isCassandraUp(pod *corev1.Pod) bool { diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index a2821d01..bfece9f2 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -133,11 +133,13 @@ const ( // AsyncSSTableTasks includes "cleanup" and "decommission" AsyncSSTableTasks Feature = "async_sstable_tasks" AsyncUpgradeSSTableTask Feature = "async_upgrade_sstable_task" - AsyncCompactionTasks Feature = "async_compaction_task" + AsyncCompactionTask Feature = "async_compaction_task" AsyncScrubTask Feature = "async_scrub_task" FullQuerySupport Feature = "full_query_logging" Rebuild Feature = "rebuild" Move Feature = "async_move_task" + AsyncGarbageCollect Feature = "async_gc_task" + AsyncFlush Feature = "async_flush_task" ) func (f *FeatureSet) UnmarshalJSON(b []byte) error { @@ -593,7 +595,6 @@ type ScrubRequest struct { Tables []string `json:"tables"` } -// TODO This, keyspaceRequest and compactRequest are all pretty much the same.. 
func createScrubRequest(pod *corev1.Pod, scrubRequest *ScrubRequest, endpoint string) (*nodeMgmtRequest, error) { body, err := json.Marshal(scrubRequest) if err != nil { @@ -1058,8 +1059,9 @@ func (client *NodeMgmtClient) FeatureSet(pod *corev1.Pod) (*FeatureSet, error) { func (client *NodeMgmtClient) JobDetails(pod *corev1.Pod, jobId string) (*JobDetails, error) { client.Log.Info( - "calling Management API features - GET /api/v0/ops/executor/job", + "calling Management API jobDetails - GET /api/v0/ops/executor/job", "pod", pod.Name, + "jobId", jobId, ) podHost, podPort, err := BuildPodHostFromPod(pod) @@ -1263,5 +1265,74 @@ func (client *NodeMgmtClient) CallSetFullQueryLog(pod *corev1.Pod, enableFullQue ) _, err = callNodeMgmtEndpoint(client, request, "") + + return err +} + +func (client *NodeMgmtClient) CallFlush(pod *corev1.Pod, keyspaceName string, tables []string) (string, error) { + client.Log.Info( + "calling Management API keyspace flush - POST /api/v1/ops/tables/flush", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v1/ops/tables/flush") + if err != nil { + return "", err + } + + req.timeout = 20 * time.Second + + jobId, err := callNodeMgmtEndpoint(client, *req, "application/json") + return string(jobId), err +} + +func (client *NodeMgmtClient) CallFlushEndpoint(pod *corev1.Pod, keyspaceName string, tables []string) error { + client.Log.Info( + "calling Management API keyspace flush - POST /api/v0/ops/tables/flush", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v0/ops/tables/flush") + if err != nil { + return err + } + + req.timeout = 60 * time.Second + + _, err = callNodeMgmtEndpoint(client, *req, "application/json") + return err +} + +func (client *NodeMgmtClient) CallGarbageCollect(pod *corev1.Pod, keyspaceName string, tables []string) (string, error) { + client.Log.Info( + "calling Management API keyspace garbagecollect - POST /api/v1/ops/tables/garbagecollect", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v1/ops/tables/garbagecollect") + if err != nil { + return "", err + } + + req.timeout = 20 * time.Second + + jobId, err := callNodeMgmtEndpoint(client, *req, "application/json") + return string(jobId), err +} + +func (client *NodeMgmtClient) CallGarbageCollectEndpoint(pod *corev1.Pod, keyspaceName string, tables []string) error { + client.Log.Info( + "calling Management API keyspace garbagecollect - POST /api/v0/ops/tables/garbagecollect", + "pod", pod.Name, + ) + + req, err := createKeySpaceRequest(pod, -1, keyspaceName, tables, "/api/v0/ops/tables/garbagecollect") + if err != nil { + return err + } + + req.timeout = 60 * time.Second + + _, err = callNodeMgmtEndpoint(client, *req, "application/json") return err } diff --git a/pkg/httphelper/server_test_utils.go b/pkg/httphelper/server_test_utils.go index ae839804..7b555aae 100644 --- a/pkg/httphelper/server_test_utils.go +++ b/pkg/httphelper/server_test_utils.go @@ -2,6 +2,7 @@ package httphelper import ( "fmt" + "io" "net" "net/http" "net/http/httptest" @@ -15,7 +16,11 @@ var featuresReply = `{ "async_sstable_tasks", "rebuild", "async_upgrade_sstable_task", - "async_move_task" + "async_move_task", + "async_gc_task", + "async_flush_task", + "async_scrub_task", + "async_compaction_task" ] }` @@ -34,11 +39,13 @@ func mgmtApiListener() (net.Listener, error) { type CallDetails struct { URLCounts map[string]int + Payloads [][]byte } func NewCallDetails() *CallDetails { return &CallDetails{
URLCounts: make(map[string]int), + Payloads: make([][]byte, 0), } } @@ -66,7 +73,15 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server, w.WriteHeader(http.StatusOK) jobId := query.Get("job_id") _, err = w.Write([]byte(fmt.Sprintf(jobDetailsCompleted, jobId))) - } else if r.Method == http.MethodPost && (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || r.URL.Path == "/api/v1/ops/node/rebuild" || r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/move") { + } else if r.Method == http.MethodPost && + (r.URL.Path == "/api/v1/ops/keyspace/cleanup" || + r.URL.Path == "/api/v1/ops/node/rebuild" || + r.URL.Path == "/api/v1/ops/tables/sstables/upgrade" || + r.URL.Path == "/api/v0/ops/node/move" || + r.URL.Path == "/api/v1/ops/tables/compact" || + r.URL.Path == "/api/v1/ops/tables/scrub" || + r.URL.Path == "/api/v1/ops/tables/flush" || + r.URL.Path == "/api/v1/ops/tables/garbagecollect") { w.WriteHeader(http.StatusOK) // Write jobId jobId++ @@ -85,8 +100,6 @@ func FakeExecutorServerWithDetails(callDetails *CallDetails) (*httptest.Server, func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Server, error) { jobId := 0 - // TODO Repeated code from above.. refactor - return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { query, err := url.ParseQuery(r.URL.RawQuery) if err != nil { @@ -117,7 +130,7 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) { return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain") { + if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusNotFound) @@ -133,6 +146,15 @@ func FakeMgmtApiServer(callDetails *CallDetails, handlerFunc http.HandlerFunc) ( callerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if callDetails != nil { callDetails.incr(r.URL.Path) + + if r.ContentLength > 0 { + payload, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + callDetails.Payloads = append(callDetails.Payloads, payload) + } } handlerFunc(w, r) }) diff --git a/tests/decommission_dc/decommission_dc_suite_test.go b/tests/decommission_dc/decommission_dc_suite_test.go index 2d2db0c5..4958858d 100644 --- a/tests/decommission_dc/decommission_dc_suite_test.go +++ b/tests/decommission_dc/decommission_dc_suite_test.go @@ -27,6 +27,7 @@ var ( dc1Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", api.CleanupForKubernetes(dc1OverrideName)) dc2Label = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dc2Name) seedLabel = "cassandra.datastax.com/seed-node=true" + taskYaml = "../testdata/tasks/rebuild_task.yaml" // dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) ns = ginkgo_util.NewWrapper(testName, namespace) ) @@ -127,6 +128,15 @@ var _ = Describe(testName, func() { 
ns.ExpectDoneReconciling(dc1Name) ns.ExpectDoneReconciling(dc2Name) + // Create a CassandraTask to rebuild dc2 from dc1 + // The task definition is in testdata/tasks/rebuild_task.yaml + step = "creating a cassandra task to rebuild dc2" + k = kubectl.ApplyFiles(taskYaml) + ns.ExecAndLog(step, k) + + // Wait for the task to be completed + ns.WaitForCompleteTask("rebuild-dc") + podNames := ns.GetDatacenterReadyPodNames(dc1OverrideName) Expect(len(podNames)).To(Equal(2)) dcs := findDatacenters(podNames[0]) diff --git a/tests/node_move/node_move_suite_test.go b/tests/node_move/node_move_suite_test.go index 2d79627d..3ad10916 100644 --- a/tests/node_move/node_move_suite_test.go +++ b/tests/node_move/node_move_suite_test.go @@ -5,9 +5,10 @@ package node_move import ( "fmt" + "testing" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "testing" "github.com/k8ssandra/cass-operator/tests/kustomize" ginkgo_util "github.com/k8ssandra/cass-operator/tests/util/ginkgo" @@ -60,7 +61,7 @@ var _ = Describe(testName, func() { }) Specify("the operator can move a cassandra node to a new token", func() { - // Create CassandraTask that should replace a node + // Create CassandraTask that should move tokens step := "creating a cassandra task to move a node" k := kubectl.ApplyFiles(taskYaml) ns.ExecAndLog(step, k) diff --git a/tests/test_all_the_things/test_all_the_things_suite_test.go b/tests/test_all_the_things/test_all_the_things_suite_test.go index 79eb86c6..68946e5a 100644 --- a/tests/test_all_the_things/test_all_the_things_suite_test.go +++ b/tests/test_all_the_things/test_all_the_things_suite_test.go @@ -96,6 +96,9 @@ var _ = Describe(testName, func() { ns.ExpectDatacenterNameStatusUpdated(dcName, dcNameOverride) + // Ensure we have a single CassandraTask created which is a cleanup (and it succeeded) + ns.WaitForCompletedCassandraTasks(dcName, "cleanup", 1) + step = "stopping the dc" json = "{\"spec\": {\"stopped\": true}}" k = kubectl.PatchMerge(dcResource, json) diff --git a/tests/testdata/tasks/rebuild_task.yaml b/tests/testdata/tasks/rebuild_task.yaml new file mode 100644 index 00000000..97ca4ac1 --- /dev/null +++ b/tests/testdata/tasks/rebuild_task.yaml @@ -0,0 +1,13 @@ +apiVersion: control.k8ssandra.io/v1alpha1 +kind: CassandraTask +metadata: + name: rebuild-dc +spec: + datacenter: + name: dc2 + namespace: test-decommission-dc + jobs: + - name: rebuild-dc2 + command: rebuild + args: + source_datacenter: My_Super_Dc
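# Illustrative example (not included in this PR): a CassandraTask manifest exercising one of
# the new commands added here, in the same shape as rebuild_task.yaml above. The arg names come
# from the JobArguments JSON tags and the CRD properties in this diff (keyspace_name, tables,
# no_snapshot, jobs); the datacenter, keyspace, and table names are hypothetical.
apiVersion: control.k8ssandra.io/v1alpha1
kind: CassandraTask
metadata:
  name: scrub-dc1
spec:
  datacenter:
    name: dc1
    namespace: cass-operator
  jobs:
    - name: scrub-dc1-job
      command: scrub
      args:
        keyspace_name: my_keyspace
        tables:
          - my_table
        no_snapshot: true
        jobs: 2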