Skip to content

Commit

Permalink
chore: move IsTransientErr into isTransientS3Err.
Browse files Browse the repository at this point in the history
Signed-off-by: Roel van den Berg <roel.vandenberg@kadaster.nl>
  • Loading branch information
RoelvandenBerg committed Dec 16, 2021
1 parent eaf3556 commit a08f0c7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
7 changes: 5 additions & 2 deletions workflow/artifacts/s3/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package s3

import argos3 "github.com/argoproj/pkg/s3"
import (
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
argos3 "github.com/argoproj/pkg/s3"
)

// s3TransientErrorCodes is a list of S3 error codes that are transient (retryable)
// Reference: https://github.com/minio/minio-go/blob/92fe50d14294782d96402deb861d442992038109/retry.go#L90-L102
Expand All @@ -25,5 +28,5 @@ func isTransientS3Err(err error) bool {
return true
}
}
return false
return errorsutil.IsTransientErr(err)
}
21 changes: 10 additions & 11 deletions workflow/artifacts/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
envutil "github.com/argoproj/argo-workflows/v3/util/env"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
artifactscommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -79,7 +78,7 @@ func (s3Driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
log.Infof("S3 Load path: %s, key: %s", path, inputArtifact.S3.Key)
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to create new S3 client: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
}
return loadS3Artifact(s3cli, inputArtifact, path)
})
Expand All @@ -96,20 +95,20 @@ func loadS3Artifact(s3cli argos3.S3Client, inputArtifact *wfv1.Artifact, path st
return true, nil
}
if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
return !(isTransientS3Err(origErr) || errorsutil.IsTransientErr(origErr)), fmt.Errorf("failed to get file: %v", origErr)
return !isTransientS3Err(origErr), fmt.Errorf("failed to get file: %v", origErr)
}
// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key)
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to test if %s is a directory: %v", inputArtifact.S3.Key, err)
return !isTransientS3Err(err), fmt.Errorf("failed to test if %s is a directory: %v", inputArtifact.S3.Key, err)
}
if !isDir {
// It's neither a file, nor a directory. Return the original NoSuchKey error
return true, errors.New(errors.CodeNotFound, origErr.Error())
}

if err = s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path); err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to get directory: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to get directory: %v", err)
}
return true, nil
}
Expand All @@ -124,7 +123,7 @@ func (s3Driver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact)
log.Infof("S3 Save path: %s, key: %s", path, outputArtifact.S3.Key)
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to create new S3 client: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
}
return saveS3Artifact(s3cli, path, outputArtifact)
})
Expand All @@ -148,17 +147,17 @@ func saveS3Artifact(s3cli argos3.S3Client, path string, outputArtifact *wfv1.Art
ObjectLocking: outputArtifact.S3.CreateBucketIfNotPresent.ObjectLocking,
})
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to create bucket %s: %v", outputArtifact.S3.Bucket, err)
return !isTransientS3Err(err), fmt.Errorf("failed to create bucket %s: %v", outputArtifact.S3.Bucket, err)
}
}

if isDir {
if err = s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to put directory: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to put directory: %v", err)
}
} else {
if err = s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to put file: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to put file: %v", err)
}
}
return true, nil
Expand All @@ -173,11 +172,11 @@ func (s3Driver *ArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string,
func() (bool, error) {
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to create new S3 client: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
}
files, err = s3cli.ListDirectory(artifact.S3.Bucket, artifact.S3.Key)
if err != nil {
return !(isTransientS3Err(err) || errorsutil.IsTransientErr(err)), fmt.Errorf("failed to list directory: %v", err)
return !isTransientS3Err(err), fmt.Errorf("failed to list directory: %v", err)
}
return true, nil
})
Expand Down

0 comments on commit a08f0c7

Please sign in to comment.