allow e2d to keep snapshot history and configure retention policies, allow single node cluster to restart with on-disk data instead of restoring old snapshot/starting fresh every time
thecubed committed May 21, 2021
1 parent ef539cc commit d542651
Showing 7 changed files with 233 additions and 135 deletions.
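The commit message describes two behaviors: age-based snapshot retention and reusing on-disk data on single-node restart. As a rough illustration of the retention idea only, here is a minimal standalone Go sketch that prunes timestamped snapshot files older than a retention window; pruneOldSnapshots and the directory path are hypothetical and not part of e2d's actual implementation.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// pruneOldSnapshots is a hypothetical helper, not e2d code: it deletes any
// etcd.snapshot.<timestamp> file older than the retention window and leaves
// the etcd.snapshot.LATEST pointer alone.
func pruneOldSnapshots(dir string, retention time.Duration) error {
	if retention <= 0 {
		return nil // retention disabled
	}
	cutoff := time.Now().Add(-retention)
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	for _, e := range entries {
		name := e.Name()
		if !strings.HasPrefix(name, "etcd.snapshot.") || strings.HasSuffix(name, "LATEST") {
			continue
		}
		info, err := e.Info()
		if err != nil {
			continue
		}
		if info.ModTime().Before(cutoff) {
			_ = os.Remove(filepath.Join(dir, name))
		}
	}
	return nil
}

func main() {
	// prune anything older than 24h in a local backup directory (path is made up)
	if err := pruneOldSnapshots("/tmp/etcd-backups", 24*time.Hour); err != nil {
		fmt.Println("prune failed:", err)
	}
}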
38 changes: 22 additions & 16 deletions cmd/e2d/app/run.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"go.uber.org/zap/zapcore"
"math"
"strings"
"time"

@@ -41,10 +42,11 @@ type runOptions struct {

PeerDiscovery string `env:"E2D_PEER_DISCOVERY"`

SnapshotBackupURL string `env:"E2D_SNAPSHOT_BACKUP_URL"`
SnapshotCompression bool `env:"E2D_SNAPSHOT_COMPRESSION"`
SnapshotEncryption bool `env:"E2D_SNAPSHOT_ENCRYPTION"`
SnapshotInterval time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
SnapshotBackupURL string `env:"E2D_SNAPSHOT_BACKUP_URL"`
SnapshotCompression bool `env:"E2D_SNAPSHOT_COMPRESSION"`
SnapshotEncryption bool `env:"E2D_SNAPSHOT_ENCRYPTION"`
SnapshotInterval time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
SnapshotRetentionTime time.Duration `env:"E2D_SNAPSHOT_RETENTION_TIME"`

AWSAccessKey string `env:"E2D_AWS_ACCESS_KEY"`
AWSSecretKey string `env:"E2D_AWS_SECRET_KEY"`
@@ -67,17 +69,17 @@ func newRunCmd() *cobra.Command {
}
peerGetter, err := getPeerGetter(o)
if err != nil {
log.Fatalf("%+v", err)
log.Fatal("unable to get peer getter", zap.Error(err))
}

baddrs, err := getInitialBootstrapAddrs(o, peerGetter)
if err != nil {
log.Fatalf("%+v", err)
log.Fatal("unable to get initial bootstrap addresses", zap.Error(err))
}

snapshotter, err := getSnapshotProvider(o)
if err != nil {
log.Fatalf("%+v", err)
log.Fatal("unable to set up snapshot provider", zap.Error(err))
}

m, err := manager.New(&manager.Config{
@@ -141,10 +143,11 @@ func newRunCmd() *cobra.Command {

cmd.Flags().StringVar(&o.PeerDiscovery, "peer-discovery", "", "which method {aws-autoscaling-group,ec2-tags,do-tags} to use to discover peers")

cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 1*time.Minute, "frequency of etcd snapshots")
cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-backup-url", "", "an absolute path to shared filesystem storage (like file:///etcd-backups) or cloud storage bucket (like s3://etcd-backups) for snapshot backups")
cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 25*time.Minute, "frequency of etcd snapshots")
cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-url", "", "an absolute path to shared filesystem directory (like file:///tmp/etcd-backups/) or cloud storage bucket (like s3://etcd-backups/mycluster/) for snapshot backups. snapshots will be named etcd.snapshot.<timestamp>, and a file etcd.snapshot.LATEST will point to the most recent snapshot.")
cmd.Flags().BoolVar(&o.SnapshotCompression, "snapshot-compression", false, "compress snapshots with gzip")
cmd.Flags().BoolVar(&o.SnapshotEncryption, "snapshot-encryption", false, "encrypt snapshots with aes-256")
cmd.Flags().DurationVar(&o.SnapshotRetentionTime, "snapshot-retention-time", 24*time.Hour, "maximum age of a snapshot before it is deleted, set this to nonzero to enable retention support")

cmd.Flags().StringVar(&o.AWSAccessKey, "aws-access-key", "", "")
cmd.Flags().StringVar(&o.AWSSecretKey, "aws-secret-key", "", "")
@@ -238,18 +241,21 @@ func getSnapshotProvider(o *runOptions) (snapshot.Snapshotter, error) {

switch u.Type {
case snapshot.FileType:
return snapshot.NewFileSnapshotter(u.Path)
return snapshot.NewFileSnapshotter(u.Path, o.SnapshotRetentionTime)
case snapshot.S3Type:
origSnapshotRetentionDays := o.SnapshotRetentionTime.Hours() / 24
snapshotRetentionDays := int64(math.Ceil(origSnapshotRetentionDays))
if int64(origSnapshotRetentionDays) != snapshotRetentionDays {
log.Warn("S3 retention time rounded to the nearest day",
zap.Float64("original-days", origSnapshotRetentionDays),
zap.Int64("new-days", snapshotRetentionDays),
)
}
return snapshot.NewAmazonSnapshotter(&snapshot.AmazonConfig{
RoleSessionName: o.AWSRoleSessionName,
Bucket: u.Bucket,
Key: u.Path,
})
case snapshot.SpacesType:
return snapshot.NewDigitalOceanSnapshotter(&snapshot.DigitalOceanConfig{
SpacesURL: o.SnapshotBackupURL,
SpacesAccessKey: o.DOSpacesKey,
SpacesSecretKey: o.DOSpacesSecret,
RetentionDays: snapshotRetentionDays,
})
default:
return nil, errors.Errorf("unsupported snapshot url format: %#v", o.SnapshotBackupURL)
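The S3 branch above rounds the configured retention duration up to whole days and warns when rounding occurs. A minimal standalone sketch of just that arithmetic (the function name and sample durations are made up for illustration):

package main

import (
	"fmt"
	"math"
	"time"
)

// retentionDays converts a duration into whole days, rounding up, and reports
// whether rounding changed the value (the same check the diff uses to warn).
func retentionDays(d time.Duration) (int64, bool) {
	orig := d.Hours() / 24
	days := int64(math.Ceil(orig))
	rounded := int64(orig) != days
	return days, rounded
}

func main() {
	for _, d := range []time.Duration{24 * time.Hour, 36 * time.Hour, 7 * 24 * time.Hour} {
		days, rounded := retentionDays(d)
		fmt.Printf("%s -> %d day(s), rounded=%v\n", d, days, rounded)
	}
}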
34 changes: 27 additions & 7 deletions pkg/manager/manager.go
@@ -189,7 +189,7 @@ func (m *Manager) restoreFromSnapshot(peers []*Peer) (bool, error) {
// cluster, by conveying information about whether this is a brand new cluster
// or an existing cluster that recovered from total cluster failure.
func (m *Manager) startEtcdCluster(peers []*Peer) error {
snapshot, err := m.restoreFromSnapshot(peers)
restored, err := m.restoreFromSnapshot(peers)
if err != nil {
log.Error("cannot restore snapshot", zap.Error(err))
}
@@ -199,7 +199,7 @@ func (m *Manager) startEtcdCluster(peers []*Peer) error {
if err := m.etcd.startNew(ctx, peers); err != nil {
return err
}
if !snapshot {
if !restored {
return nil
}

@@ -449,6 +449,7 @@ func (m *Manager) runSnapshotter() {
log.Info("snapshotting disabled: no snapshot backup set")
return
}

log.Debug("starting snapshotter")
ticker := time.NewTicker(m.cfg.SnapshotInterval)
defer ticker.Stop()
@@ -459,7 +460,7 @@
select {
case <-ticker.C:
if m.etcd.isRestarting() {
log.Debug("server is restarting, skipping snapshot backup")
log.Warn("server is restarting, skipping snapshot backup")
continue
}
if !m.etcd.isLeader() {
@@ -469,7 +470,7 @@
log.Debug("starting snapshot backup")
snapshotData, snapshotSize, rev, err := m.etcd.createSnapshot(latestRev)
if err != nil {
log.Debug("cannot create snapshot",
log.Info("skipping snapshot, etcd revision hasn't changed since last snapshot",
zap.String("name", shortName(m.cfg.Name)),
zap.Error(err),
)
@@ -482,7 +483,7 @@
snapshotData = snapshotutil.NewGzipReadCloser(snapshotData)
}
if err := m.snapshotter.Save(snapshotData); err != nil {
log.Debug("cannot save snapshot",
log.Error("cannot save snapshot",
zap.String("name", shortName(m.cfg.Name)),
zap.Error(err),
)
@@ -509,9 +510,28 @@ func (m *Manager) Run() error {
case 1:
// a single-node etcd cluster does not require gossip or need to wait for
// other members and therefore can start immediately
if err := m.startEtcdCluster([]*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}); err != nil {
return err
peers := []*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}
if _, err := os.Lstat(m.cfg.Dir); err == nil {
// we have a data directory, attempt to use that
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Minute)
defer cancel()
// try to use the on-disk data, and if it fails, fall back to restoring / creating new cluster
if err := m.etcd.joinExisting(ctx, peers); err != nil {
log.Error("unable to start from the on-disk etcd data, attempting to restore from snapshot or create new cluster", zap.Error(err))
if err := m.startEtcdCluster(peers); err != nil {
log.Error("unable to restore from backup and unable to create a new cluster", zap.Error(err))
return err
}
}
} else {
// we might be either a brand new cluster of size one, or one that was recently started
// after a node failure. try restoring from snapshot to recover, or create a new blank cluster instead.
if err := m.startEtcdCluster(peers); err != nil {
log.Error("no data directory exists, and unable to start new cluster or restore from backup", zap.Error(err))
return err
}
}

case 3, 5:
// all multi-node clusters require the gossip network to be started
if err := m.gossip.Start(m.ctx, m.cfg.BootstrapAddrs); err != nil {
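The new single-node path in Run() prefers the existing data directory, falls back to snapshot restore, and only then creates a fresh cluster. A simplified standalone sketch of that ordering, with the etcd calls stubbed out (joinExisting and startNew here are hypothetical stand-ins, not e2d's real API):

package main

import (
	"errors"
	"fmt"
	"os"
)

// startSingleNode mirrors the fallback order in the diff: prefer on-disk data,
// then snapshot restore / fresh cluster when reusing it fails.
func startSingleNode(dataDir string, joinExisting, startNew func() error) error {
	if _, err := os.Lstat(dataDir); err == nil {
		if err := joinExisting(); err == nil {
			return nil
		}
		// fall through: on-disk data unusable, try restore / new cluster
	}
	return startNew()
}

func main() {
	err := startSingleNode("/tmp/does-not-exist", // made-up data directory
		func() error { return errors.New("stale member data") },
		func() error { fmt.Println("starting new cluster"); return nil },
	)
	fmt.Println("result:", err)
}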
52 changes: 26 additions & 26 deletions pkg/snapshot/snapshot.go
@@ -1,6 +1,7 @@
package snapshot

import (
"encoding/json"
"io"
"net/url"
"path/filepath"
@@ -35,9 +36,11 @@ type Type int
const (
FileType Type = iota
S3Type
SpacesType
)

const snapshotFilename = "etcd.snapshot"
const latestSuffix = "LATEST"

type URL struct {
Type Type
Bucket string
@@ -46,14 +49,28 @@ type URL struct {

var (
ErrInvalidScheme = errors.New("invalid scheme")
ErrInvalidDirectoryPath = errors.New("path must be a directory")
ErrCannotParseURL = errors.New("cannot parse url")
)

type LatestFile struct {
Path string
Timestamp string
}

func (l *LatestFile) generate() ([]byte, error) {
content, err := json.Marshal(&l)
return content, err
}

func (l *LatestFile) read(input []byte) error {
return json.Unmarshal(input, l)
}

// ParseSnapshotBackupURL deconstructs a uri into a type prefix and a bucket
// example inputs and outputs:
// file://file -> file://, file
// s3://bucket -> s3://, bucket
// https://nyc3.digitaloceanspaces.com/bucket -> digitaloceanspaces, bucket
func ParseSnapshotBackupURL(s string) (*URL, error) {
if !hasValidScheme(s) {
return nil, errors.Wrapf(ErrInvalidScheme, "url does not specify valid scheme: %#v", s)
@@ -65,40 +82,23 @@ func ParseSnapshotBackupURL(s string) (*URL, error) {

switch strings.ToLower(u.Scheme) {
case "file":
if !strings.HasSuffix(u.Path, string(filepath.Separator)) {
return nil, ErrInvalidDirectoryPath
}
return &URL{
Type: FileType,
Path: filepath.Join(u.Host, u.Path),
}, nil
case "s3":
if u.Path == "" {
u.Path = "etcd.snapshot"
path := strings.TrimPrefix(u.Path, "/")
if !strings.HasSuffix(path, "/") && path != "" {
return nil, ErrInvalidDirectoryPath
}
return &URL{
Type: S3Type,
Bucket: u.Host,
Path: strings.TrimPrefix(u.Path, "/"),
Path: path,
}, nil
case "http", "https":
if strings.Contains(u.Host, "digitaloceanspaces") {
bucket, path := parseBucketKey(strings.TrimPrefix(u.Path, "/"))
return &URL{
Type: SpacesType,
Bucket: bucket,
Path: path,
}, nil
}
}
return nil, errors.Wrap(ErrCannotParseURL, s)
}

func parseBucketKey(s string) (string, string) {
parts := strings.SplitN(s, "/", 2)
switch len(parts) {
case 1:
return parts[0], "etcd.snapshot"
case 2:
return parts[0], parts[1]
default:
return "", ""
}
}
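The new LatestFile type is a small JSON pointer to the most recent snapshot, written alongside the timestamped snapshot objects. A hedged round-trip sketch of how such a pointer file could be produced and read back (the struct mirrors the diff, but the field values are invented):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// latestFile mirrors the LatestFile struct in the diff: a JSON pointer to the
// most recent snapshot object.
type latestFile struct {
	Path      string
	Timestamp string
}

func main() {
	l := latestFile{
		Path:      "etcd.snapshot.1621559520", // hypothetical timestamped snapshot name
		Timestamp: time.Now().UTC().Format(time.RFC3339),
	}
	data, err := json.Marshal(&l)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))

	var back latestFile
	if err := json.Unmarshal(data, &back); err != nil {
		panic(err)
	}
	fmt.Printf("latest snapshot: %s (written %s)\n", back.Path, back.Timestamp)
}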
