Skip to content

Commit

Permalink
Merge pull request #47 from shopware/new-sidecar-management
Browse files Browse the repository at this point in the history
feat: delete jobs also on errors
  • Loading branch information
TrayserCassa authored Oct 8, 2024
2 parents 6e86e9a + 1e29144 commit 7218ee7
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 56 deletions.
24 changes: 20 additions & 4 deletions internal/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,22 @@ func (r *StoreReconciler) stateSetup(ctx context.Context, store *v1.Store) v1.St
return v1.StateSetup
}

done, err := job.IsJobContainerDone(ctx, r.Client, setup, job.CONTAINER_NAME_SETUP_JOB)
jobState, err := job.IsJobContainerDone(ctx, r.Client, setup, job.CONTAINER_NAME_SETUP_JOB)
if err != nil {
con.Reason = err.Error()
con.Status = Error
return v1.StateSetup
}

if done {
if jobState.IsDone() && jobState.HasErrors() {
con.Message = "Setup is Done but has Errors. Check logs for more details"
con.Reason = fmt.Sprintf("Exit code: %d", jobState.ExitCode)
con.Status = Error
con.LastTransitionTime = metav1.Now()
return v1.StateSetup
}

if jobState.IsDone() && !jobState.HasErrors() {
con.Message = "Setup finished"
con.LastTransitionTime = metav1.Now()
return v1.StateInitializing
Expand Down Expand Up @@ -281,14 +289,22 @@ func (r *StoreReconciler) stateMigration(ctx context.Context, store *v1.Store) v
return v1.StateMigration
}

done, err := job.IsJobContainerDone(ctx, r.Client, migration, job.MigrateJobName(store))
jobState, err := job.IsJobContainerDone(ctx, r.Client, migration, job.MigrateJobName(store))
if err != nil {
con.Reason = err.Error()
con.Status = Error
return v1.StateMigration
}

if done {
if jobState.IsDone() && jobState.HasErrors() {
con.Message = "Migration is Done but has Errors. Check logs for more details"
con.Reason = fmt.Sprintf("Exit code: %d", jobState.ExitCode)
con.Status = Error
con.LastTransitionTime = metav1.Now()
return v1.StateMigration
}

if jobState.IsDone() && !jobState.HasErrors() {
con.Message = "Migration finished"
con.LastTransitionTime = metav1.Now()
return v1.StateInitializing
Expand Down
36 changes: 30 additions & 6 deletions internal/controller/store_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ func (r *StoreReconciler) doReconcile(
// EDIT: This caused more problems than it helped, so we pass the
// responsibility of terminating all sidecars correctly on to the user.
// Check if sidecars are active
// if len(store.Spec.Container.ExtraContainers) > 0 {
// log.Info("Delete setup job because sidecars are used")
// if err := r.completeJobs(ctx, store); err != nil {
// return fmt.Errorf("Can't cleanup setup and migration jobs: %w", err)
// }
// }
if len(store.Spec.Container.ExtraContainers) > 0 {
log.Info("Delete setup/migration job if they are finished because sidecars are used")
if err := r.completeJobs(ctx, store); err != nil {
log.Error(err, "Can't cleanup setup and migration jobs")
}
}

log.Info("reconcile deployment")
if err := r.reconcileDeployment(ctx, store); err != nil {
Expand Down Expand Up @@ -446,3 +446,27 @@ func (r *StoreReconciler) reconcileSetupJob(ctx context.Context, store *v1.Store

return nil
}

// completeJobs cleans up the setup and migration jobs once their main
// containers have stopped (Completed or Error). When sidecar containers are
// configured they keep the job's pod alive, so the Job resource never reaches
// the Completed phase on its own and must be deleted here.
func (r *StoreReconciler) completeJobs(ctx context.Context, store *v1.Store) error {
	done, err := job.IsSetupJobCompleted(ctx, r.Client, store)
	if err != nil {
		return err
	}
	// The main container has finished, but still-active sidecars prevent the
	// job from completing on its own — delete it. (The previous `!done` check
	// was inverted: it deleted jobs whose main container was still running and
	// never cleaned up finished or errored ones.)
	if done {
		if err = job.DeleteSetupJob(ctx, r.Client, store); err != nil {
			return err
		}
	}

	done, err = job.IsMigrationJobCompleted(ctx, r.Client, store)
	if err != nil {
		return err
	}
	// Same as above for the migration job(s).
	if done {
		if err = job.DeleteAllMigrationJobs(ctx, r.Client, store); err != nil {
			return err
		}
	}
	return nil
}
36 changes: 19 additions & 17 deletions internal/job/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

var MigrationJobIdentifyer = map[string]string{"type": "migration"}

const CONTAINER_NAME_MIGRATION_JOB = "shopware-migration"

func GetMigrationJob(
ctx context.Context,
client client.Client,
Expand All @@ -37,6 +39,7 @@ func GetMigrationJob(
func MigrationJob(store *v1.Store) *batchv1.Job {
parallelism := int32(1)
completions := int32(1)
sharedProcessNamespace := true

labels := map[string]string{
"hash": GetMigrateHash(store),
Expand All @@ -52,22 +55,22 @@ func MigrationJob(store *v1.Store) *batchv1.Job {
}
maps.Copy(annotations, store.Spec.Container.Annotations)

var command []string
if store.Spec.SetupHook.Before != "" {
command = append(command, store.Spec.MigrationHook.Before)
var stringCommand string
if store.Spec.MigrationHook.Before != "" {
stringCommand = fmt.Sprintf("%s %s", stringCommand, store.Spec.MigrationHook.Before)
}
command = append(command, " /setup")
if store.Spec.SetupHook.After != "" {
command = append(command, store.Spec.MigrationHook.After)
stringCommand = fmt.Sprintf("%s /setup", stringCommand)
if store.Spec.MigrationHook.After != "" {
stringCommand = fmt.Sprintf("%s %s", stringCommand, store.Spec.MigrationHook.After)
}

containers := append(store.Spec.Container.ExtraContainers, corev1.Container{
Name: MigrateJobName(store),
Name: CONTAINER_NAME_MIGRATION_JOB,
VolumeMounts: store.Spec.Container.VolumeMounts,
ImagePullPolicy: store.Spec.Container.ImagePullPolicy,
Image: store.Spec.Container.Image,
Command: []string{"sh", "-c"},
Args: command,
Command: []string{"sh"},
Args: []string{"-c", stringCommand},
Env: store.GetEnv(),
})

Expand All @@ -89,6 +92,7 @@ func MigrationJob(store *v1.Store) *batchv1.Job {
Labels: labels,
},
Spec: corev1.PodSpec{
ShareProcessNamespace: &sharedProcessNamespace,
Volumes: store.Spec.Container.Volumes,
TopologySpreadConstraints: store.Spec.Container.TopologySpreadConstraints,
NodeSelector: store.Spec.Container.NodeSelector,
Expand Down Expand Up @@ -116,26 +120,24 @@ func DeleteAllMigrationJobs(ctx context.Context, c client.Client, store *v1.Stor
}

// This is just a soft check, use container check for a clean check
// Will return true if container is stopped (Completed, Error)
func IsMigrationJobCompleted(
ctx context.Context,
c client.Client,
store *v1.Store,
) (bool, error) {
setup, err := GetMigrationJob(ctx, c, store)
migration, err := GetMigrationJob(ctx, c, store)
if err != nil {
if k8serrors.IsNotFound(err) {
return false, nil
}
return false, err
}

if setup == nil {
return false, nil
state, err := IsJobContainerDone(ctx, c, migration, CONTAINER_NAME_MIGRATION_JOB)
if err != nil {
return false, err
}

// No active jobs are running and more of them are succeeded
if setup.Status.Active <= 0 && setup.Status.Succeeded >= 1 {
return true, nil
}
return false, nil
return state.IsDone(), nil
}
26 changes: 13 additions & 13 deletions internal/job/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ func GetSetupJob(ctx context.Context, client client.Client, store *v1.Store) (*b
func SetupJob(store *v1.Store) *batchv1.Job {
parallelism := int32(1)
completions := int32(1)
sharedProcessNamespace := true

labels := map[string]string{
"type": "setup",
}
maps.Copy(labels, util.GetDefaultLabels(store))

var command []string
var stringCommand string
if store.Spec.SetupHook.Before != "" {
command = append(command, store.Spec.SetupHook.Before)
stringCommand = fmt.Sprintf("%s %s", stringCommand, store.Spec.SetupHook.Before)
}
command = append(command, " /setup")
stringCommand = fmt.Sprintf("%s sleep 5", stringCommand)
if store.Spec.SetupHook.After != "" {
command = append(command, store.Spec.SetupHook.After)
stringCommand = fmt.Sprintf("%s %s", stringCommand, store.Spec.SetupHook.After)
}

envs := append(store.GetEnv(),
Expand All @@ -70,8 +71,8 @@ func SetupJob(store *v1.Store) *batchv1.Job {
VolumeMounts: store.Spec.Container.VolumeMounts,
ImagePullPolicy: store.Spec.Container.ImagePullPolicy,
Image: store.Spec.Container.Image,
Command: []string{"sh", "-c"},
Args: command,
Command: []string{"sh"},
Args: []string{"-c", stringCommand},
Env: envs,
})

Expand All @@ -94,6 +95,7 @@ func SetupJob(store *v1.Store) *batchv1.Job {
Labels: labels,
},
Spec: corev1.PodSpec{
ShareProcessNamespace: &sharedProcessNamespace,
Volumes: store.Spec.Container.Volumes,
TopologySpreadConstraints: store.Spec.Container.TopologySpreadConstraints,
NodeSelector: store.Spec.Container.NodeSelector,
Expand Down Expand Up @@ -124,6 +126,7 @@ func DeleteSetupJob(ctx context.Context, c client.Client, store *v1.Store) error
}

// This is just a soft check, use container check for a clean check
// Will return true if container is stopped (Completed, Error)
func IsSetupJobCompleted(
ctx context.Context,
c client.Client,
Expand All @@ -137,13 +140,10 @@ func IsSetupJobCompleted(
return false, err
}

if setup == nil {
return false, nil
state, err := IsJobContainerDone(ctx, c, setup, CONTAINER_NAME_SETUP_JOB)
if err != nil {
return false, err
}

// No active jobs are running and more of them are succeeded
if setup.Status.Active <= 0 && setup.Status.Succeeded >= 1 {
return true, nil
}
return false, nil
return state.IsDone(), nil
}
48 changes: 32 additions & 16 deletions internal/job/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,35 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// JobState describes the observed state of a job's main container.
type JobState struct {
	// ExitCode is the container's termination code; it is -1 while the
	// container is still running, so it is only meaningful once IsDone().
	ExitCode int
	// Running reports whether the container has not yet terminated.
	Running bool
}

// HasErrors reports whether the container finished with a non-zero exit code.
// Check IsDone() first: a still-running container carries ExitCode -1.
func (js JobState) HasErrors() bool {
	return js.ExitCode != 0
}

// IsDone reports whether the container has stopped running.
func (js JobState) IsDone() bool {
	return !js.Running
}

// This is used when sidecars are able to run. We should always use this method for checking
func IsJobContainerDone(
ctx context.Context,
c client.Client,
job *batchv1.Job,
containerName string,
) (bool, error) {

) (JobState, error) {
if job == nil {
return false, fmt.Errorf("job to check is nil")
return JobState{}, fmt.Errorf("job to check is nil")
}

for _, container := range job.Spec.Template.Spec.Containers {
if container.Name == containerName {
selector, err := labels.ValidatedSelectorFromSet(job.Labels)
if err != nil {
return false, fmt.Errorf("get selector: %w", err)
return JobState{}, fmt.Errorf("get selector: %w", err)
}

listOptions := client.ListOptions{
Expand All @@ -38,40 +50,44 @@ func IsJobContainerDone(
var pods corev1.PodList
err = c.List(ctx, &pods, &listOptions)
if err != nil {
return false, fmt.Errorf("get pods: %w", err)
return JobState{}, fmt.Errorf("get pods: %w", err)
}

var isOneFinished bool
for _, pod := range pods.Items {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == containerName {
log.FromContext(ctx).Info(fmt.Sprintf("Found container for job `%s`", c.Name))
if c.State.Terminated == nil {
log.FromContext(ctx).Info("Job not terminated still running")
continue
return JobState{
ExitCode: -1,
Running: true,
}, nil
}
if c.State.Terminated.ExitCode != 0 {
log.FromContext(ctx).
Info("Job has not 0 as exit code, check job")
continue
Info("Job has not 0 as exit code, check job", "exitcode", c.State.Terminated.ExitCode)
return JobState{
ExitCode: int(c.State.Terminated.ExitCode),
Running: false,
}, nil
}
if c.State.Terminated.Reason == "Completed" {
log.FromContext(ctx).Info("Job completed")
isOneFinished = true
return JobState{
ExitCode: 0,
Running: false,
}, nil
}
}
}
}
if isOneFinished {
return true, nil
} else {
return false, nil
}
}
}

err := fmt.Errorf("job not found in container")
log.FromContext(ctx).Error(err, "job not found in container")
return false, err
return JobState{}, err
}

func deleteJobsByLabel(
Expand Down

0 comments on commit 7218ee7

Please sign in to comment.