Skip to content

Commit

Permalink
Revendor etcd/raft
Browse files Browse the repository at this point in the history
Change Campaign logic to trigger after Advance, because now Campaign
won't do anything if there are uncommitted entries. Also, remove the
check on Progress, because now Progress is only filled in if the node is
already the leader.

Set a smaller WAL segment size to work around high I/O load on Mac when
running unit tests.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Aug 11, 2016
1 parent 77a6894 commit 62e73cd
Show file tree
Hide file tree
Showing 343 changed files with 3,813 additions and 40,209 deletions.
324 changes: 71 additions & 253 deletions Godeps/Godeps.json

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,23 +416,22 @@ func (n *Node) Run(ctx context.Context) error {
}
}

// Advance the state machine
n.Advance()

// If we are the only registered member after
// restoring from the state, campaign to be the
// leader.
if !n.restored {
// Node ID should be in the progress list to Campaign
_, ok := n.Node.Status().Progress[n.Config.ID]
if len(n.cluster.Members()) <= 1 && ok {
if len(n.cluster.Members()) <= 1 {
if err := n.Campaign(n.Ctx); err != nil {
panic("raft: cannot campaign to be the leader on node restore")
}
}
n.restored = true
}

// Advance the state machine
n.Advance()

case snapshotIndex := <-n.snapshotInProgress:
if snapshotIndex > n.snapshotIndex {
n.snapshotIndex = snapshotIndex
Expand Down
5 changes: 5 additions & 0 deletions manager/state/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/net/context"

"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/wal"
"github.com/docker/swarmkit/api"
cautils "github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/manager/state/raft"
Expand All @@ -39,6 +40,10 @@ func init() {
logrus.SetOutput(ioutil.Discard)

tc = cautils.NewTestCA(nil)

// Set a smaller segment size so we don't incur cost preallocating
// space on old filesystems like HFS+.
wal.SegmentSizeBytes = 64 * 1024
}

func TestRaftBootstrap(t *testing.T) {
Expand Down
9 changes: 2 additions & 7 deletions manager/state/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,12 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e

for i := 0; i < deleteUntil; i++ {
walPath := filepath.Join(n.walDir(), wals[i])
l, err := fileutil.NewLock(walPath)
if err != nil {
continue
}
err = l.TryLock()
l, err := fileutil.TryLockFile(walPath, os.O_WRONLY, fileutil.PrivateFileMode)
if err != nil {
return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err)
}
err = os.Remove(walPath)
l.Unlock()
l.Destroy()
l.Close()
if err != nil {
return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err)
}
Expand Down
39 changes: 25 additions & 14 deletions manager/state/raft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -248,27 +249,24 @@ func TestRaftSnapshotRestart(t *testing.T) {
}

func TestGCWAL(t *testing.T) {
if testing.Short() {
t.Skip("TestGCWAL skipped with -short because it's resource-intensive")
}
t.Parallel()

// Additional log entries from cluster setup, leader election
extraLogEntries := 5
// Number of large entries to propose
proposals := 47
proposals := 8

// Bring up a 3 node cluster
nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0})

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
_, err := proposeLargeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

time.Sleep(250 * time.Millisecond)

// Snapshot should have been triggered just before the WAL rotated, so
// Snapshot should have been triggered just as the WAL rotated, so
// both WAL files should be preserved
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap"))
Expand All @@ -283,8 +281,14 @@ func TestGCWAL(t *testing.T) {
if err != nil {
return err
}
if len(dirents) != 2 {
return fmt.Errorf("expected 2 WAL files, found %d", len(dirents))
var walCount int
for _, f := range dirents {
if strings.HasSuffix(f.Name(), ".wal") {
walCount++
}
}
if walCount != 2 {
return fmt.Errorf("expected 2 WAL files, found %d", walCount)
}
return nil
}))
Expand All @@ -297,7 +301,7 @@ func TestGCWAL(t *testing.T) {
defer raftutils.TeardownCluster(t, nodes)

for i := 0; i != proposals; i++ {
_, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
_, err := proposeLargeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i))
assert.NoError(t, err, "failed to propose value")
}

Expand All @@ -309,6 +313,7 @@ func TestGCWAL(t *testing.T) {
if err != nil {
return err
}

if len(dirents) != 1 {
return fmt.Errorf("expected 1 snapshot, found %d", len(dirents))
}
Expand All @@ -317,8 +322,14 @@ func TestGCWAL(t *testing.T) {
if err != nil {
return err
}
if len(dirents) != 1 {
return fmt.Errorf("expected 1 WAL file, found %d", len(dirents))
var walCount int
for _, f := range dirents {
if strings.HasSuffix(f.Name(), ".wal") {
walCount++
}
}
if walCount != 1 {
return fmt.Errorf("expected 1 WAL file, found %d", walCount)
}
return nil
}))
Expand Down Expand Up @@ -380,13 +391,13 @@ func TestGCWAL(t *testing.T) {
}
}

// proposeHugeValue proposes a 1.4MB value to a raft test cluster
func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
// proposeLargeValue proposes a 10 kB value to a raft test cluster
func proposeLargeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) {
nodeIDStr := "id1"
if len(nodeID) != 0 {
nodeIDStr = nodeID[0]
}
a := make([]byte, 1400000)
a := make([]byte, 10000)
for i := 0; i != len(a); i++ {
a[i] = 'a'
}
Expand Down
4 changes: 2 additions & 2 deletions manager/state/raft/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func PollFuncWithTimeout(clockSource *fakeclock.FakeClock, f func() error, timeo
return nil
}
timer := time.After(timeout)
for {
if clockSource != nil {
for i := 0; ; i++ {
if i%5 == 0 && clockSource != nil {
clockSource.Increment(time.Second)
}
err := f()
Expand Down
6 changes: 4 additions & 2 deletions manager/testcluster/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func TestManager(t *testing.T) {

m.Stop(ctx)

// After stopping we should NOT receive an error from ListenAndServe.
assert.NoError(t, <-done)
// After stopping we MAY receive an error from ListenAndServe if
// all this happened before WaitForLeader completed, so don't check the
// error.
<-done
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

12 changes: 0 additions & 12 deletions vendor/github.com/cloudfoundry-incubator/candiedyaml/.travis.yml

This file was deleted.

57 changes: 0 additions & 57 deletions vendor/github.com/cloudfoundry-incubator/candiedyaml/README.md

This file was deleted.

Loading

0 comments on commit 62e73cd

Please sign in to comment.