Skip to content

Commit

Permalink
fix: prevent workflow taskresult from being deleted before completion…
Browse files Browse the repository at this point in the history
…. Fixes:argoproj#12993

Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun committed May 14, 2024
1 parent cd52436 commit 0f9dd8a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 12 deletions.
10 changes: 10 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,13 @@ func GetDeletePropagation() *metav1.DeletionPropagation {
}
return &propagationPolicy
}

func RemoveFinalizer(finalizers []string, targetFinalizer string) []string {
var updatedFinalizers []string
for _, finalizer := range finalizers {
if finalizer != targetFinalizer {
updatedFinalizers = append(updatedFinalizers, finalizer)
}
}
return updatedFinalizers
}
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ const (
// Finalizer blocks the deletion of pods until the controller captures their status.
FinalizerPodStatus = workflow.WorkflowFullName + "/status"

// Finalizer blocks the deletion of pods until the taskresult marked completed.
FinalizerTaskResultStatus = workflow.WorkflowFullName + "/taskresult"

// Variables that are added to the scope during template execution and can be referenced using {{}} syntax

// GlobalVarWorkflowName is a global workflow variable referencing the workflow's metadata.name field
Expand Down
13 changes: 2 additions & 11 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/config"
argoErr "github.com/argoproj/argo-workflows/v3/errors"
argoUtil "github.com/argoproj/argo-workflows/v3/util"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -622,7 +623,7 @@ func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods ty
updatedPod.Labels[common.LabelKeyCompleted] = "true"
}

updatedPod.Finalizers = removeFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus)
updatedPod.Finalizers = argoUtil.RemoveFinalizer(updatedPod.Finalizers, common.FinalizerPodStatus)

_, err = pods.Update(ctx, updatedPod, metav1.UpdateOptions{})
return err
Expand All @@ -633,16 +634,6 @@ func (wfc *WorkflowController) enablePodForDeletion(ctx context.Context, pods ty
return nil
}

func removeFinalizer(finalizers []string, targetFinalizer string) []string {
var updatedFinalizers []string
for _, finalizer := range finalizers {
if finalizer != targetFinalizer {
updatedFinalizers = append(updatedFinalizers, finalizer)
}
}
return updatedFinalizers
}

func (wfc *WorkflowController) signalContainers(ctx context.Context, namespace string, podName string, sig syscall.Signal) (time.Duration, error) {
pod, err := wfc.getPodFromCache(namespace, podName)
if pod == nil || err != nil {
Expand Down
36 changes: 35 additions & 1 deletion workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
argoerrs "github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
argoprojv1 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util"
argoutil "github.com/argoproj/argo-workflows/v3/util"
"github.com/argoproj/argo-workflows/v3/util/archive"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/retry"
Expand Down Expand Up @@ -803,6 +803,7 @@ func (we *WorkflowExecutor) FinalizeOutput(ctx context.Context) {
// Only added as a backup in case LabelKeyReportOutputsCompleted could not be set
err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "true")
}
err = we.RemoveFinalizer(ctx, common.FinalizerTaskResultStatus)
return err
})
if err != nil {
Expand All @@ -824,6 +825,7 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
// Only added as a backup in case LabelKeyReportOutputsCompleted could not be set
err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "false")
}
err = we.AddFinalizer(ctx, common.FinalizerTaskResultStatus)
return err
})
if err != nil {
Expand Down Expand Up @@ -897,6 +899,38 @@ func (we *WorkflowExecutor) AddAnnotation(ctx context.Context, key, value string

}

// AddFinalizer adds a Finalizer to the workflow pod
func (we *WorkflowExecutor) AddFinalizer(ctx context.Context, finalizer string) error {
data, err := json.Marshal(map[string]interface{}{"metadata": metav1.ObjectMeta{
Finalizers: []string {
finalizer,
},
}})
if err != nil {
return err
}
_, err = we.ClientSet.CoreV1().Pods(we.Namespace).Patch(ctx, we.PodName, types.MergePatchType, data, metav1.PatchOptions{})
return err
}

// RemoveFinalizer remove a Finalizer from the workflow pod
func (we *WorkflowExecutor) RemoveFinalizer(ctx context.Context, finalizer string) error {
err := retryutil.RetryOnConflict(retry.DefaultRetry, func() error {
currentPod, err := we.ClientSet.CoreV1().Pods(we.Namespace).Get(ctx, we.PodName, metav1.GetOptions{})
if err != nil {
return err
}
updatedPod := currentPod.DeepCopy()
updatedPod.Finalizers = argoutil.RemoveFinalizer(updatedPod.Finalizers, finalizer)
_, err = we.ClientSet.CoreV1().Pods(we.Namespace).Update(ctx, updatedPod, metav1.UpdateOptions{})
return err
})
if err != nil {
return err
}
return nil
}

// isTarball returns whether or not the file is a tarball
func isTarball(filePath string) (bool, error) {
log.Infof("Detecting if %s is a tarball", filePath)
Expand Down

0 comments on commit 0f9dd8a

Please sign in to comment.