diff --git a/cmd/e2d/app/run.go b/cmd/e2d/app/run.go
index caa6ab6..02515f8 100644
--- a/cmd/e2d/app/run.go
+++ b/cmd/e2d/app/run.go
@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "go.uber.org/zap/zapcore"
+    "math"
     "strings"
     "time"
 
@@ -41,10 +42,11 @@ type runOptions struct {
     PeerDiscovery string `env:"E2D_PEER_DISCOVERY"`
 
-    SnapshotBackupURL   string        `env:"E2D_SNAPSHOT_BACKUP_URL"`
-    SnapshotCompression bool          `env:"E2D_SNAPSHOT_COMPRESSION"`
-    SnapshotEncryption  bool          `env:"E2D_SNAPSHOT_ENCRYPTION"`
-    SnapshotInterval    time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
+    SnapshotBackupURL     string        `env:"E2D_SNAPSHOT_BACKUP_URL"`
+    SnapshotCompression   bool          `env:"E2D_SNAPSHOT_COMPRESSION"`
+    SnapshotEncryption    bool          `env:"E2D_SNAPSHOT_ENCRYPTION"`
+    SnapshotInterval      time.Duration `env:"E2D_SNAPSHOT_INTERVAL"`
+    SnapshotRetentionTime time.Duration `env:"E2D_SNAPSHOT_RETENTION_TIME"`
 
     AWSAccessKey string `env:"E2D_AWS_ACCESS_KEY"`
     AWSSecretKey string `env:"E2D_AWS_SECRET_KEY"`
@@ -67,17 +69,17 @@ func newRunCmd() *cobra.Command {
             }
             peerGetter, err := getPeerGetter(o)
             if err != nil {
-                log.Fatalf("%+v", err)
+                log.Fatal("unable to get peer getter", zap.Error(err))
             }
             baddrs, err := getInitialBootstrapAddrs(o, peerGetter)
             if err != nil {
-                log.Fatalf("%+v", err)
+                log.Fatal("unable to get initial bootstrap addresses", zap.Error(err))
             }
             snapshotter, err := getSnapshotProvider(o)
             if err != nil {
-                log.Fatalf("%+v", err)
+                log.Fatal("unable to set up snapshot provider", zap.Error(err))
             }
 
             m, err := manager.New(&manager.Config{
@@ -141,10 +143,11 @@ func newRunCmd() *cobra.Command {
     cmd.Flags().StringVar(&o.PeerDiscovery, "peer-discovery", "", "which method {aws-autoscaling-group,ec2-tags,do-tags} to use to discover peers")
 
-    cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 1*time.Minute, "frequency of etcd snapshots")
-    cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-backup-url", "", "an absolute path to shared filesystem storage (like file:///etcd-backups) or cloud storage bucket (like s3://etcd-backups) for snapshot backups")
+    cmd.Flags().DurationVar(&o.SnapshotInterval, "snapshot-interval", 25*time.Minute, "frequency of etcd snapshots")
+    cmd.Flags().StringVar(&o.SnapshotBackupURL, "snapshot-url", "", "an absolute path to a shared filesystem directory (like file:///tmp/etcd-backups/) or cloud storage bucket (like s3://etcd-backups/mycluster/) for snapshot backups. snapshots will be named etcd.snapshot.<unix timestamp>, and a file etcd.snapshot.LATEST will point to the most recent snapshot.")
     cmd.Flags().BoolVar(&o.SnapshotCompression, "snapshot-compression", false, "compression snapshots with gzip")
     cmd.Flags().BoolVar(&o.SnapshotEncryption, "snapshot-encryption", false, "encrypt snapshots with aes-256")
+    cmd.Flags().DurationVar(&o.SnapshotRetentionTime, "snapshot-retention-time", 24*time.Hour, "maximum age of a snapshot before it is deleted; set to a nonzero duration to enable retention")
     cmd.Flags().StringVar(&o.AWSAccessKey, "aws-access-key", "", "")
     cmd.Flags().StringVar(&o.AWSSecretKey, "aws-secret-key", "", "")
@@ -238,18 +241,21 @@ func getSnapshotProvider(o *runOptions) (snapshot.Snapshotter, error) {
     switch u.Type {
     case snapshot.FileType:
-        return snapshot.NewFileSnapshotter(u.Path)
+        return snapshot.NewFileSnapshotter(u.Path, o.SnapshotRetentionTime)
     case snapshot.S3Type:
+        origSnapshotRetentionDays := o.SnapshotRetentionTime.Hours() / 24
+        snapshotRetentionDays := int64(math.Ceil(origSnapshotRetentionDays))
+        if int64(origSnapshotRetentionDays) != snapshotRetentionDays {
+            log.Warn("S3 retention time rounded up to the next whole day",
+                zap.Float64("original-days", origSnapshotRetentionDays),
+                zap.Int64("new-days", snapshotRetentionDays),
+            )
+        }
         return snapshot.NewAmazonSnapshotter(&snapshot.AmazonConfig{
             RoleSessionName: o.AWSRoleSessionName,
             Bucket:          u.Bucket,
             Key:             u.Path,
-        })
-    case snapshot.SpacesType:
-        return snapshot.NewDigitalOceanSnapshotter(&snapshot.DigitalOceanConfig{
-            SpacesURL:       o.SnapshotBackupURL,
-            SpacesAccessKey: o.DOSpacesKey,
-            SpacesSecretKey: o.DOSpacesSecret,
+            RetentionDays:   snapshotRetentionDays,
         })
     default:
         return nil, errors.Errorf("unsupported snapshot url format: %#v", o.SnapshotBackupURL)
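The flag surface changes in a few ways worth spelling out: the backup destination flag is renamed from --snapshot-backup-url to --snapshot-url (its E2D_SNAPSHOT_BACKUP_URL environment variable keeps the old name), the default snapshot interval grows from 1m to 25m, and --snapshot-retention-time defaults to 24h, so retention is effectively on unless explicitly set to 0. A hypothetical invocation with the new flags (bucket and prefix are made up for illustration):

    e2d run \
        --snapshot-url s3://etcd-backups/mycluster/ \
        --snapshot-interval 25m \
        --snapshot-retention-time 72h

Since S3 lifecycle rules count whole days, a value like 36h is rounded up by the math.Ceil call above to 2 days and a warning is logged; the file snapshotter applies the duration exactly as given.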
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 1052cb2..cf0c80f 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -189,7 +189,7 @@ func (m *Manager) restoreFromSnapshot(peers []*Peer) (bool, error) {
 // cluster, by conveying information about whether this is a brand new cluster
 // or an existing cluster that recovered from total cluster failure.
 func (m *Manager) startEtcdCluster(peers []*Peer) error {
-    snapshot, err := m.restoreFromSnapshot(peers)
+    restored, err := m.restoreFromSnapshot(peers)
     if err != nil {
         log.Error("cannot restore snapshot", zap.Error(err))
     }
@@ -199,7 +199,7 @@ func (m *Manager) startEtcdCluster(peers []*Peer) error {
     if err := m.etcd.startNew(ctx, peers); err != nil {
         return err
     }
-    if !snapshot {
+    if !restored {
         return nil
     }
@@ -449,6 +449,7 @@ func (m *Manager) runSnapshotter() {
         log.Info("snapshotting disabled: no snapshot backup set")
         return
     }
+    log.Debug("starting snapshotter")
     ticker := time.NewTicker(m.cfg.SnapshotInterval)
     defer ticker.Stop()
 
@@ -459,7 +460,7 @@ func (m *Manager) runSnapshotter() {
         select {
         case <-ticker.C:
             if m.etcd.isRestarting() {
-                log.Debug("server is restarting, skipping snapshot backup")
+                log.Warn("server is restarting, skipping snapshot backup")
                 continue
             }
             if !m.etcd.isLeader() {
@@ -469,7 +470,7 @@ func (m *Manager) runSnapshotter() {
             log.Debug("starting snapshot backup")
             snapshotData, snapshotSize, rev, err := m.etcd.createSnapshot(latestRev)
             if err != nil {
-                log.Debug("cannot create snapshot",
+                log.Info("skipping snapshot, etcd revision hasn't changed since last snapshot",
                     zap.String("name", shortName(m.cfg.Name)),
                     zap.Error(err),
                 )
@@ -482,7 +483,7 @@ func (m *Manager) runSnapshotter() {
                 snapshotData = snapshotutil.NewGzipReadCloser(snapshotData)
             }
             if err := m.snapshotter.Save(snapshotData); err != nil {
-                log.Debug("cannot save snapshot",
+                log.Error("cannot save snapshot",
                     zap.String("name", shortName(m.cfg.Name)),
                     zap.Error(err),
                 )
@@ -509,9 +510,28 @@ func (m *Manager) Run() error {
     case 1:
         // a single-node etcd cluster does not require gossip or need to wait for
         // other members and therefore can start immediately
-        if err := m.startEtcdCluster([]*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}); err != nil {
-            return err
+        peers := []*Peer{{m.cfg.Name, m.cfg.PeerURL.String()}}
+        if _, err := os.Lstat(m.cfg.Dir); err == nil {
+            // we have a data directory, attempt to use that
+            ctx, cancel := context.WithTimeout(m.ctx, 5*time.Minute)
+            defer cancel()
+            // try to use the on-disk data, and if that fails, fall back to restoring a snapshot or creating a new cluster
+            if err := m.etcd.joinExisting(ctx, peers); err != nil {
+                log.Error("unable to start from the on-disk etcd data, attempting to restore from snapshot or create a new cluster", zap.Error(err))
+                if err := m.startEtcdCluster(peers); err != nil {
+                    log.Error("unable to restore from backup and unable to create a new cluster", zap.Error(err))
+                    return err
+                }
+            }
+        } else {
+            // we might be either a brand new cluster of size one, or one that was recently restarted
+            // after a node failure. try restoring from a snapshot to recover, or create a new blank cluster instead.
+            if err := m.startEtcdCluster(peers); err != nil {
+                log.Error("no data directory exists, and unable to start a new cluster or restore from backup", zap.Error(err))
+                return err
+            }
         }
+
     case 3, 5:
         // all multi-node clusters require the gossip network to be started
         if err := m.gossip.Start(m.ctx, m.cfg.BootstrapAddrs); err != nil {
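The single-node case in Run is now a small recovery ladder rather than an unconditional new-cluster start. In outline (names as in the patch):

    // 1. data directory present -> m.etcd.joinExisting(ctx, peers): reuse on-disk state
    // 2. join fails             -> m.startEtcdCluster(peers): restore from snapshot, else start new
    // 3. no data directory      -> m.startEtcdCluster(peers): restore from snapshot, else start new

One subtlety: the deferred cancel() for the five-minute join timeout does not fire until Run itself returns, since defer is function-scoped rather than case-scoped; by then the context has served its purpose, so this is harmless but worth knowing.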
diff --git a/pkg/snapshot/snapshot.go b/pkg/snapshot/snapshot.go
index dbe4add..8320aaf 100644
--- a/pkg/snapshot/snapshot.go
+++ b/pkg/snapshot/snapshot.go
@@ -1,6 +1,7 @@
 package snapshot
 
 import (
+    "encoding/json"
     "io"
     "net/url"
     "path/filepath"
@@ -35,9 +36,11 @@ type Type int
 const (
     FileType Type = iota
     S3Type
-    SpacesType
 )
 
+const snapshotFilename = "etcd.snapshot"
+const latestSuffix = "LATEST"
+
 type URL struct {
     Type   Type
     Bucket string
@@ -46,14 +49,28 @@ type URL struct {
 var (
     ErrInvalidScheme        = errors.New("invalid scheme")
+    ErrInvalidDirectoryPath = errors.New("path must be a directory")
     ErrCannotParseURL       = errors.New("cannot parse url")
 )
 
+type LatestFile struct {
+    Path      string
+    Timestamp string
+}
+
+func (l *LatestFile) generate() ([]byte, error) {
+    content, err := json.Marshal(&l)
+    return content, err
+}
+
+func (l *LatestFile) read(input []byte) error {
+    return json.Unmarshal(input, l)
+}
+
 // ParseSnapshotBackupURL deconstructs a uri into a type prefix and a bucket
 // example inputs and outputs:
 //   file://file -> file://, file
 //   s3://bucket -> s3://, bucket
-//   https://nyc3.digitaloceanspaces.com/bucket -> digitaloceanspaces, bucket
 func ParseSnapshotBackupURL(s string) (*URL, error) {
     if !hasValidScheme(s) {
         return nil, errors.Wrapf(ErrInvalidScheme, "url does not specify valid scheme: %#v", s)
@@ -65,40 +82,23 @@ func ParseSnapshotBackupURL(s string) (*URL, error) {
     switch strings.ToLower(u.Scheme) {
     case "file":
+        if !strings.HasSuffix(u.Path, string(filepath.Separator)) {
+            return nil, ErrInvalidDirectoryPath
+        }
         return &URL{
             Type: FileType,
             Path: filepath.Join(u.Host, u.Path),
         }, nil
     case "s3":
-        if u.Path == "" {
-            u.Path = "etcd.snapshot"
+        path := strings.TrimPrefix(u.Path, "/")
+        if !strings.HasSuffix(path, "/") && path != "" {
+            return nil, ErrInvalidDirectoryPath
         }
         return &URL{
             Type:   S3Type,
             Bucket: u.Host,
-            Path:   strings.TrimPrefix(u.Path, "/"),
+            Path:   path,
         }, nil
-    case "http", "https":
-        if strings.Contains(u.Host, "digitaloceanspaces") {
-            bucket, path := parseBucketKey(strings.TrimPrefix(u.Path, "/"))
-            return &URL{
-                Type:   SpacesType,
-                Bucket: bucket,
-                Path:   path,
-            }, nil
-        }
     }
     return nil, errors.Wrap(ErrCannotParseURL, s)
 }
-
-func parseBucketKey(s string) (string, string) {
-    parts := strings.SplitN(s, "/", 2)
-    switch len(parts) {
-    case 1:
-        return parts[0], "etcd.snapshot"
-    case 2:
-        return parts[0], parts[1]
-    default:
-        return "", ""
-    }
-}
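Both storage backends now take a directory-like location (hence the new trailing-slash validation and ErrInvalidDirectoryPath) and share the etcd.snapshot.<unix timestamp> naming scheme. On S3, etcd.snapshot.LATEST is a small JSON pointer produced by LatestFile.generate; given the timestamp layout used in snapshot_aws.go below, its contents would look roughly like this (values invented for illustration):

    {"Path":"mycluster/etcd.snapshot.1589400000","Timestamp":"2020-05-13T20:00:00+0000"}

The file backend uses a symlink named etcd.snapshot.LATEST instead, so only the S3 snapshotter ever reads or writes this JSON.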
diff --git a/pkg/snapshot/snapshot_aws.go b/pkg/snapshot/snapshot_aws.go
index 228316e..f1fde68 100644
--- a/pkg/snapshot/snapshot_aws.go
+++ b/pkg/snapshot/snapshot_aws.go
@@ -1,7 +1,10 @@
 package snapshot
 
 import (
+    "bytes"
     "context"
+    "fmt"
+    "go.uber.org/zap"
     "io"
     "io/ioutil"
     "net/http"
@@ -12,6 +15,7 @@ import (
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/s3"
     "github.com/aws/aws-sdk-go/service/s3/s3manager"
+    "github.com/criticalstack/e2d/pkg/log"
    e2daws "github.com/criticalstack/e2d/pkg/provider/aws"
     "github.com/pkg/errors"
 )
@@ -27,6 +31,7 @@ type AmazonConfig struct {
     RoleSessionName string
     Bucket          string
     Key             string
+    RetentionDays   int64
 }
 
 type AmazonSnapshotter struct {
@@ -42,16 +47,17 @@ func NewAmazonSnapshotter(cfg *AmazonConfig) (*AmazonSnapshotter, error) {
     if err != nil {
         return nil, err
     }
-    return newAmazonSnapshotter(awsCfg, cfg.Bucket, cfg.Key)
+    return newAmazonSnapshotter(awsCfg, cfg.Bucket, cfg.Key, cfg.RetentionDays)
 }
 
-func newAmazonSnapshotter(cfg *aws.Config, bucket, key string) (*AmazonSnapshotter, error) {
+func newAmazonSnapshotter(cfg *aws.Config, bucket, key string, retentionDays int64) (*AmazonSnapshotter, error) {
     sess, err := session.NewSession(cfg)
     if err != nil {
         return nil, err
     }
+    s3conn := s3.New(sess)
     s := &AmazonSnapshotter{
-        S3:         s3.New(sess),
+        S3:         s3conn,
         Downloader: s3manager.NewDownloader(sess),
         Uploader:   s3manager.NewUploader(sess),
         bucket:     bucket,
@@ -75,6 +81,34 @@ func newAmazonSnapshotter(cfg *aws.Config, bucket, key string) (*AmazonSnapshott
             }
         }
     }
+
+    // optionally set up retention
+    if retentionDays > 0 {
+        // TODO: figure out how to prevent deleting snapshots from s3 if etcd hasn't written a snapshot in a while
+        input := &s3.PutBucketLifecycleConfigurationInput{
+            Bucket: aws.String(bucket),
+            LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
+                Rules: []*s3.LifecycleRule{
+                    {
+                        Expiration: &s3.LifecycleExpiration{
+                            Days: aws.Int64(retentionDays),
+                        },
+                        Filter: &s3.LifecycleRuleFilter{
+                            Prefix: aws.String(key),
+                        },
+                        ID:     aws.String(fmt.Sprintf("E2DLifecycle-%s", key)),
+                        Status: aws.String("Enabled"),
+                    },
+                },
+            },
+        }
+
+        _, err := s3conn.PutBucketLifecycleConfiguration(input)
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to put bucket lifecycle policy")
+        }
+    }
+
     return s, nil
 }
@@ -85,9 +119,32 @@ func (s *AmazonSnapshotter) Load() (io.ReadCloser, error) {
     }
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
     defer cancel()
+
+    // generate the key of the latest snapshot pointer file
+    latestPath := s.key + fmt.Sprintf("%s.%s", snapshotFilename, latestSuffix)
+
+    // download the latest snapshot pointer file
+    var latestFilePath string
+    buf := aws.NewWriteAtBuffer([]byte{})
+    if _, err = s.DownloadWithContext(ctx, buf, &s3.GetObjectInput{
+        Bucket: aws.String(s.bucket),
+        Key:    aws.String(latestPath),
+    }); err == nil {
+        l := &LatestFile{}
+        if err := l.read(buf.Bytes()); err != nil {
+            return nil, errors.Wrap(err, "unable to unmarshal latest backup pointer file")
+        }
+        log.Debug("received latest snapshot pointer file", zap.String("path", l.Path), zap.String("timestamp", l.Timestamp))
+        latestFilePath = l.Path
+    } else {
+        tmpFile.Close()
+        return nil, errors.Wrap(err, "unable to retrieve latest backup pointer file")
+    }
+
+    // download the latest snapshot
     if _, err = s.DownloadWithContext(ctx, tmpFile, &s3.GetObjectInput{
         Bucket: aws.String(s.bucket),
-        Key:    aws.String(s.key),
+        Key:    aws.String(latestFilePath),
     }); err != nil {
         tmpFile.Close()
-        return nil, errors.Wrapf(err, "cannot download file: %v", s.key)
+        return nil, errors.Wrapf(err, "cannot download file: %v", latestFilePath)
     }
@@ -102,10 +159,35 @@ func (s *AmazonSnapshotter) Save(r io.ReadCloser) error {
     defer r.Close()
     ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
     defer cancel()
+
+    // generate the object keys
+    backupTimestamp := time.Now().UTC()
+    snapshotPath := s.key + fmt.Sprintf("%s.%d", snapshotFilename, backupTimestamp.Unix())
+    latestPath := s.key + fmt.Sprintf("%s.%s", snapshotFilename, latestSuffix)
+
+    // upload the snapshot itself
     _, err := s.UploadWithContext(ctx, &s3manager.UploadInput{
         Body:   r,
         Bucket: aws.String(s.bucket),
-        Key:    aws.String(s.key),
+        Key:    aws.String(snapshotPath),
     })
+    if err != nil {
+        return err
+    }
+
+    // upload the latest snapshot pointer file
+    latestFile := &LatestFile{
+        Path:      snapshotPath,
+        Timestamp: backupTimestamp.Format("2006-01-02T15:04:05-0700"),
+    }
+    latestContent, err := latestFile.generate()
+    if err != nil {
+        return err
+    }
+    _, err = s.UploadWithContext(ctx, &s3manager.UploadInput{
+        Body:   bytes.NewReader(latestContent),
+        Bucket: aws.String(s.bucket),
+        Key:    aws.String(latestPath),
+    })
     return err
 }
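S3 retention is delegated to the bucket: newAmazonSnapshotter installs a lifecycle rule scoped to the key prefix, which means the credentials in use must be permitted to call s3:PutLifecycleConfiguration on the bucket. For a prefix of mycluster/ and three-day retention, the resulting rule would look roughly like this (illustrative):

    {"Rules": [{"ID": "E2DLifecycle-mycluster/", "Status": "Enabled",
                "Filter": {"Prefix": "mycluster/"},
                "Expiration": {"Days": 3}}]}

Note that etcd.snapshot.LATEST lives under the same prefix and is therefore covered by the rule; it is re-uploaded on every successful Save, which resets its age, but as the TODO above acknowledges, a cluster that stops snapshotting for longer than the retention window loses both its snapshots and the pointer.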
diff --git a/pkg/snapshot/snapshot_do.go b/pkg/snapshot/snapshot_do.go
deleted file mode 100644
index 70c9eab..0000000
--- a/pkg/snapshot/snapshot_do.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package snapshot
-
-import (
-    "net/url"
-    "strings"
-
-    "github.com/aws/aws-sdk-go/aws"
-    "github.com/aws/aws-sdk-go/aws/credentials"
-)
-
-type DigitalOceanConfig struct {
-    AccessToken     string
-    SpacesURL       string
-    SpacesAccessKey string
-    SpacesSecretKey string
-}
-
-func parseSpacesURL(s string) (string, string, string, error) {
-    u, err := url.Parse(s)
-    if err != nil {
-        return "", "", "", err
-    }
-    bucket, key := parseBucketKey(strings.TrimPrefix(u.Path, "/"))
-    return u.Host, bucket, key, nil
-}
-
-type DigitalOceanSnapshotter struct {
-    *AmazonSnapshotter
-}
-
-func NewDigitalOceanSnapshotter(cfg *DigitalOceanConfig) (*DigitalOceanSnapshotter, error) {
-    endpoint, spaceName, key, err := parseSpacesURL(cfg.SpacesURL)
-    if err != nil {
-        return nil, err
-    }
-    awsCfg := &aws.Config{
-        Credentials: credentials.NewStaticCredentials(cfg.SpacesAccessKey, cfg.SpacesSecretKey, ""),
-        Endpoint:    aws.String(endpoint),
-        // This is counter intuitive, but it will fail with a non-AWS region name.
-        Region: aws.String("us-east-1"),
-    }
-    s, err := newAmazonSnapshotter(awsCfg, spaceName, key)
-    if err != nil {
-        return nil, err
-    }
-    return &DigitalOceanSnapshotter{s}, nil
-}
diff --git a/pkg/snapshot/snapshot_file.go b/pkg/snapshot/snapshot_file.go
index 189699b..10110c7 100644
--- a/pkg/snapshot/snapshot_file.go
+++ b/pkg/snapshot/snapshot_file.go
@@ -1,36 +1,81 @@
 package snapshot
 
 import (
+    "fmt"
     "io"
+    "io/ioutil"
     "os"
     "path/filepath"
+    "strconv"
+    "strings"
+    "time"
 
+    "github.com/criticalstack/e2d/pkg/log"
     "github.com/pkg/errors"
 )
 
 type FileSnapshotter struct {
-    file string
+    path          string
+    retentionTime time.Duration
 }
 
-func NewFileSnapshotter(path string) (*FileSnapshotter, error) {
-    if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil && !os.IsExist(err) {
-        return nil, errors.Wrapf(err, "cannot create snapshot directory: %#v", filepath.Dir(path))
+func NewFileSnapshotter(path string, retentionTime time.Duration) (*FileSnapshotter, error) {
+    // TODO: check that an existing path is a directory
+    // path now names the snapshot directory itself, so create it rather than its parent
+    if err := os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
+        return nil, errors.Wrapf(err, "cannot create snapshot directory: %#v", path)
     }
-    return &FileSnapshotter{file: path}, nil
+    return &FileSnapshotter{path: path, retentionTime: retentionTime}, nil
 }
 
 func (fs *FileSnapshotter) Load() (io.ReadCloser, error) {
-    return os.Open(fs.file)
+    // open the snapshot pointed to by the latest symlink
+    latestSymlink := filepath.Join(fs.path, fmt.Sprintf("%s.%s", snapshotFilename, latestSuffix))
+    return os.Open(latestSymlink)
 }
 
 func (fs *FileSnapshotter) Save(r io.ReadCloser) error {
     defer r.Close()
-    f, err := os.OpenFile(fs.file, os.O_RDWR|os.O_CREATE, 0600)
+
+    // generate the filenames
+    backupTimestamp := time.Now().UTC()
+    snapshotFile := filepath.Join(fs.path, fmt.Sprintf("%s.%s", snapshotFilename, strconv.FormatInt(backupTimestamp.Unix(), 10)))
+    latestSymlink := filepath.Join(fs.path, fmt.Sprintf("%s.%s", snapshotFilename, latestSuffix))
+
+    // make the snapshot
+    f, err := os.OpenFile(snapshotFile, os.O_RDWR|os.O_CREATE, 0600)
     if err != nil {
         return err
     }
     defer f.Close()
 
+    // update the symlink to point to the latest snapshot
+    if _, err := os.Lstat(latestSymlink); err == nil {
+        if err := os.Remove(latestSymlink); err != nil {
+            return errors.Wrap(err, "can't remove latest symlink")
+        }
+    }
+    if err = os.Symlink(snapshotFile, latestSymlink); err != nil {
+        return errors.Wrap(err, "can't create latest symlink")
+    }
+
     _, err = io.Copy(f, r)
+
+    // purge old snapshots
+    if fs.retentionTime > 0 {
+        files, err := ioutil.ReadDir(fs.path)
+        if err != nil {
+            return errors.Wrap(err, "unable to list snapshot directory during pruning")
+        }
+        for _, f := range files {
+            if (f.Mode()&os.ModeSymlink != os.ModeSymlink) && strings.HasPrefix(f.Name(), snapshotFilename) && time.Now().Sub(f.ModTime()) > fs.retentionTime {
+                // prune the file
+                log.Warnf("Would have deleted %s", f.Name())
+                //_ = os.Remove(f.Name())
+            }
+        }
+    }
+
     return err
 }
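Two behaviors of the file backend deserve a note. First, the LATEST symlink is re-pointed before io.Copy finishes writing the new snapshot, so a failure mid-copy can leave LATEST referencing a truncated file until the next successful Save. Second, retention here is deliberately a dry run for now: the os.Remove call is commented out and the snapshotter only logs what it would have deleted. If it is enabled later, keep in mind that ioutil.ReadDir returns base names, so the removal would need filepath.Join(fs.path, f.Name()) rather than f.Name() alone.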
diff --git a/pkg/snapshot/snapshot_test.go b/pkg/snapshot/snapshot_test.go
index ad840b7..86edde8 100644
--- a/pkg/snapshot/snapshot_test.go
+++ b/pkg/snapshot/snapshot_test.go
@@ -21,49 +21,41 @@ func TestParseSnapshotBackupURL(t *testing.T) {
             expectedErr: ErrInvalidScheme,
         },
         {
-            name:     "file (empty)",
-            url:      "file://",
-            expected: &URL{Type: FileType},
+            name:     "local directory at root",
+            url:      "file:///",
+            expected: &URL{Type: FileType, Path: "/"},
         },
         {
-            name:     "file",
-            url:      "file://abc",
-            expected: &URL{Type: FileType, Path: "abc"},
+            name:        "local file path (should fail)",
+            url:         "file://abc",
+            expectedErr: ErrInvalidDirectoryPath,
         },
         {
-            name:     "file",
-            url:      "file://abc/snapshot.gz",
-            expected: &URL{Type: FileType, Path: "abc/snapshot.gz"},
+            name:     "local directory",
+            url:      "file://abc/",
+            expected: &URL{Type: FileType, Path: "abc"},
         },
         {
-            name:     "file",
-            url:      "file:///abc",
+            name:     "local directory path with three slashes",
+            url:      "file:///abc/",
             expected: &URL{Type: FileType, Path: "/abc"},
         },
         {
-            name:     "s3",
-            url:      "s3://abc",
-            expected: &URL{Type: S3Type, Bucket: "abc", Path: "etcd.snapshot"},
-        },
-        {
-            name:     "s3",
-            url:      "s3://abc/snapshot.gz",
-            expected: &URL{Type: S3Type, Bucket: "abc", Path: "snapshot.gz"},
-        },
-        {
-            name:     "s3",
-            url:      "s3://abc/backupdir/snapshot.gz",
-            expected: &URL{Type: S3Type, Bucket: "abc", Path: "backupdir/snapshot.gz"},
+            name:     "s3 bucket with no prefix",
+            url:      "s3://abc/",
+            expected: &URL{Type: S3Type, Bucket: "abc", Path: ""},
         },
         {
-            name:     "spaces",
-            url:      "https://nyc3.digitaloceanspaces.com/abc",
-            expected: &URL{Type: SpacesType, Bucket: "abc", Path: "etcd.snapshot"},
+            name:     "s3 bucket with prefix",
+            url:      "s3://abc/backupdir/",
+            expected: &URL{Type: S3Type, Bucket: "abc", Path: "backupdir/"},
         },
         {
-            name:     "spaces",
-            url:      "https://nyc3.digitaloceanspaces.com/abc/snapshot.gz",
-            expected: &URL{Type: SpacesType, Bucket: "abc", Path: "snapshot.gz"},
+            name:        "s3 prefix without trailing slash (should fail)",
+            url:         "s3://abc/backupdir",
+            expectedErr: ErrInvalidDirectoryPath,
         },
     }
     for _, tt := range tests {
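Taken together, a round trip through the new file snapshotter looks roughly like this (a minimal sketch; the directory, payload, and import path are assumptions for illustration):

    package main

    import (
        "fmt"
        "io/ioutil"
        "strings"
        "time"

        "github.com/criticalstack/e2d/pkg/snapshot"
    )

    func main() {
        // retention of 24h matches the new flag default
        s, err := snapshot.NewFileSnapshotter("/tmp/etcd-backups", 24*time.Hour)
        if err != nil {
            panic(err)
        }
        // Save writes etcd.snapshot.<unix timestamp> and re-points etcd.snapshot.LATEST
        if err := s.Save(ioutil.NopCloser(strings.NewReader("snapshot-bytes"))); err != nil {
            panic(err)
        }
        // Load follows the LATEST symlink to the newest snapshot
        rc, err := s.Load()
        if err != nil {
            panic(err)
        }
        defer rc.Close()
        b, err := ioutil.ReadAll(rc)
        if err != nil {
            panic(err)
        }
        fmt.Printf("restored %d bytes\n", len(b))
    }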