Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't necessarily include all artifacts from templates in node outputs #13066

Merged
merged 15 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error {
}

// Saving output artifacts
err = wfExecutor.SaveArtifacts(bgCtx)
artifacts, err := wfExecutor.SaveArtifacts(bgCtx)
if err != nil {
wfExecutor.AddError(err)
}

// Save log artifacts
logArtifacts := wfExecutor.SaveLogs(bgCtx)
artifacts = append(artifacts, logArtifacts...)

// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
err = wfExecutor.ReportOutputs(bgCtx, artifacts)
if err != nil {
wfExecutor.AddError(err)
}
Expand Down
127 changes: 99 additions & 28 deletions test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,53 @@ func (s *ArtifactsSuite) TestGlobalArtifactPassing() {
}

type artifactState struct {
key string
bucketName string
artifactLocation s3Location

deletedAtWFCompletion bool
deletedAtWFDeletion bool
}

// s3Location identifies an artifact object in S3 by bucket name and key.
// The key is either given directly (specifiedKey) or has to be looked up
// from a Workflow's node status (derivedKey); see getS3Key.
type s3Location struct {
	bucketName string
	// specify one of these two:
	specifiedKey string              // exact key is known
	derivedKey   *artifactDerivedKey // exact key needs to be derived
}

// artifactDerivedKey names the template and output artifact used by
// s3Location.getS3Key to look up an S3 key from a Workflow's node status
// when the key is not known in advance.
type artifactDerivedKey struct {
	templateName string // matched against NodeStatus.TemplateName
	artifactName string // matched against the Name of the node's output artifacts
}

func (al *s3Location) getS3Key(wf *wfv1.Workflow) (string, error) {

juliev0 marked this conversation as resolved.
Show resolved Hide resolved
if al.specifiedKey == "" && al.derivedKey == nil {
panic(fmt.Sprintf("invalid artifactLocation: %+v, must have knownKey or derivedKey set", al))
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
}

if al.specifiedKey != "" {
return al.specifiedKey, nil
}

// get key by finding the node in the Workflow's NodeStatus and looking at its Artifacts

// get node name using template
n := wf.Status.Nodes.Find(func(nodeStatus wfv1.NodeStatus) bool { return nodeStatus.TemplateName == al.derivedKey.templateName })
if n == nil {
return "", fmt.Errorf("no node with template name=%q found in workflow %+v", al.derivedKey.templateName, wf)
}
for _, a := range n.Outputs.Artifacts {
if a.Name == al.derivedKey.artifactName {
if a.S3 == nil {
return "", fmt.Errorf("didn't find expected S3 field in artifact %q: %+v", al.derivedKey.artifactName, a)
}
return a.S3.Key, nil
}
}

return "", fmt.Errorf("artifact named %q not found", al.derivedKey.artifactName)
}

func (s *ArtifactsSuite) TestStoppedWorkflow() {

for _, tt := range []struct {
Expand Down Expand Up @@ -257,79 +298,97 @@ func (s *ArtifactsSuite) TestArtifactGC() {
for _, tt := range []struct {
workflowFile string
hasGC bool
workflowShouldSucceed bool
expectedArtifacts []artifactState
expectedGCPodsOnWFCompletion int
}{
{
workflowFile: "@testdata/artifactgc/artgc-multi-strategy-multi-anno.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 2,
expectedArtifacts: []artifactState{
artifactState{"first-on-completion-1", "my-bucket-2", true, false},
artifactState{"first-on-completion-2", "my-bucket-3", true, false},
artifactState{"first-no-deletion", "my-bucket-3", false, false},
artifactState{"second-on-deletion", "my-bucket-3", false, true},
artifactState{"second-on-completion", "my-bucket-2", true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "first-on-completion-1"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-on-completion-2"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-no-deletion"}, false, false},
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "second-on-deletion"}, false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "second-on-completion"}, true, false},
},
},
// entire Workflow based on a WorkflowTemplate
{
workflowFile: "@testdata/artifactgc/artgc-from-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// entire Workflow based on a WorkflowTemplate
{
workflowFile: "@testdata/artifactgc/artgc-from-template-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-2.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, false},
},
},
// entire Workflow based on a WorkflowTemplate which has a Step that references another WorkflowTemplate's template
{
workflowFile: "@testdata/artifactgc/artgc-from-ref-template.yaml",
hasGC: true,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 1,
expectedArtifacts: []artifactState{
artifactState{"on-completion", "my-bucket-2", true, false},
artifactState{"on-deletion", "my-bucket-2", false, true},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
},
},
// Step in Workflow references a WorkflowTemplate's template
// Workflow defines ArtifactGC but all artifacts override with "Never" so Artifact GC should not be done
{
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-no-gc.yaml",
hasGC: false,
workflowShouldSucceed: true,
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{},
},
// Workflow fails to write an artifact that's been defined as an Output
{
workflowFile: "@testdata/artifactgc/artgc-artifact-not-written.yaml",
hasGC: true,
workflowShouldSucceed: false, // artifact not being present causes Workflow to fail
expectedGCPodsOnWFCompletion: 0,
expectedArtifacts: []artifactState{
artifactState{s3Location{bucketName: "my-bucket", derivedKey: &artifactDerivedKey{templateName: "artifact-written", artifactName: "present"}}, false, true},
},
},
} {
// for each test make sure that:
// 1. the finalizer gets added
Expand All @@ -352,7 +411,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
}
})

