Skip to content

Commit

Permalink
clientv3: make sure snapshot has integrity check hash
Browse files Browse the repository at this point in the history
I've seen some cases SHA blobs are missing (still investigating).
Adding a check to make sure snapshot save always downloads
hash digests for integrity checks.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed May 15, 2020
1 parent b0433cf commit ad64955
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
22 changes: 17 additions & 5 deletions clientv3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"io"

pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"

"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -68,13 +68,15 @@ type Maintenance interface {
}

type maintenance struct {
lg *zap.Logger
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient
callOpts []grpc.CallOption
}

func NewMaintenance(c *Client) Maintenance {
api := &maintenance{
lg: c.lg,
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.Dial(endpoint)
if err != nil {
Expand All @@ -93,6 +95,7 @@ func NewMaintenance(c *Client) Maintenance {

func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
api := &maintenance{
lg: c.lg,
dial: func(string) (pb.MaintenanceClient, func(), error) {
return remote, func() {}, nil
},
Expand Down Expand Up @@ -193,23 +196,32 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
return nil, toErr(ctx, err)
}

m.lg.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
switch err {
case io.EOF:
m.lg.Info("completed snapshot read; closing")
default:
m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
}
pw.CloseWithError(err)
return
}
if resp == nil && err == nil {
break
}

// can "resp == nil && err == nil"
// before we receive snapshot SHA digest?
// No, server sends EOF with an empty response
// after it sends SHA digest at the end

if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
pw.Close()
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
}
Expand Down
34 changes: 21 additions & 13 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/v3/clientv3"
"go.etcd.io/etcd/v3/etcdserver"
Expand Down Expand Up @@ -89,6 +90,14 @@ type v3Manager struct {
skipHashCheck bool
}

// hasChecksum returns "true" if the file size "n"
// has appended sha256 hash digest.
func hasChecksum(n int64) bool {
// 512 is chosen because it's a minimum disk sector size
// smaller than (and multiplies to) OS page size in most systems
return (n % 512) == sha256.Size
}

// Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
if len(cfg.Endpoints) != 1 {
Expand All @@ -108,34 +117,33 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
if err != nil {
return fmt.Errorf("could not open %s (%v)", partpath, err)
}
s.lg.Info(
"created temporary db file",
zap.String("path", partpath),
)
s.lg.Info("created temporary db file", zap.String("path", partpath))

now := time.Now()
var rd io.ReadCloser
rd, err = cli.Snapshot(ctx)
if err != nil {
return err
}
s.lg.Info(
"fetching snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
)
if _, err = io.Copy(f, rd); err != nil {
s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
size, err = io.Copy(f, rd)
if err != nil {
return err
}
if !hasChecksum(size) {
return fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
}
if err = fileutil.Fsync(f); err != nil {
return err
}
if err = f.Close(); err != nil {
return err
}
s.lg.Info(
"fetched snapshot",
s.lg.Info("fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
zap.Duration("took", time.Since(now)),
zap.String("size", humanize.Bytes(uint64(size))),
zap.String("took", humanize.Time(now)),
)

if err = os.Rename(partpath, dbPath); err != nil {
Expand Down Expand Up @@ -362,7 +370,7 @@ func (s *v3Manager) saveDB() error {
if serr != nil {
return serr
}
hasHash := (off % 512) == sha256.Size
hasHash := hasChecksum(off)
if hasHash {
if err := db.Truncate(off - sha256.Size); err != nil {
return err
Expand Down

0 comments on commit ad64955

Please sign in to comment.