Skip to content

Commit

Permalink
raft: Garbage collect WAL files
Browse files Browse the repository at this point in the history
We currently garbage collect snapshot files (keeping only
KeepOldSnapshot outdated snapshots, which defaults to 0). However, we
don't garbage collect the WAL files that the snapshots replace.

Delete any WALs which are so old that they only contain information that
predates the oldest of the snapshots we have retained. This means that
by default, we will remove old WALs once they are supplanted by a
snapshot. However, if KeepOldSnapshots is set above 0, we will keep
whichever WALs are necessary to catch up from the oldest of the retained
snapshots. This makes sure that the old snapshots we retain are actually
useful, and avoids adding an independent knob for WAL retention that
might end up with an inconsistent setting.

Also, fix serious brokenness in the the deletion of old snapshots (it
was deleting the most recent outdated snapshots, instead of the oldest).

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Aug 6, 2016
1 parent e021d14 commit c72cdf1
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 15 deletions.
109 changes: 94 additions & 15 deletions manager/state/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"strings"

"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
}
n.wal, err = wal.Create(n.walDir(), metadata)
if err != nil {
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
}

n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
Expand Down Expand Up @@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
repaired := false
for {
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
return fmt.Errorf("open wal error: %v", err)
return fmt.Errorf("open WAL error: %v", err)
}
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
if err := n.wal.Close(); err != nil {
return err
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
}
if !wal.Repair(n.walDir()) {
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
Expand All @@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC

var raftNode api.RaftMember
if err := raftNode.Unmarshal(metadata); err != nil {
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
}
n.Config.ID = raftNode.RaftID

Expand Down Expand Up @@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
// This means that if the current snapshot doesn't appear in the
// directory for some strange reason, we won't delete anything, which
// is the safe behavior.
curSnapshotIdx := -1
var (
afterCurSnapshot bool
removeErr error
removeErr error
oldestSnapshot string
)

for i, snapFile := range snapshots {
if afterCurSnapshot {
if uint64(len(snapshots)-i) <= keepOldSnapshots {
return removeErr
}
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
if uint64(i-curSnapshotIdx) > keepOldSnapshots {
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
}
continue
}
} else if snapFile == curSnapshot {
afterCurSnapshot = true
curSnapshotIdx = i
}
oldestSnapshot = snapFile
}

if removeErr != nil {
return removeErr
}

// Remove any WAL files that only contain data from before the oldest
// remaining snapshot.

if oldestSnapshot == "" {
return nil
}

// Parse index out of oldest snapshot's filename
var snapTerm, snapIndex uint64
_, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex)
if err != nil {
return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err)
}

// List the WALs
dirents, err = ioutil.ReadDir(n.walDir())
if err != nil {
return err
}

var wals []string
for _, dirent := range dirents {
if strings.HasSuffix(dirent.Name(), ".wal") {
wals = append(wals, dirent.Name())
}
}

return removeErr
// Sort WAL filenames in lexical order
sort.Sort(sort.StringSlice(wals))

found := false
deleteUntil := -1

for i, walName := range wals {
var walSeq, walIndex uint64
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
if err != nil {
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
}

if walIndex >= snapIndex {
deleteUntil = i - 1
found = true
break
}
}

// If all WAL files started with indices below the oldest snapshot's
// index, we can delete all but the newest WAL file.
if !found && len(wals) != 0 {
deleteUntil = len(wals) - 1
}

for i := 0; i < deleteUntil; i++ {
walPath := filepath.Join(n.walDir(), wals[i])
l, err := fileutil.NewLock(walPath)
if err != nil {
continue
}
err = l.TryLock()
if err != nil {
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
}
err = os.Remove(walPath)
l.Unlock()
l.Destroy()
if err != nil {
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
}
}

return nil
}

func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {
Expand Down
180 changes: 180 additions & 0 deletions manager/state/raft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

"github.com/docker/swarmkit/api"
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

func TestRaftSnapshot(t *testing.T) {
Expand Down Expand Up @@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
require.NoError(t, err)
raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
}

func TestGCWAL(t *testing.T) {
if testing.Short() {
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
}
t.Parallel()

// Additional log entries from cluster setup, leader election
extraLogEntries := 5
// Number of large entries to propose
proposals := 47

// Bring up a 3 node cluster
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// Snapshot should have been triggered just before the WAL rotated, so
// both WAL files should be preserved
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 2 {
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
}
return nil
}))

raftutils.TeardownCluster(t, nodes)

// Repeat this test, but trigger the snapshot after the WAL has rotated
proposals++
nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
defer raftutils.TeardownCluster(t, nodes)

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// This time only one WAL file should be saved.
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
}
return nil
}))

// Restart the whole cluster
for _, node := range nodes {
node.Server.Stop()
node.Shutdown()
}

raftutils.AdvanceTicks(clockSource, 5)

i := 0
for k, node := range nodes {
nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
i++
}
raftutils.WaitForCluster(t, clockSource, nodes)

// Is the data intact after restart?
for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}

// It should still be possible to propose values
_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
assert.NoError(t, err, "failed to propose value")

for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals+1 {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}
}

// proposeHugeValue proposes a 1.4MB value to a raft test cluster
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
nodeIDStr := "id1"
if len(nodeID) != 0 {
nodeIDStr = nodeID[0]
}
a := make([]byte, 1400000)
for i := 0; i != len(a); i++ {
a[i] = 'a'
}
node := &api.Node{
ID: nodeIDStr,
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: nodeIDStr,
Labels: map[string]string{
"largestring": string(a),
},
},
},
}

storeActions := []*api.StoreAction{
{
Action: api.StoreActionKindCreate,
Target: &api.StoreAction_Node{
Node: node,
},
},
}

ctx, _ := context.WithTimeout(context.Background(), time)

err := raftNode.ProposeValue(ctx, storeActions, func() {
err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
assert.NoError(t, err, "error applying actions")
})
if err != nil {
return nil, err
}

return node, nil
}

0 comments on commit c72cdf1

Please sign in to comment.