Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Update from openark/golib to support TZ with logging #1017

Merged
merged 13 commits into from
Dec 5, 2019
65 changes: 29 additions & 36 deletions go/raft/file_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/openark/golib/log"
"hash"
"hash/crc64"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
Expand All @@ -32,17 +32,15 @@ const (
type FileSnapshotStore struct {
path string
retain int
logger *log.Logger
}

type snapMetaSlice []*fileSnapshotMeta

// FileSnapshotSink implements SnapshotSink with a file.
type FileSnapshotSink struct {
store *FileSnapshotStore
logger *log.Logger
dir string
meta fileSnapshotMeta
store *FileSnapshotStore
dir string
meta fileSnapshotMeta

stateFile *os.File
stateHash hash.Hash64
Expand Down Expand Up @@ -76,13 +74,10 @@ func (b *bufferedFile) Close() error {
// NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based
// on a base directory. The `retain` parameter controls how many
// snapshots are retained. Must be at least 1.
func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error) {
func NewFileSnapshotStoreWithLogger(base string, retain int) (*FileSnapshotStore, error) {
if retain < 1 {
return nil, fmt.Errorf("must retain at least one snapshot")
}
if logger == nil {
logger = log.New(os.Stderr, "", log.LstdFlags)
}

// Ensure our path exists
path := filepath.Join(base, snapPath)
Expand All @@ -94,7 +89,6 @@ func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger)
store := &FileSnapshotStore{
path: path,
retain: retain,
logger: logger,
}

// Do a permissions test
Expand All @@ -111,7 +105,7 @@ func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSn
if logOutput == nil {
logOutput = os.Stderr
}
return NewFileSnapshotStoreWithLogger(base, retain, log.New(logOutput, "", log.LstdFlags))
return NewFileSnapshotStoreWithLogger(base, retain)
}

// testPermissions tries to touch a file in our path to see if it works.
Expand Down Expand Up @@ -144,19 +138,18 @@ func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (raft.Snaps
// Create a new path
name := snapshotName(term, index)
path := filepath.Join(f.path, name+tmpSuffix)
f.logger.Printf("[INFO] snapshot: Creating new snapshot at %s", path)
log.Infof("snapshot: Creating new snapshot at %s", path)

// Make the directory
if err := os.MkdirAll(path, 0755); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to make snapshot directory: %v", err)
_ = log.Error("[ERR] snapshot: Failed to make snapshot directory: %v", err)
return nil, err
}

// Create the sink
sink := &FileSnapshotSink{
store: f,
logger: f.logger,
dir: path,
store: f,
dir: path,
meta: fileSnapshotMeta{
SnapshotMeta: raft.SnapshotMeta{
ID: name,
Expand All @@ -170,15 +163,15 @@ func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (raft.Snaps

// Write out the meta data
if err := sink.writeMeta(); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err)
_ = log.Errorf("snapshot: Failed to write metadata: %v", err)
return nil, err
}

// Open the state file
statePath := filepath.Join(path, stateFilePath)
fh, err := os.Create(statePath)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to create state file: %v", err)
_ = log.Errorf("snapshot: Failed to create state file: %v", err)
return nil, err
}
sink.stateFile = fh
Expand All @@ -199,7 +192,7 @@ func (f *FileSnapshotStore) List() ([]*raft.SnapshotMeta, error) {
// Get the eligible snapshots
snapshots, err := f.getSnapshots()
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
_ = log.Errorf("snapshot: Failed to get snapshots: %v", err)
return nil, err
}

Expand All @@ -218,7 +211,7 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
// Get the eligible snapshots
snapshots, err := ioutil.ReadDir(f.path)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to scan snapshot dir: %v", err)
_ = log.Errorf("snapshot: Failed to scan snapshot dir: %v", err)
return nil, err
}

Expand All @@ -233,14 +226,14 @@ func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
// Ignore any temporary snapshots
dirName := snap.Name()
if strings.HasSuffix(dirName, tmpSuffix) {
f.logger.Printf("[WARN] snapshot: Found temporary snapshot: %v", dirName)
_ = log.Warningf("snapshot: Found temporary snapshot: %v", dirName)
continue
}

// Try to read the meta data
meta, err := f.readMeta(dirName)
if err != nil {
f.logger.Printf("[WARN] snapshot: Failed to read metadata for %v: %v", dirName, err)
_ = log.Warningf("snapshot: Failed to read metadata for %v: %v", dirName, err)
continue
}

Expand Down Expand Up @@ -281,15 +274,15 @@ func (f *FileSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
// Get the metadata
meta, err := f.readMeta(id)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get meta data to open snapshot: %v", err)
_ = log.Errorf("snapshot: Failed to get meta data to open snapshot: %v", err)
return nil, nil, err
}

// Open the state file
statePath := filepath.Join(f.path, id, stateFilePath)
fh, err := os.Open(statePath)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to open state file: %v", err)
_ = log.Errorf("snapshot: Failed to open state file: %v", err)
return nil, nil, err
}

Expand All @@ -299,23 +292,23 @@ func (f *FileSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
// Compute the hash
_, err = io.Copy(stateHash, fh)
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to read state file: %v", err)
_ = log.Errorf("snapshot: Failed to read state file: %v", err)
fh.Close()
return nil, nil, err
}

// Verify the hash
computed := stateHash.Sum(nil)
if bytes.Compare(meta.CRC, computed) != 0 {
f.logger.Printf("[ERR] snapshot: CRC checksum failed (stored: %v computed: %v)",
_ = log.Errorf("snapshot: CRC checksum failed (stored: %v computed: %v)",
meta.CRC, computed)
fh.Close()
return nil, nil, fmt.Errorf("CRC mismatch")
}

// Seek to the start
if _, err := fh.Seek(0, 0); err != nil {
f.logger.Printf("[ERR] snapshot: State file seek failed: %v", err)
_ = log.Errorf("snapshot: State file seek failed: %v", err)
fh.Close()
return nil, nil, err
}
Expand All @@ -334,16 +327,16 @@ func (f *FileSnapshotStore) ReapSnapshots(currentSnapshotMeta *fileSnapshotMeta)

reapSnapshot := func(snapshot *fileSnapshotMeta) error {
path := filepath.Join(f.path, snapshot.ID)
f.logger.Printf("[INFO] snapshot: reaping snapshot %v", path)
log.Infof("snapshot: reaping snapshot %v", path)
if err := os.RemoveAll(path); err != nil {
f.logger.Printf("[ERR] snapshot: Failed to reap snapshot %v: %v", path, err)
_ = log.Errorf("snapshot: Failed to reap snapshot %v: %v", path, err)
return err
}
return nil
}
snapshots, err := f.getSnapshots()
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
_ = log.Errorf("snapshot: Failed to get snapshots: %v", err)
return err
}

Expand All @@ -360,7 +353,7 @@ func (f *FileSnapshotStore) ReapSnapshots(currentSnapshotMeta *fileSnapshotMeta)
// re-read list, since we've removed files
snapshots, err = f.getSnapshots()
if err != nil {
f.logger.Printf("[ERR] snapshot: Failed to get snapshots: %v", err)
_ = log.Errorf("snapshot: Failed to get snapshots: %v", err)
return err
}
}
Expand Down Expand Up @@ -392,20 +385,20 @@ func (s *FileSnapshotSink) Close() error {

// Close the open handles
if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err)
_ = log.Errorf("snapshot: Failed to finalize snapshot: %v", err)
return err
}

// Write out the meta data
if err := s.writeMeta(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to write metadata: %v", err)
_ = log.Errorf("snapshot: Failed to write metadata: %v", err)
return err
}

// Move the directory into place
newPath := strings.TrimSuffix(s.dir, tmpSuffix)
if err := os.Rename(s.dir, newPath); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to move snapshot into place: %v", err)
_ = log.Errorf("snapshot: Failed to move snapshot into place: %v", err)
return err
}

Expand All @@ -427,7 +420,7 @@ func (s *FileSnapshotSink) Cancel() error {

// Close the open handles
if err := s.finalize(); err != nil {
s.logger.Printf("[ERR] snapshot: Failed to finalize snapshot: %v", err)
_ = log.Errorf("snapshot: Failed to finalize snapshot: %v", err)
return err
}

Expand Down
Loading