diff --git a/workflow/artifacts/s3/errors.go b/workflow/artifacts/s3/errors.go
index e731d5baba84..6e71aebb5567 100644
--- a/workflow/artifacts/s3/errors.go
+++ b/workflow/artifacts/s3/errors.go
@@ -1,6 +1,10 @@
 package s3
 
-import argos3 "github.com/argoproj/pkg/s3"
+import (
+	argos3 "github.com/argoproj/pkg/s3"
+
+	"github.com/argoproj/argo-workflows/v3/util/errors"
+)
 
 // s3TransientErrorCodes is a list of S3 error codes that are transient (retryable)
 // Reference: https://github.com/minio/minio-go/blob/92fe50d14294782d96402deb861d442992038109/retry.go#L90-L102
@@ -25,5 +29,5 @@ func isTransientS3Err(err error) bool {
 			return true
 		}
 	}
-	return false
+	return errors.IsTransientErr(err)
 }
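The `errors.IsTransientErr` fallback is what lets the new `TRANSIENT_ERROR_PATTERN` environment variable (exercised by the tests below) take effect: an error matching the pattern is retried even when its S3 code is not in `s3TransientErrorCodes`. A minimal sketch of that matching behavior, assuming the fallback boils down to a regular-expression match against `err.Error()` — an illustration, not the project's actual implementation:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"regexp"
)

// matchesTransientPattern is a hypothetical stand-in for the
// pattern-matching fallback in util/errors.IsTransientErr.
func matchesTransientPattern(err error) bool {
	pattern := os.Getenv("TRANSIENT_ERROR_PATTERN")
	if err == nil || pattern == "" {
		return false
	}
	matched, reErr := regexp.MatchString(pattern, err.Error())
	return reErr == nil && matched
}

func main() {
	_ = os.Setenv("TRANSIENT_ERROR_PATTERN", "this error is transient")
	fmt.Println(matchesTransientPattern(errors.New("this error is transient"))) // true
	fmt.Println(matchesTransientPattern(errors.New("access denied")))           // false
}
```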
diff --git a/workflow/artifacts/s3/s3.go b/workflow/artifacts/s3/s3.go
index b9c3caf398d3..b48f8eb94242 100644
--- a/workflow/artifacts/s3/s3.go
+++ b/workflow/artifacts/s3/s3.go
@@ -4,19 +4,18 @@ import (
 	"context"
 	"fmt"
 	"os"
-	"time"
 
 	"github.com/argoproj/pkg/file"
 	argos3 "github.com/argoproj/pkg/s3"
 	"github.com/minio/minio-go/v7"
 	log "github.com/sirupsen/logrus"
-	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/argoproj/argo-workflows/v3/errors"
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
 	artifactscommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
 	"github.com/argoproj/argo-workflows/v3/workflow/common"
+	executorretry "github.com/argoproj/argo-workflows/v3/workflow/executor/retry"
 )
 
 // ArtifactDriver is a driver for AWS S3
@@ -35,10 +34,7 @@ type ArtifactDriver struct {
 	ServerSideCustomerKey string
 }
 
-var (
-	_            artifactscommon.ArtifactDriver = &ArtifactDriver{}
-	defaultRetry                                = wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
-)
+var _ artifactscommon.ArtifactDriver = &ArtifactDriver{}
 
 // newMinioClient instantiates a new minio client object.
 func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Client, error) {
@@ -67,7 +63,7 @@ func (s3Driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	err := waitutil.Backoff(defaultRetry,
+	err := waitutil.Backoff(executorretry.ExecutorRetry,
 		func() (bool, error) {
 			log.Infof("S3 Load path: %s, key: %s", path, inputArtifact.S3.Key)
 			s3cli, err := s3Driver.newS3Client(ctx)
@@ -112,7 +108,7 @@ func (s3Driver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	err := waitutil.Backoff(defaultRetry,
+	err := waitutil.Backoff(executorretry.ExecutorRetry,
 		func() (bool, error) {
 			log.Infof("S3 Save path: %s, key: %s", path, outputArtifact.S3.Key)
 			s3cli, err := s3Driver.newS3Client(ctx)
@@ -162,7 +158,7 @@ func (s3Driver *ArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string,
 	defer cancel()
 	var files []string
 
-	err := waitutil.Backoff(defaultRetry,
+	err := waitutil.Backoff(executorretry.ExecutorRetry,
 		func() (bool, error) {
 			s3cli, err := s3Driver.newS3Client(ctx)
 			if err != nil {
diff --git a/workflow/artifacts/s3/s3_test.go b/workflow/artifacts/s3/s3_test.go
index 9dc6e0ce530a..89d719e40c3a 100644
--- a/workflow/artifacts/s3/s3_test.go
+++ b/workflow/artifacts/s3/s3_test.go
@@ -14,6 +14,8 @@ import (
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 )
 
+const transientEnvVarKey = "TRANSIENT_ERROR_PATTERN"
+
 type mockS3Client struct {
 	// files is a map where key is bucket name and value consists of file keys
 	files map[string][]string
@@ -175,6 +177,24 @@ func TestLoadS3Artifact(t *testing.T) {
 			done:   true,
 			errMsg: "",
 		},
+		"Get File Other Transient Error": {
+			s3client: newMockS3Client(
+				map[string][]string{
+					"my-bucket": []string{
+						"/folder/hello-art-2.tar.gz",
+					},
+				},
+				map[string]error{
+					"GetFile": minio.ErrorResponse{
+						Code: "this error is transient",
+					},
+				}),
+			bucket:    "my-bucket",
+			key:       "/folder/",
+			localPath: "/tmp/folder/",
+			done:      false,
+			errMsg:    "failed to get file: Error response code this error is transient.",
+		},
 		"Test Directory Failed": {
 			s3client: newMockS3Client(
 				map[string][]string{
@@ -219,6 +239,7 @@
 		},
 	}
 
+	_ = os.Setenv(transientEnvVarKey, "this error is transient")
 	for name, tc := range tests {
 		t.Run(name, func(t *testing.T) {
 			success, err := loadS3Artifact(tc.s3client, &wfv1.Artifact{
@@ -239,6 +260,7 @@
 			}
 		})
 	}
+	_ = os.Unsetenv(transientEnvVarKey)
 }
 
 func TestSaveS3Artifact(t *testing.T) {
@@ -332,9 +354,26 @@
 			done:      false,
 			errMsg:    "failed to put file: We encountered an internal error, please try again.",
 		},
+		"Save File Other Transient Error": {
+			s3client: newMockS3Client(
+				map[string][]string{
+					"my-bucket": {},
+				},
+				map[string]error{
+					"PutFile": minio.ErrorResponse{
+						Code: "this error is transient",
+					},
+				}),
+			bucket:    "my-bucket",
+			key:       "/folder/hello-art.tar.gz",
+			localPath: tempFile,
+			done:      false,
+			errMsg:    "failed to put file: Error response code this error is transient.",
+		},
 	}
 
 	for name, tc := range tests {
+		_ = os.Setenv(transientEnvVarKey, "this error is transient")
 		t.Run(name, func(t *testing.T) {
 			success, err := saveS3Artifact(
 				tc.s3client,
@@ -360,5 +399,6 @@
 				assert.Equal(t, tc.errMsg, "")
 			}
 		})
+		_ = os.Unsetenv(transientEnvVarKey)
 	}
 }
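An aside on the test plumbing: the manual `os.Setenv`/`os.Unsetenv` pairs could be scoped per test with `t.Setenv` (Go 1.17+), which restores the previous value automatically on cleanup. A sketch only, not part of the patch:

```go
package s3

import (
	"os"
	"testing"
)

// Sketch: t.Setenv scopes TRANSIENT_ERROR_PATTERN to this test and
// restores the prior value when the test ends, replacing the manual
// os.Setenv/os.Unsetenv bookkeeping used above.
func TestTransientPatternIsScoped(t *testing.T) {
	t.Setenv(transientEnvVarKey, "this error is transient")
	if os.Getenv(transientEnvVarKey) != "this error is transient" {
		t.Fatal("expected the pattern to be set for the duration of the test")
	}
}
```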
diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go
index ed4a7743b024..9b4ba736218c 100644
--- a/workflow/executor/executor.go
+++ b/workflow/executor/executor.go
@@ -25,7 +25,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 
@@ -33,29 +32,15 @@ import (
 	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	"github.com/argoproj/argo-workflows/v3/util"
 	"github.com/argoproj/argo-workflows/v3/util/archive"
-	envutil "github.com/argoproj/argo-workflows/v3/util/env"
 	errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
 	"github.com/argoproj/argo-workflows/v3/util/retry"
 	waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
 	artifact "github.com/argoproj/argo-workflows/v3/workflow/artifacts"
 	artifactcommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
 	"github.com/argoproj/argo-workflows/v3/workflow/common"
+	executorretry "github.com/argoproj/argo-workflows/v3/workflow/executor/retry"
 )
 
-// ExecutorRetry is a retry backoff settings for WorkflowExecutor
-// Run	Seconds
-//  0	0.000
-//  1	1.000
-//  2	2.600
-//  3	5.160
-//  4	9.256
-var ExecutorRetry = wait.Backoff{
-	Steps:    envutil.LookupEnvIntOr("EXECUTOR_RETRY_BACKOFF_STEPS", 5),
-	Duration: envutil.LookupEnvDurationOr("EXECUTOR_RETRY_BACKOFF_DURATION", 1*time.Second),
-	Factor:   envutil.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_FACTOR", 1.6),
-	Jitter:   envutil.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_JITTER", 0.5),
-}
-
 const (
 	// This directory temporarily stores the tarballs of the artifacts before uploading
 	tempOutArtDir = "/tmp/argo/outputs/artifacts"
@@ -125,6 +110,7 @@ func isErrUnknownGetPods(err error) bool {
 
 // NewExecutor instantiates a new workflow executor
 func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool, deadline time.Time, annotationPatchTickDuration, readProgressFileTickDuration time.Duration) WorkflowExecutor {
+	log.WithFields(log.Fields{"Steps": executorretry.Steps, "Duration": executorretry.Duration, "Factor": executorretry.Factor, "Jitter": executorretry.Jitter}).Info("Using executor retry strategy")
 	return WorkflowExecutor{
 		PodName:   podName,
 		ClientSet: clientset,
@@ -612,7 +598,7 @@ func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact)
 func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) {
 	podsIf := we.ClientSet.CoreV1().Pods(we.Namespace)
 	var pod *apiv1.Pod
-	err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
+	err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
 		var err error
 		pod, err = podsIf.Get(ctx, we.PodName, metav1.GetOptions{})
 		if err != nil && isErrUnknownGetPods(err) {
@@ -765,7 +751,7 @@ func (we *WorkflowExecutor) HasError() error {
 
 // AddAnnotation adds an annotation to the workflow pod
 func (we *WorkflowExecutor) AddAnnotation(ctx context.Context, key, value string) error {
-	return common.AddPodAnnotation(ctx, we.ClientSet, we.PodName, we.Namespace, key, value, ExecutorRetry)
+	return common.AddPodAnnotation(ctx, we.ClientSet, we.PodName, we.Namespace, key, value, executorretry.ExecutorRetry)
 }
 
 // isTarball returns whether or not the file is a tarball
@@ -948,7 +934,7 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
 	}
 	go we.monitorDeadline(ctx, containerNames)
 
-	err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
+	err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
 		err := we.RuntimeExecutor.Wait(ctx, containerNames)
 		return err == nil, err
 	})
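For reference, `waitutil.Backoff` follows the `k8s.io/apimachinery` backoff contract: the condition func returns `(done, err)` and is retried with exponentially growing delays until it reports done or the steps are exhausted. A self-contained sketch of that pattern using the upstream `wait.ExponentialBackoff` (the `util/wait` wrapper additionally handles transient errors, which is not reproduced here):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same shape as the executor's defaults: 5 steps, 1s initial delay,
	// 1.6x growth, 50% jitter.
	backoff := wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 1.6, Jitter: 0.5}
	attempts := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		fmt.Printf("attempt %d\n", attempts)
		return attempts >= 3, nil // pretend the operation succeeds on the third try
	})
	fmt.Println("err:", err) // err: <nil>
}
```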
diff --git a/workflow/executor/retry/retry.go b/workflow/executor/retry/retry.go
new file mode 100644
index 000000000000..35ae6c6759c2
--- /dev/null
+++ b/workflow/executor/retry/retry.go
@@ -0,0 +1,24 @@
+package retry
+
+import (
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/argoproj/argo-workflows/v3/util/env"
+)
+
+// ExecutorRetry is a retry backoff settings for WorkflowExecutor
+// Run	Seconds
+//  0	0.000
+//  1	1.000
+//  2	2.600
+//  3	5.160
+//  4	9.256
+var (
+	Steps         = env.LookupEnvIntOr("EXECUTOR_RETRY_BACKOFF_STEPS", 5)
+	Duration      = env.LookupEnvDurationOr("EXECUTOR_RETRY_BACKOFF_DURATION", 1*time.Second)
+	Factor        = env.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_FACTOR", 1.6)
+	Jitter        = env.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_JITTER", 0.5)
+	ExecutorRetry = wait.Backoff{Steps: Steps, Duration: Duration, Factor: Factor, Jitter: Jitter}
+)
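The Run/Seconds table in the comment follows directly from the defaults (jitter aside): each retry waits `Duration`, then `Duration*Factor`, and so on, so the cumulative wait before run n is a geometric series. A quick arithmetic check:

```go
package main

import "fmt"

func main() {
	// Defaults from retry.go: Duration=1s, Factor=1.6, Steps=5 (jitter ignored).
	delay, factor := 1.0, 1.6
	elapsed := 0.0
	for run := 0; run < 5; run++ {
		// Prints 0.000, 1.000, 2.600, 5.160, 9.256 — matching the table above.
		fmt.Printf("%d\t%.3f\n", run, elapsed)
		elapsed += delay
		delay *= factor
	}
}
```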