Skip to content

Commit

Permalink
Backoff failed workflow task (#2548)
Browse files Browse the repository at this point in the history
* Backoff failed workflow task
  • Loading branch information
yiminc committed Feb 25, 2022
1 parent a9a22a1 commit 33c6a45
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 12 deletions.
84 changes: 73 additions & 11 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,16 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() {
//s.printHistory(id, workflowRun.GetRunID())
}

// This test simulates workflow try to complete itself while there is buffered event.
// Event sequence:
// 1st WorkflowTask runs a local activity.
// While local activity is running, a signal is received by server.
// After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow.
// Server failed the complete request because there is unhandled signal.
// Server rescheduled a new workflow task.
// Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow.
// Server complete workflow as requested.
func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() {
/*
Event sequence:
1st WorkflowTask runs a local activity.
While local activity is running, a signal is received by server.
After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow.
Server failed the complete request because there is unhandled signal.
Server rescheduled a new workflow task.
Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow.
Server complete workflow as requested.
*/

sigReadyToSendChan := make(chan struct{}, 1)
sigSendDoneChan := make(chan struct{})
localActivityFn := func(ctx context.Context) error {
Expand Down Expand Up @@ -694,6 +692,70 @@ func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() {
s.assertHistory(id, workflowRun.GetRunID(), expectedHistory)
}

// This test simulates workflow generate command with invalid attributes.
// Server is expected to fail the workflow task and schedule a retry immediately for first attempt,
// but if workflow task keeps failing, server will drop the task and wait for timeout to schedule additional retries.
// This is the same behavior as the SDK used to do, but now we would do on server.
func (s *clientIntegrationSuite) Test_InvalidCommandAttribute() {
activityFn := func(ctx context.Context) error {
return nil
}

var calledTime []time.Time
workflowFn := func(ctx workflow.Context) error {
calledTime = append(calledTime, time.Now().UTC())
ao := workflow.ActivityOptions{} // invalid activity option without StartToClose timeout
ctx = workflow.WithActivityOptions(ctx, ao)

err := workflow.ExecuteActivity(ctx, activityFn).Get(ctx, nil)
return err
}

s.worker.RegisterWorkflow(workflowFn)
s.worker.RegisterActivity(activityFn)

id := "integration-test-invalid-command-attributes"
workflowOptions := sdkclient.StartWorkflowOptions{
ID: id,
TaskQueue: s.taskQueue,
// With 3s TaskTimeout and 5s RunTimeout, we expect to see total of 3 attempts.
// First attempt follow by immediate retry follow by timeout and 3rd attempt after WorkflowTaskTimeout.
WorkflowTaskTimeout: 3 * time.Second,
WorkflowRunTimeout: 5 * time.Second,
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn)
if err != nil {
s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
}

s.NotNil(workflowRun)
s.True(workflowRun.GetRunID() != "")

// wait until workflow close (it will be timeout)
err = workflowRun.Get(ctx, nil)
s.Error(err)
s.Contains(err.Error(), "timeout")

// verify event sequence
expectedHistory := []enumspb.EventType{
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
}
s.assertHistory(id, workflowRun.GetRunID(), expectedHistory)

// assert workflow task retried 3 times
s.Equal(3, len(calledTime))

s.True(calledTime[1].Sub(calledTime[0]) < time.Second) // retry immediately
s.True(calledTime[2].Sub(calledTime[1]) > time.Second*3) // retry after WorkflowTaskTimeout
}

func (s *clientIntegrationSuite) Test_BufferedQuery() {
localActivityFn := func(ctx context.Context) error {
time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered
Expand Down
4 changes: 4 additions & 0 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ Update_History_Loop:
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
tag.WorkflowNamespaceID(namespaceID.String()))
if currentWorkflowTask.Attempt > 1 {
// drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout.
return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message())
}
msBuilder, err = handler.historyEngine.failWorkflowTask(weContext, scheduleID, startedID, wtFailedCause, request)
if err != nil {
return nil, err
Expand Down

0 comments on commit 33c6a45

Please sign in to comment.