Tidy s3 upload functions
Consistently refer to object keys as such, simplify error handling.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
brandond committed Oct 12, 2023
1 parent 2b0e2e8 commit f1afe15
Showing 2 changed files with 45 additions and 94 deletions.
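The object-key cleanup below leans on the fact that Go's path.Join drops empty elements and cleans the result, so the old folder/no-folder branch is unnecessary. A minimal standalone sketch of that behavior (the folder and snapshot names are made up for illustration):

package main

import (
	"fmt"
	"path"
)

func main() {
	// path.Join skips empty elements, so an unset S3 folder needs no
	// special case when building the object key.
	fmt.Println(path.Join("", "etcd-snapshot-node1"))             // etcd-snapshot-node1
	fmt.Println(path.Join("backups/etcd", "etcd-snapshot-node1")) // backups/etcd/etcd-snapshot-node1
}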
pkg/etcd/s3.go (110 changes: 34 additions & 76 deletions)
@@ -10,6 +10,7 @@ import (
"io"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strings"
@@ -96,89 +97,56 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
logrus.Infof("Uploading snapshot %s to S3", snapshot)
basename := filepath.Base(snapshot)
var snapshotFileName string
var sf snapshotFile
if s.config.EtcdS3Folder != "" {
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
} else {
snapshotFileName = basename
sf := &snapshotFile{
Name: basename,
NodeName: "s3",
CreatedAt: &metav1.Time{},
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}

snapshotKey := path.Join(s.config.EtcdS3Folder, basename)

toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
opts := minio.PutObjectOptions{NumThreads: 2}
if strings.HasSuffix(snapshot, compressedExtension) {
opts.ContentType = "application/zip"
sf.Compressed = true
} else {
opts.ContentType = "application/octet-stream"
}
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, snapshot, opts)
if err != nil {
sf = snapshotFile{
Name: filepath.Base(uploadInfo.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
Size: 0,
Status: failedSnapshotStatus,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
sf.CreatedAt.Time = now
sf.Status = failedSnapshotStatus
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
} else {
ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339))
if err != nil {
return nil, err
}

sf = snapshotFile{
Name: filepath.Base(uploadInfo.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: ca,
},
Size: uploadInfo.Size,
Status: successfulSnapshotStatus,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
metadataSource: extraMetadata,
}
sf.CreatedAt.Time = uploadInfo.LastModified
sf.Status = successfulSnapshotStatus
sf.Size = uploadInfo.Size
}
return &sf, nil
return sf, err
}

// download downloads the given snapshot from the configured S3
// compatible backend.
func (s *S3) Download(ctx context.Context) error {
var remotePath string
if s.config.EtcdS3Folder != "" {
remotePath = filepath.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
} else {
remotePath = s.config.ClusterResetRestorePath
}
snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)

logrus.Debugf("retrieving snapshot: %s", remotePath)
logrus.Debugf("retrieving snapshot: %s", snapshotKey)
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()

r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{})
r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, minio.GetObjectOptions{})
if err != nil {
return nil
}
@@ -213,14 +181,7 @@ func (s *S3) Download(ctx context.Context) error {
// snapshotPrefix returns the prefix used in the
// naming of the snapshots.
func (s *S3) snapshotPrefix() string {
fullSnapshotPrefix := s.config.EtcdSnapshotName
var prefix string
if s.config.EtcdS3Folder != "" {
prefix = filepath.Join(s.config.EtcdS3Folder, fullSnapshotPrefix)
} else {
prefix = fullSnapshotPrefix
}
return prefix
return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName)
}

// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
@@ -250,15 +211,12 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
return nil
}

sort.Slice(snapshotFiles, func(firstSnapshot, secondSnapshot int) bool {
// it takes the key from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date
firstSnapshotName, secondSnapshotName := strings.Split(snapshotFiles[firstSnapshot].Key, "-"), strings.Split(snapshotFiles[secondSnapshot].Key, "-")
firstSnapshotDate, secondSnapshotDate := firstSnapshotName[len(firstSnapshotName)-1], secondSnapshotName[len(secondSnapshotName)-1]
return firstSnapshotDate < secondSnapshotDate
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})

delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention
for _, df := range snapshotFiles[:delCount] {
for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
logrus.Infof("Removing S3 snapshot: %s", df.Key)
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
return err
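The retention change above sorts the listed objects newest-first and then prunes everything past the retention count. A simplified, self-contained sketch of that pattern (the type, keys, and timestamps are illustrative stand-ins, not the real minio.ObjectInfo):

package main

import (
	"fmt"
	"sort"
	"time"
)

type objectInfo struct {
	Key          string
	LastModified time.Time
}

// prune returns the objects beyond the newest `retain` entries.
func prune(objects []objectInfo, retain int) []objectInfo {
	if len(objects) <= retain {
		return nil
	}
	// Newest first: i sorts before j when j was modified before i.
	sort.Slice(objects, func(i, j int) bool {
		return objects[j].LastModified.Before(objects[i].LastModified)
	})
	return objects[retain:]
}

func main() {
	now := time.Now()
	objs := []objectInfo{
		{Key: "etcd-snapshot-a", LastModified: now.Add(-3 * time.Hour)},
		{Key: "etcd-snapshot-b", LastModified: now.Add(-1 * time.Hour)},
		{Key: "etcd-snapshot-c", LastModified: now.Add(-2 * time.Hour)},
	}
	for _, o := range prune(objs, 2) {
		fmt.Println("would remove:", o.Key) // only the oldest entry
	}
}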
pkg/etcd/snapshot.go (29 changes: 11 additions & 18 deletions)
@@ -312,8 +312,6 @@ func (e *ETCD) Snapshot(ctx context.Context) error {

if e.config.EtcdS3 {
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
// Set sf to nil so that we can attempt to now upload the snapshot to S3 if needed
sf = nil
if err := e.initS3IfNil(ctx); err != nil {
logrus.Warnf("Unable to initialize S3 client: %v", err)
sf = &snapshotFile{
@@ -336,21 +334,23 @@
},
metadataSource: extraMetadata,
}
}
// sf should be nil if we were able to successfully initialize the S3 client.
if sf == nil {
} else {
// upload will return a snapshotFile even on error - if there was an
// error, it will be reflected in the status and message.
sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now)
if err != nil {
return err
}
logrus.Infof("S3 upload complete for %s", snapshotName)
if err := e.s3.snapshotRetention(ctx); err != nil {
return errors.Wrap(err, "failed to apply s3 snapshot retention policy")
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
} else {
logrus.Infof("S3 upload complete for %s", snapshotName)
}
}
if err := e.addSnapshotData(*sf); err != nil {
return errors.Wrap(err, "failed to save snapshot data to configmap")
}
if err := e.s3.snapshotRetention(ctx); err != nil {
logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err)
}

}
}

@@ -463,17 +463,11 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, er
if obj.Size == 0 {
continue
}

ca, err := time.Parse(time.RFC3339, obj.LastModified.Format(time.RFC3339))
if err != nil {
return nil, err
}

sf := snapshotFile{
Name: filepath.Base(obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: ca,
Time: obj.LastModified,
},
Size: obj.Size,
S3: &s3Config{
@@ -634,7 +628,6 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
} else {
logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
sf.Metadata = base64.StdEncoding.EncodeToString(m)
}
}
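The simplified error handling in Snapshot() works because upload() now always returns a populated snapshotFile, with any failure reflected in its Status and Message fields; the caller logs the error and still records the result instead of returning early. A rough sketch of that calling pattern, with hypothetical names standing in for the real types:

package main

import (
	"errors"
	"fmt"
)

type result struct {
	Status  string
	Message string
}

// upload always returns a usable result; a non-nil error is also
// recorded in the result's Status and Message fields.
func upload(fail bool) (*result, error) {
	if fail {
		err := errors.New("bucket not reachable")
		return &result{Status: "failed", Message: err.Error()}, err
	}
	return &result{Status: "successful"}, nil
}

func main() {
	res, err := upload(true)
	if err != nil {
		// Log and continue: the failure is captured in res, not by aborting.
		fmt.Println("upload error:", err)
	}
	fmt.Println("recorded status:", res.Status)
}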
