Skip to content

Commit

Permalink
*: make sure snapshot save downloads SHA256 checksum
Browse files Browse the repository at this point in the history
ref. #11896

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed May 18, 2020
1 parent 9caec0d commit 924b812
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 22 deletions.
5 changes: 5 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/clientv3/credentials"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/pkg/logutil"
"github.com/coreos/pkg/capnslog"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -47,6 +48,10 @@ var (
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)

// plog is the capnslog package logger used by clientv3.
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "clientv3")

func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
Expand Down
17 changes: 13 additions & 4 deletions clientv3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,23 +193,32 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
return nil, toErr(ctx, err)
}

plog.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
switch err {
case io.EOF:
plog.Info("completed snapshot read; closing")
default:
plog.Warningf("failed to receive from snapshot stream; closing (%v)", err)
}
pw.CloseWithError(err)
return
}
if resp == nil && err == nil {
break
}

// Can "resp == nil && err == nil" happen
// before we receive the snapshot SHA digest?
// No, the server sends EOF with an empty response
// after it sends the SHA digest at the end.

if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
pw.Close()
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
}
Expand Down
29 changes: 19 additions & 10 deletions clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/dustin/go-humanize"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -87,6 +88,14 @@ type v3Manager struct {
skipHashCheck bool
}

// hasChecksum reports whether a file of size "n" bytes
// carries an appended sha256 hash digest, i.e. whether the
// size exceeds a multiple of 512 by exactly sha256.Size bytes.
func hasChecksum(n int64) bool {
	// 512 is the minimum disk sector size; it is smaller than
	// (and evenly divides) the OS page size on most systems.
	const sectorSize = 512

	remainder := n % sectorSize
	return remainder == sha256.Size
}

// Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
if len(cfg.Endpoints) != 1 {
Expand All @@ -106,24 +115,23 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
if err != nil {
return fmt.Errorf("could not open %s (%v)", partpath, err)
}
s.lg.Info(
"created temporary db file",
zap.String("path", partpath),
)
s.lg.Info("created temporary db file", zap.String("path", partpath))

now := time.Now()
var rd io.ReadCloser
rd, err = cli.Snapshot(ctx)
if err != nil {
return err
}
s.lg.Info(
"fetching snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
)
if _, err = io.Copy(f, rd); err != nil {
s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
size, err = io.Copy(f, rd)
if err != nil {
return err
}
if !hasChecksum(size) {
return fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
}
if err = fileutil.Fsync(f); err != nil {
return err
}
Expand All @@ -133,6 +141,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
s.lg.Info(
"fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
zap.String("size", humanize.Bytes(uint64(size))),
zap.Duration("took", time.Since(now)),
)

Expand Down Expand Up @@ -344,7 +353,7 @@ func (s *v3Manager) saveDB() error {
if serr != nil {
return serr
}
hasHash := (off % 512) == sha256.Size
hasHash := hasChecksum(off)
if hasHash {
if err := db.Truncate(off - sha256.Size); err != nil {
return err
Expand Down
45 changes: 37 additions & 8 deletions etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/sha256"
"io"
"time"

"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/etcdserver"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/version"
"github.com/dustin/go-humanize"
)

type KVGetter interface {
Expand Down Expand Up @@ -81,6 +83,9 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil
}

// snapshotSendBufferSize is the size, in bytes, of the buffer used
// when sending snapshot data; it is big enough to hold more than
// one OS page.
const snapshotSendBufferSize = 32 << 10

func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
snap := ms.bg.Backend().Snapshot()
pr, pw := io.Pipe()
Expand All @@ -95,19 +100,39 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
pw.Close()
}()

// send file data
// record SHA digest of snapshot data
// used for integrity checks during snapshot restore operation
h := sha256.New()
br := int64(0)
buf := make([]byte, 32*1024)
sz := snap.Size()
for br < sz {

// buffer just holds read bytes from stream
// response size is multiple of OS page size, fetched in boltdb
// e.g. 4*1024
buf := make([]byte, snapshotSendBufferSize)

sent := int64(0)
total := snap.Size()
size := humanize.Bytes(uint64(total))

start := time.Now()
plog.Infof("sending database snapshot to client %s [%d bytes]", size, total)
for total-sent > 0 {
n, err := io.ReadFull(pr, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return togRPCError(err)
}
br += int64(n)
sent += int64(n)

// If total is a multiple of snapshotSendBufferSize, it is possible that
// resp.RemainingBytes == 0 and
// resp.Blob is zero-length but not nil.
// Does this make the server response sent to the client nil in proto,
// so that the client stops receiving from the snapshot stream before
// the server sends the snapshot SHA?
// No, the client will still receive a non-nil response
// until the server closes the stream with EOF.

resp := &pb.SnapshotResponse{
RemainingBytes: uint64(sz - br),
RemainingBytes: uint64(total - sent),
Blob: buf[:n],
}
if err = srv.Send(resp); err != nil {
Expand All @@ -116,13 +141,17 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
h.Write(buf[:n])
}

// send sha
// send SHA digest for integrity checks
// during snapshot restore operation
sha := h.Sum(nil)

plog.Infof("sending database sha256 checksum to client [%d bytes]", len(sha))
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
if err := srv.Send(hresp); err != nil {
return togRPCError(err)
}

plog.Infof("successfully sent database snapshot to client %s [%d bytes, took %s]", size, total, humanize.Time(start))
return nil
}

Expand Down

0 comments on commit 924b812

Please sign in to comment.