Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Parallel artifact GC #11768

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 116 additions & 36 deletions cmd/argoexec/commands/artifact/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
workflow "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/util/retry"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
executor "github.com/argoproj/argo-workflows/v3/workflow/artifacts"
Expand Down Expand Up @@ -52,63 +53,142 @@ func NewArtifactDeleteCommand() *cobra.Command {
}
}

func deleteArtifacts(labelSelector string, ctx context.Context, artifactGCTaskInterface wfv1alpha1.WorkflowArtifactGCTaskInterface) error {
type request struct {
Task *v1alpha1.WorkflowArtifactGCTask
NodeName string
ArtifactNodeSpec *v1alpha1.ArtifactNodeSpec
}

type response struct {
Task *v1alpha1.WorkflowArtifactGCTask
NodeName string
Results map[string]v1alpha1.ArtifactResult
Err error
}

func deleteArtifacts(labelSelector string, ctx context.Context, artifactGCTaskInterface wfv1alpha1.WorkflowArtifactGCTaskInterface) error {
taskList, err := artifactGCTaskInterface.List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return err
}
gcWorkers := env.LookupEnvIntOr(common.EnvExecArtifactGCWorkers, 4)

totalTasks := 0
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
for _, task := range taskList.Items {
totalTasks += len(task.Spec.ArtifactsByNode)
}
// Channel for dispatching GC work to the workers
// large enough for every single task
taskQueue := make(chan *request, totalTasks)
// Channel for the replies when they complete
responseQueue := make(chan response, totalTasks)
for i := 0; i < gcWorkers; i++ {
go deleteWorker(ctx, taskQueue, responseQueue)
}

taskno := 0
nodesToGo := make(map[*v1alpha1.WorkflowArtifactGCTask]int)
// Dispatch all GC tasks to workers
for _, task := range taskList.Items {
nodesToGo[&task] = len(task.Spec.ArtifactsByNode)
task.Status.ArtifactResultsByNode = make(map[string]v1alpha1.ArtifactResultNodeStatus)
for nodeName, artifactNodeSpec := range task.Spec.ArtifactsByNode {

var archiveLocation *v1alpha1.ArtifactLocation
artResultNodeStatus := v1alpha1.ArtifactResultNodeStatus{ArtifactResults: make(map[string]v1alpha1.ArtifactResult)}
if artifactNodeSpec.ArchiveLocation != nil {
archiveLocation = artifactNodeSpec.ArchiveLocation
taskQueue <- &request{Task: &task, NodeName: nodeName, ArtifactNodeSpec: &artifactNodeSpec}
taskno++
}
}
close(taskQueue)
completed := 0
// And now process all the responses as they are completed
for {
response := <-responseQueue
// A worker has had an error, we abort collecting any more information
// This is probably not ideal.
if response.Err != nil {
return response.Err
}
// If we get a nil response this means a worker has reached the end of the queued
// work, so keep a record and complete if all workers are done.
if response.Task == nil {
completed++
if completed >= gcWorkers {
break
}
} else {
if response.Task == nil {
// Internal error, this shouldn't happen
return err
}
// Process the response
response.Task.Status.ArtifactResultsByNode[response.NodeName] = v1alpha1.ArtifactResultNodeStatus{ArtifactResults: response.Results}
// Check for completed tasks
nodesToGo[response.Task]--
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we need to add an ok check here? I see that this should obviously exist, just tempted to be more defensive so that we future proof this a bit more.

Alternatively add a comment that we expect the response.Task to be present here because it was populated in the previous loop, probably should be accompanied by a comment at line 89 where it gets populated.

Sorry for being pedantic/annoying when it is obvious, I just don't want things to go in the way of NodeStatus, tempted to use a linter to force ok checks everywhere to be honest.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Copy link
Contributor

@juliev0 juliev0 Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, is this the if response.Task == nil check? I'm actually kind of confused why that's in there. It seems redundant, no? I think we should remove this. If we started doing this everywhere, it would make the code unnecessarily longer everywhere.


var resources resources
resources.Files = make(map[string][]byte) // same resources for every artifact
for _, artifact := range artifactNodeSpec.Artifacts {
if archiveLocation != nil {
err := artifact.Relocate(archiveLocation)
if err != nil {
return err
}
if nodesToGo[response.Task] == 0 {
patch, err := json.Marshal(map[string]interface{}{"status": v1alpha1.ArtifactGCStatus{ArtifactResultsByNode: response.Task.Status.ArtifactResultsByNode}})
if err != nil {
return err
}

drv, err := executor.NewDriver(ctx, &artifact, resources)
_, err = artifactGCTaskInterface.Patch(context.Background(), response.Task.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
if err != nil {
return err
}

err = waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
err = drv.Delete(&artifact)
if err != nil {
errString := err.Error()
artResultNodeStatus.ArtifactResults[artifact.Name] = v1alpha1.ArtifactResult{Name: artifact.Name, Success: false, Error: &errString}
return false, err
}
artResultNodeStatus.ArtifactResults[artifact.Name] = v1alpha1.ArtifactResult{Name: artifact.Name, Success: true, Error: nil}
return true, err
})
}
}
}
return nil
}

task.Status.ArtifactResultsByNode[nodeName] = artResultNodeStatus
func deleteWorker(ctx context.Context, taskQueue chan *request, responseQueue chan response) {
for {
item, ok := <-taskQueue
if !ok {
// Report a done to the response queue
responseQueue <- response{Task: nil, NodeName: "", Err: nil}
return
}
patch, err := json.Marshal(map[string]interface{}{"status": v1alpha1.ArtifactGCStatus{ArtifactResultsByNode: task.Status.ArtifactResultsByNode}})
if err != nil {
return err
var archiveLocation *v1alpha1.ArtifactLocation
results := make(map[string]v1alpha1.ArtifactResult)
if item.ArtifactNodeSpec.ArchiveLocation != nil {
archiveLocation = item.ArtifactNodeSpec.ArchiveLocation
}
_, err = artifactGCTaskInterface.Patch(context.Background(), task.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
if err != nil {
return err

var resources resources
resources.Files = make(map[string][]byte) // same resources for every artifact
for _, artifact := range item.ArtifactNodeSpec.Artifacts {
if archiveLocation != nil {
err := artifact.Relocate(archiveLocation)
if err != nil {
responseQueue <- response{Task: item.Task, NodeName: item.NodeName, Results: results, Err: err}
continue
}
}
drv, err := executor.NewDriver(ctx, &artifact, resources)
if err != nil {
// Report an error and process next item
responseQueue <- response{Task: item.Task, NodeName: item.NodeName, Results: results, Err: err}
continue
}

err = waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @juliev0 states, the err is not handled

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

err = drv.Delete(&artifact)
if err != nil {
errString := err.Error()
results[artifact.Name] = v1alpha1.ArtifactResult{Name: artifact.Name, Success: false, Error: &errString}
return false, err
}
results[artifact.Name] = v1alpha1.ArtifactResult{Name: artifact.Name, Success: true, Error: nil}
return true, nil
})
Copy link
Contributor

@juliev0 juliev0 Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is a bug in the original code: it's not doing anything if err is non-nill

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have guaranteed err == nil here. I've made that explicit now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, as @isubasinghe pointed out you meant the other err. Oops. Done.

if err != nil {
// Report an error and process next item
responseQueue <- response{Task: item.Task, NodeName: item.NodeName, Results: results, Err: err}
continue
}
}
// Report a success to the response queue
responseQueue <- response{Task: item.Task, NodeName: item.NodeName, Results: results, Err: nil}
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

type resources struct {
Expand Down
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ const (
// EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet
EnvAgentPatchRate = "ARGO_AGENT_PATCH_RATE"

// EnvExecArtifactGCWorkers is the number of artifact GC workers for argoexec
EnvExecArtifactGCWorkers = "ARGO_EXEC_ARTIFACT_GC_WORKERS"

agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
// Finalizer to block deletion of the workflow if deletion of artifacts fail for some reason.
FinalizerArtifactGC = workflow.WorkflowFullName + "/artifact-gc"

Expand Down
8 changes: 4 additions & 4 deletions workflow/controller/artifact_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,12 @@ func (woc *wfOperationCtx) createArtifactGCPod(ctx context.Context, strategy wfv
// if this pod is breached by an attacker these limits prevent excessive CPU and memory usage
Resources: corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("200m"),
"memory": resource.MustParse("128Mi"),
},
Requests: map[corev1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("50m"),
"memory": resource.MustParse("32Mi"),
"cpu": resource.MustParse("100m"),
"memory": resource.MustParse("64Mi"),
},
},
VolumeMounts: volumeMounts,
Expand Down
Loading