fix: transient errors for s3 artifacts. Fixes #7349 (#7352)

Merged: 7 commits, Jan 7, 2022
8 changes: 6 additions & 2 deletions workflow/artifacts/s3/errors.go
@@ -1,6 +1,10 @@
package s3

import argos3 "github.com/argoproj/pkg/s3"
import (
argos3 "github.com/argoproj/pkg/s3"

"github.com/argoproj/argo-workflows/v3/util/errors"
)

// s3TransientErrorCodes is a list of S3 error codes that are transient (retryable)
// Reference: https://github.com/minio/minio-go/blob/92fe50d14294782d96402deb861d442992038109/retry.go#L90-L102
@@ -25,5 +29,5 @@ func isTransientS3Err(err error) bool {
return true
}
}
return false
return errors.IsTransientErr(err)
}
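For context on what the new fallback buys: instead of returning false for any error code outside the hard-coded list, the helper now delegates to the shared util/errors.IsTransientErr check, which among other things honours the TRANSIENT_ERROR_PATTERN environment variable (exercised by the tests further down). The sketch below is a minimal, self-contained approximation of that behaviour, not the verbatim file; the real helper matches codes via argoproj/pkg/s3 and the fallback covers more cases (connection resets, timeouts, and so on).

```go
// Minimal approximation of the new behaviour (assumed names; the real helper
// lives in workflow/artifacts/s3/errors.go and delegates to argoproj/pkg/s3
// and util/errors instead of matching codes inline).
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/minio/minio-go/v7"
)

// transientCodes is an abridged stand-in for s3TransientErrorCodes.
var transientCodes = []string{"RequestTimeout", "SlowDown", "InternalError"}

func isTransient(err error) bool {
	if err == nil {
		return false
	}
	// 1) Well-known transient S3 error codes are always retryable.
	code := minio.ToErrorResponse(err).Code
	for _, c := range transientCodes {
		if c == code {
			return true
		}
	}
	// 2) New in this PR: fall back to a generic transient-error check; here it
	//    is approximated by matching the TRANSIENT_ERROR_PATTERN env var.
	if pattern := os.Getenv("TRANSIENT_ERROR_PATTERN"); pattern != "" {
		return strings.Contains(err.Error(), pattern)
	}
	return false
}

func main() {
	os.Setenv("TRANSIENT_ERROR_PATTERN", "this error is transient")
	err := minio.ErrorResponse{Code: "this error is transient"}
	fmt.Println(isTransient(err)) // true: retryable via the env-var pattern
}
```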
14 changes: 5 additions & 9 deletions workflow/artifacts/s3/s3.go
@@ -4,19 +4,18 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/argoproj/pkg/file"
argos3 "github.com/argoproj/pkg/s3"
"github.com/minio/minio-go/v7"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
artifactscommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
"github.com/argoproj/argo-workflows/v3/workflow/common"
executorretry "github.com/argoproj/argo-workflows/v3/workflow/executor/retry"
)

// ArtifactDriver is a driver for AWS S3
@@ -35,10 +34,7 @@ type ArtifactDriver struct {
ServerSideCustomerKey string
}

var (
_ artifactscommon.ArtifactDriver = &ArtifactDriver{}
defaultRetry = wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
)
var _ artifactscommon.ArtifactDriver = &ArtifactDriver{}

// newMinioClient instantiates a new minio client object.
func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Client, error) {
@@ -67,7 +63,7 @@ func (s3Driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := waitutil.Backoff(defaultRetry,
err := waitutil.Backoff(executorretry.ExecutorRetry,
func() (bool, error) {
log.Infof("S3 Load path: %s, key: %s", path, inputArtifact.S3.Key)
s3cli, err := s3Driver.newS3Client(ctx)
@@ -112,7 +108,7 @@ func (s3Driver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := waitutil.Backoff(defaultRetry,
err := waitutil.Backoff(executorretry.ExecutorRetry,
func() (bool, error) {
log.Infof("S3 Save path: %s, key: %s", path, outputArtifact.S3.Key)
s3cli, err := s3Driver.newS3Client(ctx)
@@ -162,7 +158,7 @@ func (s3Driver *ArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string,
defer cancel()

var files []string
err := waitutil.Backoff(defaultRetry,
err := waitutil.Backoff(executorretry.ExecutorRetry,
func() (bool, error) {
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
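With this change, all three driver operations (Load, Save, ListObjects) share the executor-wide backoff instead of the package-local defaultRetry, so the retry schedule becomes tunable through the EXECUTOR_RETRY_BACKOFF_* variables. The snippet below illustrates the retry pattern with the same default settings, driven through apimachinery's ExponentialBackoff for the sake of a self-contained example; the driver itself goes through the project's waitutil.Backoff wrapper and only treats errors it classifies as transient as retryable.

```go
// Illustration only: the same backoff defaults as executorretry.ExecutorRetry.
// Returning (false, nil) from the closure marks the attempt as retryable;
// returning a non-nil error aborts the loop immediately.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 1.6, Jitter: 0.5}

	attempts := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		if attempts < 3 {
			fmt.Printf("attempt %d hit a transient error, backing off\n", attempts)
			return false, nil // retryable: try again after the next backoff step
		}
		return true, nil // success: stop retrying
	})
	fmt.Printf("finished after %d attempts, err=%v\n", attempts, err)
}
```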
40 changes: 40 additions & 0 deletions workflow/artifacts/s3/s3_test.go
@@ -14,6 +14,8 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

const transientEnvVarKey = "TRANSIENT_ERROR_PATTERN"

type mockS3Client struct {
// files is a map where key is bucket name and value consists of file keys
files map[string][]string
@@ -175,6 +177,24 @@ func TestLoadS3Artifact(t *testing.T) {
done: true,
errMsg: "",
},
"Get File Other Transient Error": {
s3client: newMockS3Client(
map[string][]string{
"my-bucket": []string{
"/folder/hello-art-2.tar.gz",
},
},
map[string]error{
"GetFile": minio.ErrorResponse{
Code: "this error is transient",
},
}),
bucket: "my-bucket",
key: "/folder/",
localPath: "/tmp/folder/",
done: false,
errMsg: "failed to get file: Error response code this error is transient.",
},
"Test Directory Failed": {
s3client: newMockS3Client(
map[string][]string{
@@ -219,6 +239,7 @@
},
}

_ = os.Setenv(transientEnvVarKey, "this error is transient")
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
success, err := loadS3Artifact(tc.s3client, &wfv1.Artifact{
@@ -239,6 +260,7 @@
}
})
}
_ = os.Unsetenv(transientEnvVarKey)
}

func TestSaveS3Artifact(t *testing.T) {
@@ -332,9 +354,26 @@ func TestSaveS3Artifact(t *testing.T) {
done: false,
errMsg: "failed to put file: We encountered an internal error, please try again.",
},
"Save File Other Transient Error": {
s3client: newMockS3Client(
map[string][]string{
"my-bucket": {},
},
map[string]error{
"PutFile": minio.ErrorResponse{
Code: "this error is transient",
},
}),
bucket: "my-bucket",
key: "/folder/hello-art.tar.gz",
localPath: tempFile,
done: false,
errMsg: "failed to put file: Error response code this error is transient.",
},
}

for name, tc := range tests {
_ = os.Setenv(transientEnvVarKey, "this error is transient")
t.Run(name, func(t *testing.T) {
success, err := saveS3Artifact(
tc.s3client,
@@ -360,5 +399,6 @@
assert.Equal(t, tc.errMsg, "")
}
})
_ = os.Unsetenv(transientEnvVarKey)
}
}
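The new test cases set TRANSIENT_ERROR_PATTERN before the table runs and unset it afterwards, so an otherwise unknown error code ("this error is transient") is classified as retryable and the load/save closures report done=false rather than a terminal failure. As a side note, the same setup can be scoped per test; a minimal sketch assuming Go 1.17+, where testing.T.Setenv restores the variable automatically when the test finishes (the test function here is hypothetical and not part of this PR):

```go
// Hypothetical companion test for package s3 (workflow/artifacts/s3): with the
// pattern set, an unknown error code is treated as transient, so the driver's
// Backoff loop keeps retrying instead of failing the artifact operation.
func TestTransientErrorPattern(t *testing.T) {
	t.Setenv("TRANSIENT_ERROR_PATTERN", "this error is transient")

	err := minio.ErrorResponse{Code: "this error is transient"}
	assert.True(t, isTransientS3Err(err))
}
```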
24 changes: 5 additions & 19 deletions workflow/executor/executor.go
@@ -25,37 +25,22 @@ import (
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

argoerrs "github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util"
"github.com/argoproj/argo-workflows/v3/util/archive"
envutil "github.com/argoproj/argo-workflows/v3/util/env"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
"github.com/argoproj/argo-workflows/v3/util/retry"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
artifact "github.com/argoproj/argo-workflows/v3/workflow/artifacts"
artifactcommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
"github.com/argoproj/argo-workflows/v3/workflow/common"
executorretry "github.com/argoproj/argo-workflows/v3/workflow/executor/retry"
)

// ExecutorRetry is a retry backoff settings for WorkflowExecutor
// Run Seconds
// 0 0.000
// 1 1.000
// 2 2.600
// 3 5.160
// 4 9.256
var ExecutorRetry = wait.Backoff{
Steps: envutil.LookupEnvIntOr("EXECUTOR_RETRY_BACKOFF_STEPS", 5),
Duration: envutil.LookupEnvDurationOr("EXECUTOR_RETRY_BACKOFF_DURATION", 1*time.Second),
Factor: envutil.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_FACTOR", 1.6),
Jitter: envutil.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_JITTER", 0.5),
}

const (
// This directory temporarily stores the tarballs of the artifacts before uploading
tempOutArtDir = "/tmp/argo/outputs/artifacts"
@@ -125,6 +110,7 @@ func isErrUnknownGetPods(err error) bool {

// NewExecutor instantiates a new workflow executor
func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool, deadline time.Time, annotationPatchTickDuration, readProgressFileTickDuration time.Duration) WorkflowExecutor {
log.WithFields(log.Fields{"Steps": executorretry.Steps, "Duration": executorretry.Duration, "Factor": executorretry.Factor, "Jitter": executorretry.Jitter}).Info("Using executor retry strategy")
return WorkflowExecutor{
PodName: podName,
ClientSet: clientset,
@@ -612,7 +598,7 @@ func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact)
func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) {
podsIf := we.ClientSet.CoreV1().Pods(we.Namespace)
var pod *apiv1.Pod
err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
var err error
pod, err = podsIf.Get(ctx, we.PodName, metav1.GetOptions{})
if err != nil && isErrUnknownGetPods(err) {
@@ -765,7 +751,7 @@ func (we *WorkflowExecutor) HasError() error {

// AddAnnotation adds an annotation to the workflow pod
func (we *WorkflowExecutor) AddAnnotation(ctx context.Context, key, value string) error {
return common.AddPodAnnotation(ctx, we.ClientSet, we.PodName, we.Namespace, key, value, ExecutorRetry)
return common.AddPodAnnotation(ctx, we.ClientSet, we.PodName, we.Namespace, key, value, executorretry.ExecutorRetry)
}

// isTarball returns whether or not the file is a tarball
@@ -948,7 +934,7 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
}

go we.monitorDeadline(ctx, containerNames)
err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
err := we.RuntimeExecutor.Wait(ctx, containerNames)
return err == nil, err
})
24 changes: 24 additions & 0 deletions workflow/executor/retry/retry.go
@@ -0,0 +1,24 @@
package retry

import (
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo-workflows/v3/util/env"
)

// ExecutorRetry is a retry backoff settings for WorkflowExecutor
// Run Seconds
// 0 0.000
// 1 1.000
// 2 2.600
// 3 5.160
// 4 9.256
var (
Steps = env.LookupEnvIntOr("EXECUTOR_RETRY_BACKOFF_STEPS", 5)
Duration = env.LookupEnvDurationOr("EXECUTOR_RETRY_BACKOFF_DURATION", 1*time.Second)
Factor = env.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_FACTOR", 1.6)
Jitter = env.LookupEnvFloatOr("EXECUTOR_RETRY_BACKOFF_JITTER", 0.5)
ExecutorRetry = wait.Backoff{Steps: Steps, Duration: Duration, Factor: Factor, Jitter: Jitter}
)