Skip to content

Commit

Permalink
Change the behavior of handling a WaitTask timeout
Browse files Browse the repository at this point in the history
Today, when a WaitTask timeout happens, the WaitTask sends the
TimeoutError on the TaskChannel. After receiving the TimeoutError,
`baseRunner.run` terminates immediately by returning the error to its
caller (Applier.Run or Destroyer.Run). The caller then sends the error
onto the EventChannel and terminates.

With this PR, when a WaitTask timeout happens, the WaitTask sends a
WaitType Event including the TimeoutError on the EventChannel, and then
sends an empty TaskResult on the TaskChannel. An empty TaskResult
signals that the task finished successfully, and therefore
`baseRunner.run` continues instead of terminating.

The motivation of this change is to make sure that cli-utils only
terminates on fatal errors (such as inventory-related errors, and
ApplyOptions creation errors). A WaitTask timeout may not always mean a
fatal error (it may happen because the StatusPoller has not finished
polling everything, or because some, but not all, of the resources have
not yet reached the desired status), and therefore should not terminate cli-utils.
  • Loading branch information
haiyanmeng committed Oct 29, 2021
1 parent 6f5ee6f commit 87877d2
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 47 deletions.
9 changes: 9 additions & 0 deletions pkg/apply/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
StatusType
PruneType
DeleteType
WaitType
)

// Event is the type of the objects that will be returned through
Expand Down Expand Up @@ -58,6 +59,9 @@ type Event struct {
// DeleteEvent contains information about object that have been
// deleted.
DeleteEvent DeleteEvent

// WaitEvent contains information about any errors encountered in a WaitTask.
WaitEvent WaitEvent
}

type InitEvent struct {
Expand Down Expand Up @@ -85,6 +89,11 @@ type ErrorEvent struct {
Err error
}

// WaitEvent contains information about any errors encountered in a WaitTask.
// It is carried on an Event whose Type is WaitType.
type WaitEvent struct {
// GroupName identifies the wait task that produced this event; the
// WaitTask timeout path sets it to the task's Name().
GroupName string
// Error is the error encountered by the wait task. On a timeout it is
// a *TimeoutError describing the resources that did not meet the
// condition.
Error error
}

//go:generate stringer -type=ActionGroupEventType
type ActionGroupEventType int

Expand Down
24 changes: 3 additions & 21 deletions pkg/apply/taskrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
}
}
// A message on the taskChannel means that the current task
// has either completed or failed. If it has failed, we return
// the error. If the abort flag is true, which means something
// has either completed or failed.
// If it has failed, we return the error.
// If the abort flag is true, which means something
// else has gone wrong and we are waiting for the current task to
// finish, we exit.
// If everything is ok, we fetch and start the next task.
Expand All @@ -238,7 +239,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
},
}
if msg.Err != nil {
b.amendTimeoutError(taskContext, msg.Err)
return msg.Err
}
if abort {
Expand All @@ -262,24 +262,6 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
}
}

// amendTimeoutError fills in the TimedOutResources field of err when err is
// a *TimeoutError; any other error is left untouched.
//
// Every identifier listed in the timeout error is looked up in the task
// context's resource cache. Identifiers whose cached status does not meet
// the timeout's condition are recorded together with their cached status
// and status message.
func (b *baseRunner) amendTimeoutError(taskContext *TaskContext, err error) {
	timeoutErr, isTimeout := err.(*TimeoutError)
	if !isTimeout {
		return
	}

	var unmet []TimedOutResource
	for _, id := range timeoutErr.Identifiers {
		cached := taskContext.ResourceCache().Get(id)
		// Only resources that still fail the condition count as timed out.
		if !timeoutErr.Condition.Meets(cached.Status) {
			unmet = append(unmet, TimedOutResource{
				Identifier: id,
				Status:     cached.Status,
				Message:    cached.StatusMessage,
			})
		}
	}
	timeoutErr.TimedOutResources = unmet
}

