Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
It is likely that a large portion of test flakiness, especially in CI,
comes from the fact that swarmkit components under test are started in
goroutines, but those goroutines never have an opportunity to run. This
adds code ensuring those goroutines are scheduled and run, which should
hopefully solve many inexplicably flaky tests.

Signed-off-by: Drew Erny <drew.erny@docker.com>
  • Loading branch information
dperny committed Jan 16, 2019
1 parent 47b1cd4 commit 30e7367
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 38 deletions.
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, global.Run(ctx))
}()
})

addService(t, store, service1)
testutils.Expect(t, watch, api.EventCreateService{})
Expand Down Expand Up @@ -581,9 +581,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -644,9 +644,9 @@ func TestInitializationFailedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -736,9 +736,9 @@ func TestInitializationExtraTask(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
Expand Down Expand Up @@ -816,9 +816,9 @@ func TestInitializationMultipleServices(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Nothing should happen because both tasks are up to date.
select {
Expand Down Expand Up @@ -957,9 +957,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask1.ID, "task2")
Expand Down Expand Up @@ -1015,9 +1015,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1087,9 +1087,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
Expand Down Expand Up @@ -1256,9 +1256,9 @@ func TestInitializationRestartHistory(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Fail the running task
s.Update(func(tx store.Tx) error {
Expand Down
4 changes: 2 additions & 2 deletions manager/orchestrator/replicated/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask := testutils.WatchTaskCreate(t, watchCreate)
assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
Expand Down
34 changes: 16 additions & 18 deletions manager/orchestrator/taskreaper/task_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
reaper := New(s)

// Now, start the reaper
go reaper.Run(ctx)
testutils.EnsureRuns(func() { reaper.Run(ctx) })

// And then stop the reaper. This will cause the reaper to run through its
// whole init phase and then immediately enter the loop body, get the stop
Expand Down Expand Up @@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
Expand Down Expand Up @@ -666,10 +664,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator and the reaper.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
Expand Down Expand Up @@ -846,7 +844,7 @@ func TestTaskReaperBatching(t *testing.T) {
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or 500ms to pass, to
Expand Down Expand Up @@ -1013,10 +1011,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
Expand Down
16 changes: 16 additions & 0 deletions manager/orchestrator/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ import (
"github.com/stretchr/testify/assert"
)

// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
// goroutine has had an opportunity to run. It returns a channel which will be
// closed when the provided closure exits.
func EnsureRuns(closure func()) <-chan struct{} {
started := make(chan struct{})
stopped := make(chan struct{})
go func() {
close(started)
closure()
close(stopped)
}()

<-started
return stopped
}

// WatchTaskCreate waits for a task to be created.
func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
for {
Expand Down

0 comments on commit 30e7367

Please sign in to comment.