diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 193b0ff6440b..76896ea87a5b 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error { } // Saving output artifacts - err = wfExecutor.SaveArtifacts(bgCtx) + artifacts, err := wfExecutor.SaveArtifacts(bgCtx) if err != nil { wfExecutor.AddError(err) } // Save log artifacts logArtifacts := wfExecutor.SaveLogs(bgCtx) + artifacts = append(artifacts, logArtifacts...) // Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations - err = wfExecutor.ReportOutputs(bgCtx, logArtifacts) + err = wfExecutor.ReportOutputs(bgCtx, artifacts) if err != nil { wfExecutor.AddError(err) } diff --git a/test/e2e/artifacts_test.go b/test/e2e/artifacts_test.go index cd9005d896e8..1caa76d4c35c 100644 --- a/test/e2e/artifacts_test.go +++ b/test/e2e/artifacts_test.go @@ -128,12 +128,52 @@ func (s *ArtifactsSuite) TestGlobalArtifactPassing() { } type artifactState struct { - key string - bucketName string + artifactLocation s3Location + deletedAtWFCompletion bool deletedAtWFDeletion bool } +type s3Location struct { + bucketName string + // specify one of these two: + specifiedKey string // exact key is known + derivedKey *artifactDerivedKey // exact key needs to be derived +} + +type artifactDerivedKey struct { + templateName string + artifactName string +} + +func (al *s3Location) getS3Key(wf *wfv1.Workflow) (string, error) { + if al.specifiedKey == "" && al.derivedKey == nil { + panic(fmt.Sprintf("invalid artifactLocation: %+v, must have specifiedKey or derivedKey set", al)) + } + + if al.specifiedKey != "" { + return al.specifiedKey, nil + } + + // get key by finding the node in the Workflow's NodeStatus and looking at its Artifacts + + // get node name using template + n := wf.Status.Nodes.Find(func(nodeStatus wfv1.NodeStatus) bool { return nodeStatus.TemplateName == al.derivedKey.templateName }) + if n == nil { + return "", fmt.Errorf("no node with template name=%q found in workflow %+v", al.derivedKey.templateName, wf) + } + for _, a := range n.Outputs.Artifacts { + if a.Name == al.derivedKey.artifactName { + if a.S3 == nil { + return "", fmt.Errorf("didn't find expected S3 field in artifact %q: %+v", al.derivedKey.artifactName, a) + } + return a.S3.Key, nil + } + } + + return "", fmt.Errorf("artifact named %q not found", al.derivedKey.artifactName) +} + func (s *ArtifactsSuite) TestStoppedWorkflow() { for _, tt := range []struct { @@ -257,69 +297,76 @@ func (s *ArtifactsSuite) TestArtifactGC() { for _, tt := range []struct { workflowFile string hasGC bool + workflowShouldSucceed bool expectedArtifacts []artifactState expectedGCPodsOnWFCompletion int }{ { workflowFile: "@testdata/artifactgc/artgc-multi-strategy-multi-anno.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 2, expectedArtifacts: []artifactState{ - artifactState{"first-on-completion-1", "my-bucket-2", true, false}, - artifactState{"first-on-completion-2", "my-bucket-3", true, false}, - artifactState{"first-no-deletion", "my-bucket-3", false, false}, - artifactState{"second-on-deletion", "my-bucket-3", false, true}, - artifactState{"second-on-completion", "my-bucket-2", true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "first-on-completion-1"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-on-completion-2"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-no-deletion"}, false, false}, + artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "second-on-deletion"}, false, true}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "second-on-completion"}, true, false}, }, }, // entire Workflow based on a WorkflowTemplate { workflowFile: "@testdata/artifactgc/artgc-from-template.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 1, expectedArtifacts: []artifactState{ - artifactState{"on-completion", "my-bucket-2", true, false}, - artifactState{"on-deletion", "my-bucket-2", false, true}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true}, }, }, // entire Workflow based on a WorkflowTemplate { workflowFile: "@testdata/artifactgc/artgc-from-template-2.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 1, expectedArtifacts: []artifactState{ - artifactState{"on-completion", "my-bucket-2", true, false}, - artifactState{"on-deletion", "my-bucket-2", false, true}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true}, }, }, // Step in Workflow references a WorkflowTemplate's template { workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 1, expectedArtifacts: []artifactState{ - artifactState{"on-completion", "my-bucket-2", true, false}, - artifactState{"on-deletion", "my-bucket-2", false, true}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true}, }, }, // Step in Workflow references a WorkflowTemplate's template { workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-2.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 1, expectedArtifacts: []artifactState{ - artifactState{"on-completion", "my-bucket-2", true, false}, - artifactState{"on-deletion", "my-bucket-2", false, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, false}, }, }, // entire Workflow based on a WorkflowTemplate which has a Step that references another WorkflowTemplate's template { workflowFile: "@testdata/artifactgc/artgc-from-ref-template.yaml", hasGC: true, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 1, expectedArtifacts: []artifactState{ - artifactState{"on-completion", "my-bucket-2", true, false}, - artifactState{"on-deletion", "my-bucket-2", false, true}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false}, + artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true}, }, }, // Step in Workflow references a WorkflowTemplate's template @@ -327,9 +374,20 @@ func (s *ArtifactsSuite) TestArtifactGC() { { workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-no-gc.yaml", hasGC: false, + workflowShouldSucceed: true, expectedGCPodsOnWFCompletion: 0, expectedArtifacts: []artifactState{}, }, + // Workflow fails to write an artifact that's been defined as an Output + { + workflowFile: "@testdata/artifactgc/artgc-artifact-not-written.yaml", + hasGC: true, + workflowShouldSucceed: false, // artifact not being present causes Workflow to fail + expectedGCPodsOnWFCompletion: 0, + expectedArtifacts: []artifactState{ + artifactState{s3Location{bucketName: "my-bucket", derivedKey: &artifactDerivedKey{templateName: "artifact-written", artifactName: "present"}}, false, true}, + }, + }, } { // for each test make sure that: // 1. the finalizer gets added @@ -352,7 +410,7 @@ func (s *ArtifactsSuite) TestArtifactGC() { } }) - if when.WorkflowCondition(func(wf *wfv1.Workflow) bool { + if tt.workflowShouldSucceed && when.WorkflowCondition(func(wf *wfv1.Workflow) bool { return wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError }) { fmt.Println("can't reliably verify Artifact GC since workflow failed") @@ -365,7 +423,7 @@ func (s *ArtifactsSuite) TestArtifactGC() { WaitForWorkflow( fixtures.WorkflowCompletionOkay(true), fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) { - return len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion, + return (len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion) || (tt.expectedGCPodsOnWFCompletion == 0), fmt.Sprintf("for all %d pods to have been recouped", tt.expectedGCPodsOnWFCompletion) })) @@ -373,14 +431,19 @@ func (s *ArtifactsSuite) TestArtifactGC() { // verify that the artifacts that should have been deleted at completion time were for _, expectedArtifact := range tt.expectedArtifacts { + artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow()) + fmt.Printf("artifact key: %q\n", artifactKey) + if err != nil { + panic(err) + } if expectedArtifact.deletedAtWFCompletion { - fmt.Printf("verifying artifact %s is deleted at completion time\n", expectedArtifact.key) - then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { + fmt.Printf("verifying artifact %s is deleted at completion time\n", artifactKey) + then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { assert.NotNil(t, err) }) } else { - fmt.Printf("verifying artifact %s is not deleted at completion time\n", expectedArtifact.key) - then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { + fmt.Printf("verifying artifact %s is not deleted at completion time\n", artifactKey) + then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { assert.NoError(t, err) }) } @@ -390,25 +453,32 @@ func (s *ArtifactsSuite) TestArtifactGC() { when. DeleteWorkflow(). - WaitForWorkflowDeletion() + WaitForWorkflowDeletion(). + Then(). + ExpectWorkflowDeleted() when = when.RemoveFinalizers(false) // just in case - if the above test failed we need to forcibly remove the finalizer for Artifact GC then = when.Then() for _, expectedArtifact := range tt.expectedArtifacts { + artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow()) + fmt.Printf("artifact key: %q\n", artifactKey) + if err != nil { + panic(err) + } if expectedArtifact.deletedAtWFCompletion { // already checked this continue } if expectedArtifact.deletedAtWFDeletion { - fmt.Printf("verifying artifact %s is deleted\n", expectedArtifact.key) - then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { + fmt.Printf("verifying artifact %s is deleted\n", artifactKey) + then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { assert.NotNil(t, err) }) } else { - fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key) - then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { + fmt.Printf("verifying artifact %s is not deleted\n", artifactKey) + then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) { assert.NoError(t, err) }) } diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index f1545d84aa99..c66183b19c96 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -58,6 +58,10 @@ func (w *When) SubmitWorkflow() *When { return w } +func (w *When) GetWorkflow() *wfv1.Workflow { + return w.wf +} + func label(obj metav1.Object) { labels := obj.GetLabels() if labels == nil { diff --git a/test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml b/test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml new file mode 100644 index 000000000000..0b040d39de69 --- /dev/null +++ b/test/e2e/testdata/artifactgc/artgc-artifact-not-written.yaml @@ -0,0 +1,43 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: artgc-artifact-not-written- +spec: + entrypoint: entrypoint + artifactGC: + strategy: OnWorkflowDeletion + podGC: + strategy: "" + templates: + - name: entrypoint + steps: + - - name: artifact-written + template: artifact-written + - - name: artifact-not-written + template: artifact-not-written + - name: artifact-written + container: + image: argoproj/argosay:v2 + command: + - sh + - -c + args: + - | + echo "something" > /tmp/present + outputs: + artifacts: + - name: present + path: /tmp/present + - name: artifact-not-written + container: + image: argoproj/argosay:v2 + command: + - sh + - -c + args: + - | + echo "intentionally not writing anything to disk" + outputs: + artifacts: + - name: notpresent + path: /tmp/notpresent \ No newline at end of file diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 07cfe5d6b32f..f45c81bc9506 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -291,26 +291,27 @@ func (we *WorkflowExecutor) StageFiles() error { } // SaveArtifacts uploads artifacts to the archive location -func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error { +func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) { + artifacts := wfv1.Artifacts{} if len(we.Template.Outputs.Artifacts) == 0 { log.Infof("No output artifacts") - return nil + return artifacts, nil } log.Infof("Saving output artifacts") err := os.MkdirAll(tempOutArtDir, os.ModePerm) if err != nil { - return argoerrs.InternalWrapError(err) + return artifacts, argoerrs.InternalWrapError(err) } - for i, art := range we.Template.Outputs.Artifacts { + for _, art := range we.Template.Outputs.Artifacts { err := we.saveArtifact(ctx, common.MainContainerName, &art) if err != nil { - return err + return artifacts, err } - we.Template.Outputs.Artifacts[i] = art + artifacts = append(artifacts, art) } - return nil + return artifacts, nil } func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error { @@ -832,9 +833,9 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) { } // ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod) -func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error { +func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error { outputs := we.Template.Outputs.DeepCopy() - outputs.Artifacts = append(outputs.Artifacts, logArtifacts...) + outputs.Artifacts = artifacts return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs}) } diff --git a/workflow/executor/executor_test.go b/workflow/executor/executor_test.go index 7fab56a7bf98..200a25bfeedc 100644 --- a/workflow/executor/executor_test.go +++ b/workflow/executor/executor_test.go @@ -481,7 +481,7 @@ func TestSaveArtifacts(t *testing.T) { for _, tt := range tests { ctx := context.Background() - err := tt.workflowExecutor.SaveArtifacts(ctx) + _, err := tt.workflowExecutor.SaveArtifacts(ctx) if err != nil { assert.Equal(t, tt.expectError, true) continue