Skip to content

Commit

Permalink
Merge pull request #841 from zimnx/mz/manager-startdate
Browse files Browse the repository at this point in the history
Evaluate special 'now' terms in Scylla Manager tasks only on task creation
  • Loading branch information
scylla-operator-bot[bot] authored Nov 27, 2023
2 parents 212e4a5 + d5c6b67 commit 9135be1
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 91 deletions.
79 changes: 54 additions & 25 deletions pkg/controller/manager/sync_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"context"
"fmt"
"reflect"
"strings"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/mermaidclient"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/util/uuid"
Expand All @@ -21,7 +22,7 @@ type state struct {
BackupTasks []*BackupTask
}

func runSync(ctx context.Context, cluster *v1.ScyllaCluster, authToken string, state *state) ([]action, bool, error) {
func runSync(ctx context.Context, cluster *scyllav1.ScyllaCluster, authToken string, state *state) ([]action, bool, error) {
var actions []action
requeue := false
clusterID := ""
Expand Down Expand Up @@ -76,7 +77,7 @@ func runSync(ctx context.Context, cluster *v1.ScyllaCluster, authToken string, s
return actions, requeue, nil
}

func syncTasks(clusterID string, cluster *v1.ScyllaCluster, state *state) ([]action, error) {
func syncTasks(clusterID string, cluster *scyllav1.ScyllaCluster, state *state) ([]action, error) {
syncer := newStateCache(cluster, state)

var actions []action
Expand Down Expand Up @@ -116,11 +117,18 @@ func syncTasks(clusterID string, cluster *v1.ScyllaCluster, state *state) ([]act
return actions, nil
}

func syncBackupTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCache, managerState *state) ([]action, error) {
func syncBackupTasks(clusterID string, cluster *scyllav1.ScyllaCluster, syncer stateCache, managerState *state) ([]action, error) {
var actions []action

for _, bt := range cluster.Spec.Backups {
backupTask := &BackupTask{BackupTaskSpec: bt}
btCopy := *bt.DeepCopy()
backupTask := &BackupTask{BackupTaskSpec: btCopy}

for _, managerTask := range managerState.BackupTasks {
if syncer.taskID(backupTask.Name) == managerTask.ID {
evaluateDates(backupTask, managerTask)
}
}

if syncer.shouldCreateTask(backupTask.Name) {
mt, err := backupTask.ToManager()
Expand All @@ -130,7 +138,7 @@ func syncBackupTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
actions = append(actions, &addTaskAction{
clusterID: clusterID,
task: mt,
taskSpec: bt,
taskSpec: btCopy,
})
} else if syncer.shouldUpdateTask(backupTask.Name) {
backupTask.ID = syncer.taskID(backupTask.Name)
Expand All @@ -139,6 +147,7 @@ func syncBackupTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
for _, managerTask := range managerState.BackupTasks {
if managerTask.ID == backupTask.ID {
update = !reflect.DeepEqual(backupTask, managerTask)
break
}
}
if update {
Expand All @@ -150,7 +159,7 @@ func syncBackupTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
actions = append(actions, &updateTaskAction{
clusterID: clusterID,
task: mt,
taskSpec: bt,
taskSpec: btCopy,
})
}
}
Expand All @@ -159,11 +168,18 @@ func syncBackupTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
return actions, nil
}

func syncRepairTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCache, managerState *state) ([]action, error) {
func syncRepairTasks(clusterID string, cluster *scyllav1.ScyllaCluster, syncer stateCache, managerState *state) ([]action, error) {
var actions []action

for _, rt := range cluster.Spec.Repairs {
repairTask := &RepairTask{RepairTaskSpec: rt}
rtCopy := *rt.DeepCopy()
repairTask := &RepairTask{RepairTaskSpec: rtCopy}

for _, managerTask := range managerState.RepairTasks {
if syncer.taskID(repairTask.Name) == managerTask.ID {
evaluateDates(repairTask, managerTask)
}
}

if syncer.shouldCreateTask(rt.Name) {
mt, err := repairTask.ToManager()
Expand All @@ -173,7 +189,7 @@ func syncRepairTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
actions = append(actions, &addTaskAction{
clusterID: clusterID,
task: mt,
taskSpec: rt,
taskSpec: rtCopy,
})
} else if syncer.shouldUpdateTask(rt.Name) {
repairTask.ID = syncer.taskID(rt.Name)
Expand All @@ -182,6 +198,7 @@ func syncRepairTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
for _, managerTask := range managerState.RepairTasks {
if managerTask.ID == repairTask.ID {
update = !reflect.DeepEqual(repairTask, managerTask)
break
}
}
if update {
Expand All @@ -192,7 +209,7 @@ func syncRepairTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
actions = append(actions, &updateTaskAction{
clusterID: clusterID,
task: mt,
taskSpec: rt,
taskSpec: rtCopy,
})
}
}
Expand All @@ -201,14 +218,26 @@ func syncRepairTasks(clusterID string, cluster *v1.ScyllaCluster, syncer stateCa
return actions, nil
}

type startDateGetterSetter interface {
GetStartDate() string
SetStartDate(sd string)
}

func evaluateDates(spec, managerTask startDateGetterSetter) {
// Keep special "now" value evaluated on task creation.
if strings.HasPrefix(spec.GetStartDate(), "now") {
spec.SetStartDate(managerTask.GetStartDate())
}
}

type stateCache struct {
stateTasks *strset.Set
specTasks *strset.Set
statusNameIDMapping map[string]string
statusIDNameMapping map[string]string
}

func newStateCache(cluster *v1.ScyllaCluster, state *state) stateCache {
func newStateCache(cluster *scyllav1.ScyllaCluster, state *state) stateCache {
s := stateCache{
stateTasks: strset.New(),
specTasks: strset.New(),
Expand Down Expand Up @@ -274,15 +303,15 @@ func (s stateCache) taskID(taskName string) string {
}

type action interface {
Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error
Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error
}

type addClusterAction struct {
cluster *mermaidclient.Cluster
clusterID string
}

func (a *addClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *addClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error {
id, err := client.CreateCluster(ctx, a.cluster)
if err != nil {
return err
Expand All @@ -301,7 +330,7 @@ type updateClusterAction struct {
cluster *mermaidclient.Cluster
}

func (a *updateClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, _ *v1.ScyllaClusterStatus) error {
func (a *updateClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, _ *scyllav1.ScyllaClusterStatus) error {
return client.UpdateCluster(ctx, a.cluster)
}

Expand All @@ -313,7 +342,7 @@ type deleteClusterAction struct {
clusterID string
}

func (a *deleteClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *deleteClusterAction) Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error {
return client.DeleteCluster(ctx, a.clusterID)
}

Expand All @@ -327,7 +356,7 @@ type deleteTaskAction struct {
taskID string
}

func (a *deleteTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *deleteTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error {
err := client.DeleteTask(ctx, a.clusterID, a.taskType, uuid.MustParse(a.taskID))

if a.taskType == "repair" {
Expand Down Expand Up @@ -372,12 +401,12 @@ func (a addTaskAction) String() string {
return fmt.Sprintf("add task %+v", a.task)
}

func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error {
id, err := client.CreateTask(ctx, a.clusterID, a.task)

if a.task.Type == "repair" {
rt := v1.RepairTaskStatus{
RepairTaskSpec: a.taskSpec.(v1.RepairTaskSpec),
rt := scyllav1.RepairTaskStatus{
RepairTaskSpec: a.taskSpec.(scyllav1.RepairTaskSpec),
ID: id.String(),
}
if err != nil {
Expand All @@ -397,8 +426,8 @@ func (a *addTaskAction) Execute(ctx context.Context, client *mermaidclient.Clien
}
}
if a.task.Type == "backup" {
bt := v1.BackupTaskStatus{
BackupTaskSpec: a.taskSpec.(v1.BackupTaskSpec),
bt := scyllav1.BackupTaskStatus{
BackupTaskSpec: a.taskSpec.(scyllav1.BackupTaskSpec),
ID: id.String(),
}
if err != nil {
Expand Down Expand Up @@ -431,13 +460,13 @@ func (a updateTaskAction) String() string {
return fmt.Sprintf("update task %+v", a.task)
}

func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *v1.ScyllaClusterStatus) error {
func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Client, status *scyllav1.ScyllaClusterStatus) error {
err := client.UpdateTask(ctx, a.clusterID, a.task)

if a.task.Type == "repair" {
for i, repairStatus := range status.Repairs {
if a.task.ID == repairStatus.ID {
status.Repairs[i].RepairTaskSpec = a.taskSpec.(v1.RepairTaskSpec)
status.Repairs[i].RepairTaskSpec = a.taskSpec.(scyllav1.RepairTaskSpec)
if err != nil {
status.Repairs[i].Error = mermaidclient.MessageOf(err)
}
Expand All @@ -448,7 +477,7 @@ func (a *updateTaskAction) Execute(ctx context.Context, client *mermaidclient.Cl
if a.task.Type == "backup" {
for i, backupStatus := range status.Backups {
if a.task.ID == backupStatus.ID {
status.Backups[i].BackupTaskSpec = a.taskSpec.(v1.BackupTaskSpec)
status.Backups[i].BackupTaskSpec = a.taskSpec.(scyllav1.BackupTaskSpec)
if err != nil {
status.Backups[i].Error = mermaidclient.MessageOf(err)
}
Expand Down
Loading

0 comments on commit 9135be1

Please sign in to comment.