Skip to content

Commit

Permalink
#1081 added retry logic to s3 load and save function (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
kshamajain99 authored and jessesuen committed Nov 15, 2018
1 parent cb8b036 commit 438330c
Showing 1 changed file with 58 additions and 34 deletions.
92 changes: 58 additions & 34 deletions workflow/artifacts/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
log "github.com/sirupsen/logrus"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"k8s.io/apimachinery/pkg/util/wait"
"time"
)

// S3ArtifactDriver is a driver for AWS S3
Expand All @@ -31,42 +33,64 @@ func (s3Driver *S3ArtifactDriver) newS3Client() (argos3.S3Client, error) {

// Load downloads artifacts from S3 compliant storage
func (s3Driver *S3ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
s3cli, err := s3Driver.newS3Client()
if err != nil {
return err
}
origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path)
if origErr == nil {
return nil
}
if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
return origErr
}
// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", inputArtifact.S3.Bucket, err)
return origErr
}
if !isDir {
// It's neither a file, nor a directory. Return the original NoSuchKey error
return origErr
}
return s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path)
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Millisecond * 10, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {

s3cli, err := s3Driver.newS3Client()
if err != nil {
log.Warnf("Failed to create new S3 client: %v", err)
return false, nil
}
origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path)
if origErr == nil {
return true, nil
}
if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
return false, origErr
}
// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", inputArtifact.S3.Bucket, err)
return false, nil
}
if !isDir {
// It's neither a file, nor a directory. Return the original NoSuchKey error
return false, origErr
}

if err = s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path); err != nil {
return false, nil
}
return true, nil
})

return err
}

// Save saves an artifact to S3 compliant storage
func (s3Driver *S3ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
s3cli, err := s3Driver.newS3Client()
if err != nil {
return err
}
isDir, err := file.IsDirectory(path)
if err != nil {
return err
}
if isDir {
return s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path)
}
return s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path)
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Millisecond * 10, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {
s3cli, err := s3Driver.newS3Client()
if err != nil {
log.Warnf("Failed to create new S3 client: %v", err)
return false, nil
}
isDir, err := file.IsDirectory(path)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", path, err)
return false, nil
}
if isDir {
if err = s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
return false, nil
}
}
if err = s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
return false, nil
}
return true, nil
})
return err
}

0 comments on commit 438330c

Please sign in to comment.