// completeIfWaitTask checks if the current task is a wait task. If so,
// we invoke the complete function to complete it.
func completeIfWaitTask(currentTask Task, taskContext *TaskContext) {
Expand Down
37 changes: 22 additions & 15 deletions pkg/apply/taskrunner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ func TestBaseRunner(t *testing.T) {
statusEventsDelay time.Duration
statusEvents []pollevent.Event
expectedEventTypes []event.Type
expectedError error
expectedTimedOutResources []TimedOutResource
expectedErrorMsg string
expectedTimeoutErrorMsg string
}{
"wait task runs until condition is met": {
tasks: []Task{
Expand Down Expand Up @@ -112,17 +111,19 @@ func TestBaseRunner(t *testing.T) {
},
},
expectedEventTypes: []event.Type{
event.ActionGroupType,
event.StatusType,
event.WaitType,
event.ActionGroupType,
},
expectedError: &TimeoutError{},
expectedTimedOutResources: []TimedOutResource{
{
Identifier: depID,
Status: status.UnknownStatus,
Message: "resource not cached",
},
},
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
},
"wait task times out eventually (InProgress)": {
tasks: []Task{
Expand All @@ -147,16 +148,19 @@ func TestBaseRunner(t *testing.T) {
},
},
expectedEventTypes: []event.Type{
event.ActionGroupType,
event.StatusType,
event.StatusType,
event.WaitType,
event.ActionGroupType,
},
expectedError: &TimeoutError{},
expectedTimedOutResources: []TimedOutResource{
{
Identifier: depID,
Status: status.InProgressStatus,
},
},
expectedErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
expectedTimeoutErrorMsg: "timeout after 2 seconds waiting for 2 resources ([default_cm__ConfigMap default_dep_apps_Deployment]) to reach condition AllCurrent",
},
"tasks run in order": {
tasks: []Task{
Expand Down Expand Up @@ -244,18 +248,13 @@ func TestBaseRunner(t *testing.T) {
close(eventChannel)
wg.Wait()

if tc.expectedError != nil {
assert.IsType(t, tc.expectedError, err)
if timeoutError, ok := err.(*TimeoutError); ok {
assert.ElementsMatch(t, tc.expectedTimedOutResources,
timeoutError.TimedOutResources)
assert.Equal(t, timeoutError.Error(), tc.expectedErrorMsg)
}
return
} else if err != nil {
if err != nil {
t.Errorf("expected no error, but got %v", err)
}

for _, event := range events {
t.Log(event)
}
if want, got := len(tc.expectedEventTypes), len(events); want != got {
t.Errorf("expected %d events, but got %d", want, got)
}
Expand All @@ -265,6 +264,14 @@ func TestBaseRunner(t *testing.T) {
t.Errorf("expected event type %s, but got %s",
want, got)
}
if e.Type == event.WaitType {
err := e.WaitEvent.Error
if timeoutError, ok := err.(*TimeoutError); ok {
assert.ElementsMatch(t, tc.expectedTimedOutResources,
timeoutError.TimedOutResources)
assert.Equal(t, timeoutError.Error(), tc.expectedTimeoutErrorMsg)
}
}
}
})
}
Expand Down
39 changes: 32 additions & 7 deletions pkg/apply/taskrunner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (w *WaitTask) Start(taskContext *TaskContext) {

// setTimer creates the timer with the timeout value taken from
// the WaitTask struct. Once the timer expires, it will send
// a message on the TaskChannel provided in the taskContext.
// a message on the EventChannel provided in the taskContext.
func (w *WaitTask) setTimer(taskContext *TaskContext) {
timer := time.NewTimer(w.Timeout)
go func() {
Expand All @@ -111,16 +111,23 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
// Timeout is cancelled.
<-timer.C
select {
// We only send the taskResult if no one has gotten
// We only send the TimeoutError to the eventChannel if no one has gotten
// to the token first.
case <-w.token:
taskContext.TaskChannel() <- TaskResult{
Err: &TimeoutError{
Identifiers: w.Ids,
Timeout: w.Timeout,
Condition: w.Condition,
err := &TimeoutError{
Identifiers: w.Ids,
Timeout: w.Timeout,
Condition: w.Condition,
}
amendTimeoutError(taskContext, err)
taskContext.EventChannel() <- event.Event{
Type: event.WaitType,
WaitEvent: event.WaitEvent{
GroupName: w.Name(),
Error: err,
},
}
taskContext.TaskChannel() <- TaskResult{}
default:
return
}
Expand All @@ -130,6 +137,24 @@ func (w *WaitTask) setTimer(taskContext *TaskContext) {
}
}

// amendTimeoutError populates TimedOutResources on err if err is a
// *TimeoutError; errors of any other type are ignored.
//
// For each identifier in the timeout error, the current entry is read from
// the task context's resource cache. An identifier is reported as timed out,
// along with its cached status and status message, when that status does not
// meet the timeout's condition.
func amendTimeoutError(taskContext *TaskContext, err error) {
	timeoutErr, isTimeout := err.(*TimeoutError)
	if !isTimeout {
		return
	}

	var stillWaiting []TimedOutResource
	for _, id := range timeoutErr.Identifiers {
		entry := taskContext.ResourceCache().Get(id)
		// Resources that already satisfy the condition did not time out.
		if !timeoutErr.Condition.Meets(entry.Status) {
			stillWaiting = append(stillWaiting, TimedOutResource{
				Identifier: id,
				Status:     entry.Status,
				Message:    entry.StatusMessage,
			})
		}
	}
	timeoutErr.TimedOutResources = stillWaiting
}

// checkCondition checks whether the condition set in the task
// is currently met given the status of resources in the cache.
func (w *WaitTask) checkCondition(taskContext *TaskContext) bool {
Expand Down
16 changes: 12 additions & 4 deletions pkg/apply/taskrunner/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
)

func TestWaitTask_TimeoutTriggered(t *testing.T) {
task := NewWaitTask("wait", object.ObjMetadataSet{}, AllCurrent,
taskName := "wait"
task := NewWaitTask(taskName, object.ObjMetadataSet{}, AllCurrent,
2*time.Second, testutil.NewFakeRESTMapper())

eventChannel := make(chan event.Event)
Expand All @@ -28,9 +29,16 @@ func TestWaitTask_TimeoutTriggered(t *testing.T) {
timer := time.NewTimer(3 * time.Second)

select {
case res := <-taskContext.TaskChannel():
if _, ok := IsTimeoutError(res.Err); !ok {
t.Errorf("expected timeout error, but got %v", res.Err)
case e := <-taskContext.EventChannel():
if e.Type != event.WaitType {
t.Errorf("expected a WaitType event, but got a %v event", e.Type)
}
if e.WaitEvent.GroupName != taskName {
t.Errorf("expected WaitEvent.GroupName = %q, but got %q", taskName, e.WaitEvent.GroupName)
}
err := e.WaitEvent.Error
if _, ok := IsTimeoutError(err); !ok {
t.Errorf("expected timeout error, but got %v", err)
}
return
case <-timer.C:
Expand Down
33 changes: 33 additions & 0 deletions pkg/testutil/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ExpEvent struct {
StatusEvent *ExpStatusEvent
PruneEvent *ExpPruneEvent
DeleteEvent *ExpDeleteEvent
WaitEvent *ExpWaitEvent
}

type ExpInitEvent struct {
Expand Down Expand Up @@ -67,6 +68,11 @@ type ExpDeleteEvent struct {
Error error
}

// ExpWaitEvent describes the expected contents of an event.WaitEvent when
// matching events in tests.
type ExpWaitEvent struct {
// GroupName, when non-empty, must equal the actual WaitEvent's
// GroupName; an empty value matches any group name.
GroupName string
// Error, when non-nil, is compared against the actual WaitEvent's Error
// using cmp.Equal with cmpopts.EquateErrors. When nil, the actual
// WaitEvent must carry no error.
Error error
}

func VerifyEvents(expEvents []ExpEvent, events []event.Event) error {
if len(expEvents) == 0 && len(events) == 0 {
return nil
Expand Down Expand Up @@ -234,6 +240,24 @@ func isMatch(ee ExpEvent, e event.Event) bool {
return de.Error != nil
}
return de.Error == nil

case event.WaitType:
wee := ee.WaitEvent
if wee == nil {
return true
}
we := e.WaitEvent

if wee.GroupName != "" {
if wee.GroupName != we.GroupName {
return false
}
}

if wee.Error != nil {
return cmp.Equal(wee.Error, we.Error, cmpopts.EquateErrors())
}
return we.Error == nil
}
return true
}
Expand Down Expand Up @@ -317,6 +341,15 @@ func EventToExpEvent(e event.Event) ExpEvent {
Error: e.DeleteEvent.Error,
},
}

case event.WaitType:
return ExpEvent{
EventType: event.WaitType,
WaitEvent: &ExpWaitEvent{
GroupName: e.WaitEvent.GroupName,
Error: e.WaitEvent.Error,
},
}
}
return ExpEvent{}
}
Expand Down

0 comments on commit 87877d2

Please sign in to comment.