Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Garbage collect WAL files #1327

Merged
merged 1 commit into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 94 additions & 15 deletions manager/state/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"strings"

"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
}
n.wal, err = wal.Create(n.walDir(), metadata)
if err != nil {
return raft.Peer{}, fmt.Errorf("create wal error: %v", err)
return raft.Peer{}, fmt.Errorf("create WAL error: %v", err)
}

n.cluster.AddMember(&membership.Member{RaftMember: raftNode})
Expand Down Expand Up @@ -127,15 +128,15 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC
repaired := false
for {
if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil {
return fmt.Errorf("open wal error: %v", err)
return fmt.Errorf("open WAL error: %v", err)
}
if metadata, st, ents, err = n.wal.ReadAll(); err != nil {
if err := n.wal.Close(); err != nil {
return err
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
return fmt.Errorf("read wal error (%v) and cannot be repaired", err)
return fmt.Errorf("read WAL error (%v) and cannot be repaired", err)
}
if !wal.Repair(n.walDir()) {
return fmt.Errorf("WAL error (%v) cannot be repaired", err)
Expand All @@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC

var raftNode api.RaftMember
if err := raftNode.Unmarshal(metadata); err != nil {
return fmt.Errorf("error unmarshalling wal metadata: %v", err)
return fmt.Errorf("error unmarshalling WAL metadata: %v", err)
}
n.Config.ID = raftNode.RaftID

Expand Down Expand Up @@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e
// This means that if the current snapshot doesn't appear in the
// directory for some strange reason, we won't delete anything, which
// is the safe behavior.
curSnapshotIdx := -1
var (
afterCurSnapshot bool
removeErr error
removeErr error
oldestSnapshot string
)

for i, snapFile := range snapshots {
if afterCurSnapshot {
if uint64(len(snapshots)-i) <= keepOldSnapshots {
return removeErr
}
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
if curSnapshotIdx >= 0 && i > curSnapshotIdx {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we construct remainingSnapshots within this loop? So that it can reduce disk I/O by avoiding the call of ioutil.ReadDir, (or it may not harm the performance because of cache? I don't know).

Also, it may reduce some redundant code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@runshenzhu: Updated.

if uint64(i-curSnapshotIdx) > keepOldSnapshots {
err := os.Remove(filepath.Join(n.snapDir(), snapFile))
if err != nil && removeErr == nil {
removeErr = err
}
continue
}
} else if snapFile == curSnapshot {
afterCurSnapshot = true
curSnapshotIdx = i
}
oldestSnapshot = snapFile
}

if removeErr != nil {
return removeErr
}

// Remove any WAL files that only contain data from before the oldest
// remaining snapshot.

if oldestSnapshot == "" {
return nil
}

// Parse index out of oldest snapshot's filename
var snapTerm, snapIndex uint64
_, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex)
if err != nil {
return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err)
}

// List the WALs
dirents, err = ioutil.ReadDir(n.walDir())
if err != nil {
return err
}

var wals []string
for _, dirent := range dirents {
if strings.HasSuffix(dirent.Name(), ".wal") {
wals = append(wals, dirent.Name())
}
}

return removeErr
// Sort WAL filenames in lexical order
sort.Sort(sort.StringSlice(wals))

found := false
deleteUntil := -1

for i, walName := range wals {
var walSeq, walIndex uint64
_, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex)
if err != nil {
return fmt.Errorf("could not parse WAL name %s: %v", walName, err)
}

if walIndex >= snapIndex {
deleteUntil = i - 1
found = true
break
}
}

// If all WAL files started with indices below the oldest snapshot's
// index, we can delete all but the newest WAL file.
if !found && len(wals) != 0 {
deleteUntil = len(wals) - 1
}

for i := 0; i < deleteUntil; i++ {
walPath := filepath.Join(n.walDir(), wals[i])
l, err := fileutil.NewLock(walPath)
if err != nil {
continue
}
err = l.TryLock()
if err != nil {
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
}
err = os.Remove(walPath)
l.Unlock()
l.Destroy()
if err != nil {
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
}
}

return nil
}

func (n *Node) doSnapshot(raftConfig *api.RaftConfig) {
Expand Down
180 changes: 180 additions & 0 deletions manager/state/raft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

"github.com/docker/swarmkit/api"
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)

func TestRaftSnapshot(t *testing.T) {
Expand Down Expand Up @@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) {
require.NoError(t, err)
raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values)
}

func TestGCWAL(t *testing.T) {
if testing.Short() {
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
}
t.Parallel()

// Additional log entries from cluster setup, leader election
extraLogEntries := 5
// Number of large entries to propose
proposals := 47

// Bring up a 3 node cluster
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// Snapshot should have been triggered just before the WAL rotated, so
// both WAL files should be preserved
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 2 {
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
}
return nil
}))

raftutils.TeardownCluster(t, nodes)

// Repeat this test, but trigger the snapshot after the WAL has rotated
proposals++
nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})
defer raftutils.TeardownCluster(t, nodes)

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// This time only one WAL file should be saved.
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}

dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal"))
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
}
return nil
}))

// Restart the whole cluster
for _, node := range nodes {
node.Server.Stop()
node.Shutdown()
}

raftutils.AdvanceTicks(clockSource, 5)

i := 0
for k, node := range nodes {
nodes[k] = raftutils.RestartNode(t, clockSource, node, false)
i++
}
raftutils.WaitForCluster(t, clockSource, nodes)

// Is the data intact after restart?
for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}

// It should still be possible to propose values
_, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode")
assert.NoError(t, err, "failed to propose value")

for _, node := range nodes {
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
var err error
node.MemoryStore().View(func(tx store.ReadTx) {
var allNodes []*api.Node
allNodes, err = store.FindNodes(tx, store.All)
if err != nil {
return
}
if len(allNodes) != proposals+1 {
err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes))
return
}
})
return err
}))
}
}

// proposeHugeValue proposes a 1.4MB value to a raft test cluster
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
nodeIDStr := "id1"
if len(nodeID) != 0 {
nodeIDStr = nodeID[0]
}
a := make([]byte, 1400000)
for i := 0; i != len(a); i++ {
a[i] = 'a'
}
node := &api.Node{
ID: nodeIDStr,
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: nodeIDStr,
Labels: map[string]string{
"largestring": string(a),
},
},
},
}

storeActions := []*api.StoreAction{
{
Action: api.StoreActionKindCreate,
Target: &api.StoreAction_Node{
Node: node,
},
},
}

ctx, _ := context.WithTimeout(context.Background(), time)

err := raftNode.ProposeValue(ctx, storeActions, func() {
err := raftNode.MemoryStore().ApplyStoreActions(storeActions)
assert.NoError(t, err, "error applying actions")
})
if err != nil {
return nil, err
}

return node, nil
}