Skip to content

Commit

Permalink
Move s3 snapshot list functionality to s3.go
Browse files Browse the repository at this point in the history
Also, don't list ONLY s3 snapshots if S3 is enabled.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Oct 12, 2023
1 parent 8d47645 commit 80f909d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 70 deletions.
59 changes: 59 additions & 0 deletions pkg/etcd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -226,6 +227,64 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
return nil
}

// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshots := make(map[string]snapshotFile)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var loo minio.ListObjectsOptions
if s.config.EtcdS3Folder != "" {
loo = minio.ListObjectsOptions{
Prefix: s.config.EtcdS3Folder,
Recursive: true,
}
}

objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo)

for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}

filename := path.Base(obj.Key)
basename, compressed := strings.CutSuffix(filename, compressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}

sf := snapshotFile{
Name: filename,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &s3Config{
Endpoint: s.config.EtcdS3Endpoint,
EndpointCA: s.config.EtcdS3EndpointCA,
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
Bucket: s.config.EtcdS3BucketName,
Region: s.config.EtcdS3Region,
Folder: s.config.EtcdS3Folder,
Insecure: s.config.EtcdS3Insecure,
},
Status: successfulSnapshotStatus,
Compressed: compressed,
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}
return snapshots, nil
}

func readS3EndpointCA(endpointCA string) ([]byte, error) {
ca, err := base64.StdEncoding.DecodeString(endpointCA)
if err != nil {
Expand Down
90 changes: 20 additions & 70 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"math/rand"
"os"
"path"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -436,71 +435,6 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
return snapshots, nil
}

// listS3Snapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshots := make(map[string]snapshotFile)

if e.config.EtcdS3 {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := e.initS3IfNil(ctx); err != nil {
return nil, err
}

var loo minio.ListObjectsOptions
if e.config.EtcdS3Folder != "" {
loo = minio.ListObjectsOptions{
Prefix: e.config.EtcdS3Folder,
Recursive: true,
}
}

objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, loo)

for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}

filename := path.Base(obj.Key)
basename, compressed := strings.CutSuffix(filename, compressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}

sf := snapshotFile{
Name: filename,
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &s3Config{
Endpoint: e.config.EtcdS3Endpoint,
EndpointCA: e.config.EtcdS3EndpointCA,
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
Bucket: e.config.EtcdS3BucketName,
Region: e.config.EtcdS3Region,
Folder: e.config.EtcdS3Folder,
Insecure: e.config.EtcdS3Insecure,
},
Status: successfulSnapshotStatus,
Compressed: compressed,
}
sfKey := generateSnapshotConfigMapKey(sf)
snapshots[sfKey] = sf
}
}
return snapshots, nil
}

// initS3IfNil initializes the S3 client
// if it hasn't yet been initialized.
func (e *ETCD) initS3IfNil(ctx context.Context) error {
Expand Down Expand Up @@ -535,17 +469,33 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error {
}
}
}

return e.ReconcileSnapshotData(ctx)
}

// ListSnapshots is an exported wrapper method that wraps an
// unexported method of the same name.
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
snapshotFiles := map[string]snapshotFile{}
if e.config.EtcdS3 {
return e.listS3Snapshots(ctx)
if err := e.initS3IfNil(ctx); err != nil {
return nil, err
}
sfs, err := e.s3.listSnapshots(ctx)
if err != nil {
return nil, err
}
snapshotFiles = sfs
}

sfs, err := e.listLocalSnapshots()
if err != nil {
return nil, err
}
return e.listLocalSnapshots()
for k, sf := range sfs {
snapshotFiles[k] = sf
}

return snapshotFiles, err
}

// deleteSnapshots removes the given snapshots from
Expand Down Expand Up @@ -785,7 +735,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
var s3ListSuccessful bool

if e.config.EtcdS3 {
if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil {
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
} else {
for k, v := range s3Snapshots {
Expand Down

0 comments on commit 80f909d

Please sign in to comment.