Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't necessarily include all artifacts from templates in node outputs #13066

Merged
merged 15 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error {
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(bgCtx)
artifacts, err := wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)
artifacts = append(artifacts, logArtifacts...)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
err = wfExecutor.ReportOutputs(bgCtx, artifacts)
if err != nil {
wfExecutor.AddError(err)
}
Expand Down
22 changes: 20 additions & 2 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,14 @@ func (s *ArtifactsSuite) TestArtifactGC() {
for _, tt := range []struct {
workflowFile string
hasGC bool
workflowShouldSucceed bool
expectedArtifacts []artifactState
expectedGCPodsOnWFCompletion int
}{
{
workflowFile: "@testdata/artifactgc/artgc-multi-strategy-multi-anno.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 2,
expectedArtifacts: []artifactState{
artifactState{"first-on-completion-1", "my-bucket-2", true, false},
Expand All @@ -276,6 +278,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-from-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
Expand All @@ -286,6 +289,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-from-template-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
Expand All @@ -296,6 +300,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
Expand All @@ -306,6 +311,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
Expand All @@ -316,6 +322,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-from-ref-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
Expand All @@ -327,6 +334,15 @@ func (s *ArtifactsSuite) TestArtifactGC() {
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-no-gc.yaml",
hasGC: false,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{},
},
// Workflow fails to write an artifact that's been defined as an Output
{
workflowFile: "@testdata/artifactgc/artgc-artifact-not-written.yaml",
hasGC: true,
workflowShouldSucceed: false, // artifact not being present causes Workflow to fail
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{},
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
},
Expand All @@ -352,7 +368,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
}
})

if when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
if tt.workflowShouldSucceed && when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError
}) {
fmt.Println("can't reliably verify Artifact GC since workflow failed")
Expand All @@ -365,7 +381,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
WaitForWorkflow(
fixtures.WorkflowCompletionOkay(true),
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
return len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion,
return (len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion) || (tt.expectedGCPodsOnWFCompletion == 0),
fmt.Sprintf("for all %d pods to have been recouped", tt.expectedGCPodsOnWFCompletion)
}))

Expand All @@ -392,6 +408,8 @@ func (s *ArtifactsSuite) TestArtifactGC() {
DeleteWorkflow().
WaitForWorkflowDeletion()
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

when.Then().ExpectWorkflowDeleted()

juliev0 marked this conversation as resolved.
Show resolved Hide resolved
when = when.RemoveFinalizers(false) // just in case - if the above test failed we need to forcibly remove the finalizer for Artifact GC

then = when.Then()
Expand Down
28 changes: 28 additions & 0 deletions test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: testing-artifact-not-present-
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
spec:
entrypoint: entrypoint
artifactGC:
strategy: OnWorkflowDeletion
podGC:
strategy: ""
templates:
- name: entrypoint
steps:
- - name: on-deletion
template: on-deletion
- name: on-deletion
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
container:
image: argoproj/argosay:v2
command:
- sh
- -c
args:
- |
echo "not writing anything"
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
outputs:
artifacts:
- name: notpresent
path: /tmp/notpresent
19 changes: 10 additions & 9 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,27 @@ func (we *WorkflowExecutor) StageFiles() error {
}

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) {
artifacts := wfv1.Artifacts{}
if len(we.Template.Outputs.Artifacts) == 0 {
log.Infof("No output artifacts")
return nil
return artifacts, nil
}

log.Infof("Saving output artifacts")
err := os.MkdirAll(tempOutArtDir, os.ModePerm)
if err != nil {
return argoerrs.InternalWrapError(err)
return artifacts, argoerrs.InternalWrapError(err)
}

for i, art := range we.Template.Outputs.Artifacts {
for _, art := range we.Template.Outputs.Artifacts {
err := we.saveArtifact(ctx, common.MainContainerName, &art)
if err != nil {
return err
return artifacts, err
}
we.Template.Outputs.Artifacts[i] = art
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of updating this directly, create a new list of the artifacts that we actually saved and return that

Copy link
Contributor

@Garett-MacGowan Garett-MacGowan Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. So before we were using the we.Template.Outputs.Artifacts to pass to ReportOutputs but now we are creating a fresh wfv1.Artifacts object that will only include the successfully saved artifacts. This makes sense.

Copy link

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I do see a small issue here, but I think this already existed beforehand:
If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

But since adding artifacts to Outputs.Artifacts wouldn't report them until the very end anyway, I think this doesn't change any existing behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

How do you mean it wouldn't be reported? Since the background context is passed into both SaveArtifacts() and ReportOutputs() below, I believe all of that logic is still supposed to be executed. Is there something different you're referring to besides the context being cancelled?

Copy link

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the loop is long or the container is otherwise non-gracefully terminated, the request context won't matter.

Also realized that saving artifacts can be parallelized here to reduce that chance and improve throughput (related: #12442). Similarly pre-existing logic though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is basically if this Pod receives a SIGKILL? I don't think there's anything that can be done in that case, is there? (at least from the Workflow Pod side)

Copy link

@agilgur5 agilgur5 Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More or less yes, but there are things we can do to mitigate that or detect that scenario, such as parallel uploads and writing pieces of the report as each artifact completes uploading (although k8s does not have streaming requests afaik, so that would put more strain on the api server when there are many artifacts).

I also recently remembered we had #12413 merged for 3.6 that does help prevent that too.

(To be clear, this is pre-existing logic / an incidental finding, so not going to block review on that, but there are perhaps actions we should take around this case. Perhaps better discussed in #12993 though; although that's a specific case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

artifacts = append(artifacts, art)
}
return nil
return artifacts, nil
}

func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error {
Expand Down Expand Up @@ -832,9 +833,9 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
}

// ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod)
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
outputs.Artifacts = artifacts
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
}

Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func TestSaveArtifacts(t *testing.T) {

for _, tt := range tests {
ctx := context.Background()
err := tt.workflowExecutor.SaveArtifacts(ctx)
_, err := tt.workflowExecutor.SaveArtifacts(ctx)
if err != nil {
assert.Equal(t, tt.expectError, true)
continue
Expand Down
Loading