if when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
if tt.workflowShouldSucceed && when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError
}) {
fmt.Println("can't reliably verify Artifact GC since workflow failed")
Expand All @@ -365,22 +424,27 @@ func (s *ArtifactsSuite) TestArtifactGC() {
WaitForWorkflow(
fixtures.WorkflowCompletionOkay(true),
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
return len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion,
return (len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion) || (tt.expectedGCPodsOnWFCompletion == 0),
fmt.Sprintf("for all %d pods to have been recouped", tt.expectedGCPodsOnWFCompletion)
}))

then := when.Then()

// verify that each artifact was deleted at completion time if, and only if, it should have been
for _, expectedArtifact := range tt.expectedArtifacts {
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
fmt.Printf("artifact key: %q\n", artifactKey)
if err != nil {
panic(err)
}
if expectedArtifact.deletedAtWFCompletion {
fmt.Printf("verifying artifact %s is deleted at completion time\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is deleted at completion time\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
} else {
fmt.Printf("verifying artifact %s is not deleted at completion time\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is not deleted at completion time\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
}
Expand All @@ -390,25 +454,32 @@ func (s *ArtifactsSuite) TestArtifactGC() {

when.
DeleteWorkflow().
WaitForWorkflowDeletion()
WaitForWorkflowDeletion().
Then().
ExpectWorkflowDeleted()

when = when.RemoveFinalizers(false) // just in case - if the above test failed we need to forcibly remove the finalizer for Artifact GC

then = when.Then()

for _, expectedArtifact := range tt.expectedArtifacts {
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
fmt.Printf("artifact key: %q\n", artifactKey)
if err != nil {
panic(err)
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be below the `continue` conditional? The key isn't used by that branch, so computing it can be skipped in that case — unless you want to print every key.


if expectedArtifact.deletedAtWFCompletion { // already checked this
continue
}
if expectedArtifact.deletedAtWFDeletion {
fmt.Printf("verifying artifact %s is deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is deleted\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NotNil(t, err)
})
} else {
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
fmt.Printf("verifying artifact %s is not deleted\n", artifactKey)
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
assert.NoError(t, err)
})
}
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (w *When) SubmitWorkflow() *When {
return w
}

// GetWorkflow returns the Workflow currently held by this When (its wf field).
// The pointer is returned as-is, not copied.
func (w *When) GetWorkflow() *wfv1.Workflow {
	return w.wf
}

func label(obj metav1.Object) {
labels := obj.GetLabels()
if labels == nil {
Expand Down
43 changes: 43 additions & 0 deletions test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Artifact GC test fixture: the first step writes its declared output artifact,
# the second step declares an output artifact but never writes it, which causes
# the Workflow to fail. Artifacts are garbage collected on Workflow deletion.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artgc-artifact-not-written-
spec:
  entrypoint: entrypoint
  artifactGC:
    strategy: OnWorkflowDeletion
  podGC:
    strategy: ""
  templates:
    - name: entrypoint
      steps:
        - - name: artifact-written
            template: artifact-written
        - - name: artifact-not-written
            template: artifact-not-written
    # Writes /tmp/present, so the "present" artifact is saved.
    - name: artifact-written
      container:
        image: argoproj/argosay:v2
        command:
          - sh
          - -c
        args:
          - |
            echo "something" > /tmp/present
      outputs:
        artifacts:
          - name: present
            path: /tmp/present
    # Declares "notpresent" but never creates /tmp/notpresent.
    - name: artifact-not-written
      container:
        image: argoproj/argosay:v2
        command:
          - sh
          - -c
        args:
          - |
            echo "intentionally not writing anything to disk"
      outputs:
        artifacts:
          - name: notpresent
            path: /tmp/notpresent
19 changes: 10 additions & 9 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,27 @@ func (we *WorkflowExecutor) StageFiles() error {
}

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) {
artifacts := wfv1.Artifacts{}
if len(we.Template.Outputs.Artifacts) == 0 {
log.Infof("No output artifacts")
return nil
return artifacts, nil
}

log.Infof("Saving output artifacts")
err := os.MkdirAll(tempOutArtDir, os.ModePerm)
if err != nil {
return argoerrs.InternalWrapError(err)
return artifacts, argoerrs.InternalWrapError(err)
}

for i, art := range we.Template.Outputs.Artifacts {
for _, art := range we.Template.Outputs.Artifacts {
err := we.saveArtifact(ctx, common.MainContainerName, &art)
if err != nil {
return err
return artifacts, err
}
we.Template.Outputs.Artifacts[i] = art
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of updating this directly, create a new list of the artifacts that we actually saved and return that

Copy link
Contributor

@Garett-MacGowan Garett-MacGowan Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. So before we were using the we.Template.Outputs.Artifacts to pass to ReportOutputs but now we are creating a fresh wfv1.Artifacts object that will only include the successfully saved artifacts. This makes sense.

Copy link

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I do see a small issue here, but I think this already existed beforehand:
If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

But since adding artifacts to Outputs.Artifacts wouldn't report them until the very end anyway, I think this doesn't change any existing behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the wait container is interrupted during this loop, some artifacts may save to S3 (etc) but not be reported

How do you mean it wouldn't be reported? Since the background context is passed into both SaveArtifacts() and ReportOutputs() below, I believe all of that logic is still supposed to be executed. Is there something different you're referring to besides the context being cancelled?

Copy link

@agilgur5 agilgur5 Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the loop is long or the container is otherwise non-gracefully terminated, the request context won't matter.

Also realized that saving artifacts can be parallelized here to reduce that chance and improve throughput (related: #12442). Similarly pre-existing logic though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is basically if this Pod receives a SIGKILL? I don't think there's anything that can be done in that case, is there? (at least from the Workflow Pod side)

Copy link

@agilgur5 agilgur5 Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More or less yes, but there are things we can do to mitigate that or detect that scenario, such as parallel uploads and writing pieces of the report as each artifact completes uploading (although k8s does not have streaming requests afaik, so that would put more strain on the api server when there are many artifacts).

I also recently remembered we had #12413 merged for 3.6 that does help prevent that too.

(To be clear, this is pre-existing logic / an incidental finding, so not going to block review on that, but there are perhaps actions we should take around this case. Perhaps better discussed in #12993 though; although that's a specific case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

artifacts = append(artifacts, art)
}
return nil
return artifacts, nil
}

func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error {
Expand Down Expand Up @@ -832,9 +833,9 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
}

// ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod)
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error {
outputs := we.Template.Outputs.DeepCopy()
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
outputs.Artifacts = artifacts
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
}

Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func TestSaveArtifacts(t *testing.T) {

for _, tt := range tests {
ctx := context.Background()
err := tt.workflowExecutor.SaveArtifacts(ctx)
_, err := tt.workflowExecutor.SaveArtifacts(ctx)
if err != nil {
assert.Equal(t, tt.expectError, true)
continue
Expand Down
Loading