diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 0ec774e120e7..e38a58ed88c9 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -13,6 +13,7 @@ import ( "path" "path/filepath" "sort" + "strconv" "strings" "time" @@ -226,6 +227,64 @@ func (s *S3) snapshotRetention(ctx context.Context) error { return nil } +// listSnapshots provides a list of currently stored +// snapshots in S3 along with their relevant +// metadata. +func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) { + snapshots := make(map[string]snapshotFile) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var loo minio.ListObjectsOptions + if s.config.EtcdS3Folder != "" { + loo = minio.ListObjectsOptions{ + Prefix: s.config.EtcdS3Folder, + Recursive: true, + } + } + + objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) + + for obj := range objects { + if obj.Err != nil { + return nil, obj.Err + } + if obj.Size == 0 { + continue + } + + filename := path.Base(obj.Key) + basename, compressed := strings.CutSuffix(filename, compressedExtension) + ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) + if err != nil { + ts = obj.LastModified.Unix() + } + + sf := snapshotFile{ + Name: filename, + NodeName: "s3", + CreatedAt: &metav1.Time{ + Time: time.Unix(ts, 0), + }, + Size: obj.Size, + S3: &s3Config{ + Endpoint: s.config.EtcdS3Endpoint, + EndpointCA: s.config.EtcdS3EndpointCA, + SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, + Bucket: s.config.EtcdS3BucketName, + Region: s.config.EtcdS3Region, + Folder: s.config.EtcdS3Folder, + Insecure: s.config.EtcdS3Insecure, + }, + Status: successfulSnapshotStatus, + Compressed: compressed, + } + sfKey := generateSnapshotConfigMapKey(sf) + snapshots[sfKey] = sf + } + return snapshots, nil +} + func readS3EndpointCA(endpointCA string) ([]byte, error) { ca, err := base64.StdEncoding.DecodeString(endpointCA) if err != nil { diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index e07b5a3740f7..d640b69eadb9 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -9,7 +9,6 @@ import ( "io" "math/rand" "os" - "path" "path/filepath" "runtime" "sort" @@ -436,71 +435,6 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { return snapshots, nil } -// listS3Snapshots provides a list of currently stored -// snapshots in S3 along with their relevant -// metadata. -func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) { - snapshots := make(map[string]snapshotFile) - - if e.config.EtcdS3 { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - if err := e.initS3IfNil(ctx); err != nil { - return nil, err - } - - var loo minio.ListObjectsOptions - if e.config.EtcdS3Folder != "" { - loo = minio.ListObjectsOptions{ - Prefix: e.config.EtcdS3Folder, - Recursive: true, - } - } - - objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, loo) - - for obj := range objects { - if obj.Err != nil { - return nil, obj.Err - } - if obj.Size == 0 { - continue - } - - filename := path.Base(obj.Key) - basename, compressed := strings.CutSuffix(filename, compressedExtension) - ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) - if err != nil { - ts = obj.LastModified.Unix() - } - - sf := snapshotFile{ - Name: filename, - NodeName: "s3", - CreatedAt: &metav1.Time{ - Time: time.Unix(ts, 0), - }, - Size: obj.Size, - S3: &s3Config{ - Endpoint: e.config.EtcdS3Endpoint, - EndpointCA: e.config.EtcdS3EndpointCA, - SkipSSLVerify: e.config.EtcdS3SkipSSLVerify, - Bucket: e.config.EtcdS3BucketName, - Region: e.config.EtcdS3Region, - Folder: e.config.EtcdS3Folder, - Insecure: e.config.EtcdS3Insecure, - }, - Status: successfulSnapshotStatus, - Compressed: compressed, - } - sfKey := generateSnapshotConfigMapKey(sf) - snapshots[sfKey] = sf - } - } - return snapshots, nil -} - // initS3IfNil initializes the S3 client // if it hasn't yet been initialized. func (e *ETCD) initS3IfNil(ctx context.Context) error { @@ -535,17 +469,33 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error { } } } - return e.ReconcileSnapshotData(ctx) } // ListSnapshots is an exported wrapper method that wraps an // unexported method of the same name. func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) { + snapshotFiles := map[string]snapshotFile{} if e.config.EtcdS3 { - return e.listS3Snapshots(ctx) + if err := e.initS3IfNil(ctx); err != nil { + return nil, err + } + sfs, err := e.s3.listSnapshots(ctx) + if err != nil { + return nil, err + } + snapshotFiles = sfs + } + + sfs, err := e.listLocalSnapshots() + if err != nil { + return nil, err } - return e.listLocalSnapshots() + for k, sf := range sfs { + snapshotFiles[k] = sf + } + + return snapshotFiles, err } // deleteSnapshots removes the given snapshots from @@ -785,7 +735,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { var s3ListSuccessful bool if e.config.EtcdS3 { - if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil { + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) } else { for k, v := range s3Snapshots {