Merge pull request #2808 from dperny/fix-flaky-tests
Fix flaky tests
dperny authored Jun 20, 2019
2 parents fb584e7 + 06a3566 commit 3df33bc
Showing 5 changed files with 94 additions and 73 deletions.
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
@@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, global.Run(ctx))
}()
})

addService(t, store, service1)
testutils.Expect(t, watch, api.EventCreateService{})
@@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
@@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Nothing should happen because both tasks are up to date.
select {
@@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask1.ID, "task2")
@@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Fail the running task
s.Update(func(tx store.Tx) error {
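The change repeated throughout this file is the same substitution: each test used to start the global orchestrator with a bare goroutine, and now starts it through the new testutils.EnsureRuns helper (added to manager/orchestrator/testutils/testutils.go later in this diff), which does not return until the goroutine is actually running. Distilled from the hunks above, the before/after pattern is, as a sketch:

// Before: the test could continue before the orchestrator goroutine was
// ever scheduled, leaving room for scheduling-dependent flakiness.
go func() {
	assert.NoError(t, global.Run(ctx))
}()

// After: EnsureRuns returns only once the goroutine has started, so the
// orchestrator is known to be running before the test drives the store.
testutils.EnsureRuns(func() {
	assert.NoError(t, global.Run(ctx))
})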
72 changes: 38 additions & 34 deletions manager/orchestrator/replicated/update_test.go
@@ -2,7 +2,7 @@ package replicated

import (
"context"
"sync/atomic"
"sync"
"testing"
"time"

@@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) {
}

func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
// this test should complete within 20 seconds. if not, bail out
// this test should complete within 30 seconds. if not, bail out
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

@@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

orchestrator := NewReplicatedOrchestrator(s)

// TODO(dperny): these are used with atomic.StoreUint32 and
// atomic.LoadUint32. using atomic primitives is bad practice and easy to
// mess up
// These variables are used to signal that The Fail Loop should start
// failing tasks that use image1; failMu guards all access to failImage1.
var (
failImage1 uint32
failImage2 uint32
failMu sync.Mutex
failImage1 bool
)

// create a watch for task creates, which we will use to verify that the
// updater works correctly.
watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
defer cancelCreate()

@@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
// Fail new tasks the updater tries to run
watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancelUpdate()
go func() {

// We're gonna call this big chunk here "The Fail Loop". Its job is to put
// tasks into a Failed state under certain conditions.
testutils.EnsureRuns(func() {
failedLast := false
// typical Go pattern: an infinite for loop in a goroutine, exiting on
// ctx.Done
for {
var e events.Event
select {
@@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
if task.DesiredState == task.Status.State {
continue
}
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
// This used to have a 3rd clause,
// "&& task.Status.State != api.TaskStateRunning"
// however, this is unneeded. If DesiredState is Running, then
// actual state cannot be Running, because that would get caught
// in the condition above (DesiredState == State)
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
err := s.Update(func(tx store.Tx) error {
task = store.GetTask(tx, task.ID)
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
// lock mutex governing access to failImage1.
failMu.Lock()
defer failMu.Unlock()
// we should start failing tasks with image1 only after the test has
// set failImage1, which happens just before the second rolling update
if task.Spec.GetContainer().Image == "image1" && failImage1 {
// fail the task only once failImage1 has been set to true
task.Status.State = api.TaskStateFailed
failedLast = true
} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
task.Status.State = api.TaskStateFailed
failedLast = true
} else {
@@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.NoError(t, err)
}
}
}()
})

// Create a service with four replicas specified before the orchestrator
// is started. This should result in two tasks when the orchestrator
@@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

// Start the orchestrator.
var orchestratorError error
orchestratorDone := make(chan struct{})
// verify that the orchestrator has had a chance to run by blocking the
// main test routine until it has.
orchestratorRan := make(chan struct{})
go func() {
close(orchestratorRan)
// try not to fail the test from inside goroutines; it's racy. instead,
// save the error and check it in a defer
orchestratorDone := testutils.EnsureRuns(func() {
orchestratorError = orchestrator.Run(ctx)
close(orchestratorDone)
}()

select {
case <-orchestratorRan:
case <-ctx.Done():
t.Error("orchestrator did not start before test timed out")
}
})

defer func() {
orchestrator.Stop()
@@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")

atomic.StoreUint32(&failImage2, 1)

// Start a rolling update
err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
@@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
var e events.Event
select {
case e = <-watchServiceUpdate:
t.Log("service was updated")
case <-ctx.Done():
t.Error("test timed out before watchServiceUpdate provided an event")
return
@@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
}
}

atomic.StoreUint32(&failImage1, 1)

// Repeat the rolling update but this time fail the tasks that the
// rollback creates.
failMu.Lock()
failImage1 = true
failMu.Unlock()

err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
require.NotNil(t, s1)
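Beyond the EnsureRuns conversion, this file makes two further changes: the atomic uint32 failure flags (which the removed TODO called easy to misuse) become a single bool guarded by a sync.Mutex, with failImage2 dropped because image2 tasks now simply alternate between failed and successful; and the orchestrator's return value is saved into orchestratorError and checked in a defer rather than asserted inside the goroutine. A small, self-contained sketch of the mutex-guarded flag pattern, with only the variable names taken from the test and everything else illustrative:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		failMu     sync.Mutex
		failImage1 bool
	)

	// shouldFail mirrors the check inside The Fail Loop: fail an image1 task
	// only once the test has flipped failImage1 under the mutex.
	shouldFail := func(image string) bool {
		failMu.Lock()
		defer failMu.Unlock()
		return image == "image1" && failImage1
	}

	fmt.Println(shouldFail("image1")) // false: the flag has not been set yet

	// The test sets the flag just before starting the rolling update whose
	// rollback it wants to see fail.
	failMu.Lock()
	failImage1 = true
	failMu.Unlock()

	fmt.Println(shouldFail("image1")) // true: image1 tasks are now failed
}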
34 changes: 16 additions & 18 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
reaper := New(s)

// Now, start the reaper
go reaper.Run(ctx)
testutils.EnsureRuns(func() { reaper.Run(ctx) })

// And then stop the reaper. This will cause the reaper to run through its
// whole init phase and then immediately enter the loop body, get the stop
@@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator and the reaper.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
@@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) {
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or 500ms to pass, to
@@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
20 changes: 18 additions & 2 deletions manager/orchestrator/testutils/testutils.go
@@ -11,6 +11,22 @@ import (
"github.com/stretchr/testify/assert"
)

// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
// goroutine has had an opportunity to run. It returns a channel which will be
// closed when the provided closure exits.
func EnsureRuns(closure func()) <-chan struct{} {
started := make(chan struct{})
stopped := make(chan struct{})
go func() {
close(started)
closure()
close(stopped)
}()

<-started
return stopped
}

// WatchTaskCreate waits for a task to be created.
func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
for {
@@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventUpdateTask); ok {
assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event))
}
case <-time.After(2 * time.Second):
case <-time.After(3 * time.Second):
assert.FailNow(t, "no task creation")
}
}
@@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventCreateTask); ok {
assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event))
}
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
assert.FailNow(t, "no task update")
}
}
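To make the new helper's behavior concrete, here is a small, self-contained sketch (not part of the diff) that copies the EnsureRuns body and shows how callers such as update_test.go can use the returned channel to wait for the goroutine to exit:

package main

import (
	"fmt"
	"time"
)

// ensureRuns copies the helper above: run the closure in a goroutine, block
// until that goroutine has started, and return a channel that is closed when
// the closure returns.
func ensureRuns(closure func()) <-chan struct{} {
	started := make(chan struct{})
	stopped := make(chan struct{})
	go func() {
		close(started)
		closure()
		close(stopped)
	}()
	<-started
	return stopped
}

func main() {
	done := ensureRuns(func() {
		// Stand-in for orchestrator.Run(ctx): do some work, then return.
		time.Sleep(100 * time.Millisecond)
	})

	// ensureRuns has returned, so the goroutine is known to have started.
	fmt.Println("worker is running")

	<-done // closed once the closure has exited
	fmt.Println("worker has stopped")
}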