From 438330c38da69a68d6b0b0b24f6aae0053fc35ee Mon Sep 17 00:00:00 2001 From: kshamajain99 Date: Thu, 15 Nov 2018 13:40:13 -0800 Subject: [PATCH] #1081 added retry logic to s3 load and save function (#1082) --- workflow/artifacts/s3/s3.go | 92 +++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/workflow/artifacts/s3/s3.go b/workflow/artifacts/s3/s3.go index 9ea6e4ac68c7..cbbe325d9f3d 100644 --- a/workflow/artifacts/s3/s3.go +++ b/workflow/artifacts/s3/s3.go @@ -6,6 +6,8 @@ import ( log "github.com/sirupsen/logrus" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "k8s.io/apimachinery/pkg/util/wait" + "time" ) // S3ArtifactDriver is a driver for AWS S3 @@ -31,42 +33,64 @@ func (s3Driver *S3ArtifactDriver) newS3Client() (argos3.S3Client, error) { // Load downloads artifacts from S3 compliant storage func (s3Driver *S3ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { - s3cli, err := s3Driver.newS3Client() - if err != nil { - return err - } - origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path) - if origErr == nil { - return nil - } - if !argos3.IsS3ErrCode(origErr, "NoSuchKey") { - return origErr - } - // If we get here, the error was a NoSuchKey. The key might be a s3 "directory" - isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key) - if err != nil { - log.Warnf("Failed to test if %s is a directory: %v", inputArtifact.S3.Bucket, err) - return origErr - } - if !isDir { - // It's neither a file, nor a directory. Return the original NoSuchKey error - return origErr - } - return s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path) + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Millisecond * 10, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + + s3cli, err := s3Driver.newS3Client() + if err != nil { + log.Warnf("Failed to create new S3 client: %v", err) + return false, nil + } + origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path) + if origErr == nil { + return true, nil + } + if !argos3.IsS3ErrCode(origErr, "NoSuchKey") { + return false, origErr + } + // If we get here, the error was a NoSuchKey. The key might be a s3 "directory" + isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key) + if err != nil { + log.Warnf("Failed to test if %s is a directory: %v", inputArtifact.S3.Bucket, err) + return false, nil + } + if !isDir { + // It's neither a file, nor a directory. Return the original NoSuchKey error + return false, origErr + } + + if err = s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path); err != nil { + return false, nil + } + return true, nil + }) + + return err } // Save saves an artifact to S3 compliant storage func (s3Driver *S3ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error { - s3cli, err := s3Driver.newS3Client() - if err != nil { - return err - } - isDir, err := file.IsDirectory(path) - if err != nil { - return err - } - if isDir { - return s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path) - } - return s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path) + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Millisecond * 10, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + s3cli, err := s3Driver.newS3Client() + if err != nil { + log.Warnf("Failed to create new S3 client: %v", err) + return false, nil + } + isDir, err := file.IsDirectory(path) + if err != nil { + log.Warnf("Failed to test if %s is a directory: %v", path, err) + return false, nil + } + if isDir { + if err = s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil { + return false, nil + } + } + if err = s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil { + return false, nil + } + return true, nil + }) + return err }