Skip to content

Commit

Permalink
Modify Conditions to be metav1.Condition as it was close enough to Jo…
Browse files Browse the repository at this point in the history
…bCondition. Also, fix sync pod annotations to patch the correct instance. Modify the task Validator to mark the task as Failed instead of retrying until the task is fixed (and only logging to cass-operator logs why it is failing).
  • Loading branch information
burmanm committed Oct 12, 2023
1 parent 095bb86 commit d868323
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 98 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
## unreleased

* [CHANGE] [#573](https://github.com/k8ssandra/cass-operator/issues/573) Add the namespace as env variable in the server-system-logger container to label metrics with.
* [BUGFIX] [#585](https://github.com/k8ssandra/cass-operator/issues/585) If task validation fails, stop processing the task and record the validation error in the Failed condition

## v1.17.2

Expand Down
24 changes: 1 addition & 23 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ type JobArguments struct {

// CassandraTaskStatus defines the observed state of CassandraJob
type CassandraTaskStatus struct {

// TODO Status and Conditions is almost 1:1 to Kubernetes Job's definitions.

// The latest available observations of an object's current state. When a Job
// fails, one of the conditions will have type "Failed" and status true. When
// a Job is suspended, one of the conditions will have type "Suspended" and
Expand All @@ -121,7 +118,7 @@ type CassandraTaskStatus struct {
// +patchMergeKey=type
// +patchStrategy=merge
// +listType=atomic
Conditions []JobCondition `json:"conditions,omitempty"`
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`

// Represents time when the job controller started processing a job. When a
// Job is created in the suspended state, this field is not set until the
Expand Down Expand Up @@ -161,25 +158,6 @@ const (
JobRunning JobConditionType = "Running"
)

// JobCondition describes a single observed condition of a task: its type and
// status, plus optional timestamps, a reason, and a message for the last transition.
type JobCondition struct {
// Type of job condition, for example Complete, Failed, or Running.
Type JobConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status corev1.ConditionStatus `json:"status"`
// Last time the condition was checked.
// +optional
LastProbeTime metav1.Time `json:"lastProbeTime,omitempty"`
// Last time the condition transitioned from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// Brief reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// Human-readable message indicating details about the last transition.
// +optional
Message string `json:"message,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down
20 changes: 2 additions & 18 deletions apis/control/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 49 additions & 11 deletions config/crd/bases/control.k8ssandra.io_cassandratasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,30 +180,68 @@ spec:
one of the conditions will have type "Complete" and status true.
More info: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/'
items:
description: "Condition contains details for one aspect of the current
state of this API Resource. --- This struct is intended for direct
use as an array at the field path .status.conditions. For example,
\n type FooStatus struct{ // Represents the observations of a
foo's current state. // Known .status.conditions.type are: \"Available\",
\"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge
// +listType=map // +listMapKey=type Conditions []metav1.Condition
`json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\"
protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
properties:
lastProbeTime:
description: Last time the condition was checked.
format: date-time
type: string
lastTransitionTime:
description: Last time the condition transit from one status
to another.
description: lastTransitionTime is the last time the condition
transitioned from one status to another. This should be when
the underlying condition changed. If that is not known, then
using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: Human readable message indicating details about
last transition.
description: message is a human readable message indicating
details about the transition. This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon. For instance, if .metadata.generation
is currently 12, but the .status.conditions[x].observedGeneration
is 9, the condition is out of date with respect to the current
state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: (brief) reason for the condition's last transition.
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition. Producers
of specific condition types may define expected values and
meanings for this field, and whether the values are considered
a guaranteed API. The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: Status of the condition, one of True, False, Unknown.
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: Type of job condition, Complete or Failed.
description: type of condition in CamelCase or in foo.example.com/CamelCase.
--- Many .condition.type values are consistent across resources
like Available, but because arbitrary conditions can be useful
(see .node.status.conditions), the ability to deconflict is
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
Expand Down
86 changes: 60 additions & 26 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ type AsyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskCon
// SyncTaskExecutorFunc is called as a backup if async one isn't supported
type SyncTaskExecutorFunc func(httphelper.NodeMgmtClient, *corev1.Pod, *TaskConfiguration) error

// ValidatorFunc validates that necessary parameters are set for the task
type ValidatorFunc func(*TaskConfiguration) error
// ValidatorFunc validates that necessary parameters are set for the task. If false is returned, the task
// has failed the validation and the error has the details. If true is returned together with a non-nil
// error, the error is transient and should be retried; true with a nil error means the task is valid.
type ValidatorFunc func(*TaskConfiguration) (bool, error)

// ProcessFunc is a function that's run before the pods are being processed individually, or after
// the pods have been processed.
Expand Down Expand Up @@ -106,11 +108,11 @@ type TaskConfiguration struct {
Completed int
}

func (t *TaskConfiguration) Validate() error {
func (t *TaskConfiguration) Validate() (bool, error) {
if t.ValidateFunc != nil {
return t.ValidateFunc(t)
}
return nil
return true, nil
}

func (t *TaskConfiguration) Filter(pod *corev1.Pod) bool {
Expand Down Expand Up @@ -295,9 +297,9 @@ JobDefinition:
case api.CommandUpgradeSSTables:
upgradesstables(taskConfig)
case api.CommandScrub:
// scrub(taskConfig)
scrub(taskConfig)
case api.CommandCompaction:
// compact(taskConfig)
compact(taskConfig)
case api.CommandMove:
r.move(taskConfig)
case api.CommandFlush:
Expand All @@ -309,17 +311,26 @@ JobDefinition:
return ctrl.Result{}, err
}

if err := taskConfig.Validate(); err != nil {
return ctrl.Result{}, err
valid, errValidate := taskConfig.Validate()
if errValidate != nil && valid {
// Retry, this is a transient error
return ctrl.Result{}, errValidate
}

if !valid {
failed++
err = errValidate
res = ctrl.Result{}
break
}

if !r.HasCondition(cassTask, api.JobRunning, corev1.ConditionTrue) {
if !r.HasCondition(cassTask, api.JobRunning, metav1.ConditionTrue) {
if err := taskConfig.PreProcess(); err != nil {
return ctrl.Result{}, err
}
}

if modified := SetCondition(&cassTask, api.JobRunning, corev1.ConditionTrue); modified {
if modified := SetCondition(&cassTask, api.JobRunning, metav1.ConditionTrue, ""); modified {
if err = r.Client.Status().Update(ctx, &cassTask); err != nil {
return ctrl.Result{}, err
}
Expand All @@ -341,20 +352,27 @@ JobDefinition:
if res.RequeueAfter == 0 && !res.Requeue {
// Job has been completed
cassTask.GetLabels()[taskStatusLabel] = completedTaskLabelValue
if err = r.Client.Update(ctx, &cassTask); err != nil {
return res, err
if errUpdate := r.Client.Update(ctx, &cassTask); errUpdate != nil {
return res, errUpdate
}

err = r.cleanupJobAnnotations(ctx, dc, taskId)
if err != nil {
if errCleanup := r.cleanupJobAnnotations(ctx, dc, taskId); errCleanup != nil {
// Not the end of the world
logger.Error(err, "Failed to cleanup job annotations from pods")
logger.Error(errCleanup, "Failed to cleanup job annotations from pods")
}

cassTask.Status.Active = 0
cassTask.Status.CompletionTime = &timeNow
SetCondition(&cassTask, api.JobComplete, corev1.ConditionTrue)
SetCondition(&cassTask, api.JobRunning, corev1.ConditionFalse)
SetCondition(&cassTask, api.JobComplete, metav1.ConditionTrue, "")
SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "")

if failed > 0 {
errMsg := ""
if err != nil {
errMsg = err.Error()
}
SetCondition(&cassTask, api.JobFailed, metav1.ConditionTrue, errMsg)
}

// Requeue for deletion later
deletionTime := calculateDeletionTime(&cassTask)
Expand All @@ -380,38 +398,45 @@ func (r *CassandraTaskReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool {
func (r *CassandraTaskReconciler) HasCondition(task api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus) bool {
for _, cond := range task.Status.Conditions {
if cond.Type == condition {
if cond.Type == string(condition) {
return cond.Status == status
}
}
return false
}

func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status corev1.ConditionStatus) bool {
func SetCondition(task *api.CassandraTask, condition api.JobConditionType, status metav1.ConditionStatus, message string) bool {
existing := false
for i := 0; i < len(task.Status.Conditions); i++ {
cond := task.Status.Conditions[i]
if cond.Type == condition {
if cond.Type == string(condition) {
if cond.Status == status {
// Already correct status
return false
}
cond.Status = status
cond.LastTransitionTime = metav1.Now()
if message != "" {
cond.Message = message
}
existing = true
task.Status.Conditions[i] = cond
break
}
}

if !existing {
cond := api.JobCondition{
Type: condition,
cond := metav1.Condition{
Type: string(condition),
Reason: string(condition),
Status: status,
LastTransitionTime: metav1.Now(),
}
if message != "" {
cond.Message = message
}
task.Status.Conditions = append(task.Status.Conditions, cond)
}

Expand Down Expand Up @@ -713,6 +738,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
// This feature is not supported in sync mode, mark everything as done
err := fmt.Errorf("this job isn't supported by the target pod")
logger.Error(err, "unable to execute requested job against pod", "Pod", pod)
failed++
return ctrl.Result{}, failed, completed, err
}

Expand All @@ -736,6 +762,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc

go func(targetPod *corev1.Pod) {
// Write value to the jobRunner to indicate we're running
podKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
logger.V(1).Info("starting execution of sync blocking job", "Pod", targetPod)
jobRunner <- idx
defer func() {
Expand All @@ -751,12 +778,19 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
jobStatus.Status = podJobCompleted
}

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
if err := r.Client.Get(context.Background(), podKey, &pod); err != nil {
logger.Error(err, "Failed to get pod for annotation update", "Pod", targetPod)
return
}

podPatch := client.MergeFrom(targetPod.DeepCopy())

if err = JobStatusToPodAnnotations(taskConfig.Id, targetPod.Annotations, jobStatus); err != nil {
logger.Error(err, "Failed to update local job's status", "Pod", targetPod)
}

err = r.Client.Update(ctx, &pod)
if err != nil {
if err = r.Client.Patch(ctx, targetPod, podPatch); err != nil {
// err = r.Client.Update(ctx, &pod)
logger.Error(err, "Failed to update local job's status", "Pod", targetPod)
}
}(&pod)
Expand Down
Loading

0 comments on commit d868323

Please sign in to comment.