diff --git a/service/history/nDCActivityReplicator.go b/service/history/nDCActivityReplicator.go index 5d5209da682..92042ac9996 100644 --- a/service/history/nDCActivityReplicator.go +++ b/service/history/nDCActivityReplicator.go @@ -30,13 +30,16 @@ import ( "context" "time" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/service/history/tasks" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" - enumsspb "go.temporal.io/server/api/enums/v1" historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" @@ -90,10 +93,10 @@ func (r *nDCActivityReplicatorImpl) SyncActivity( ) (retError error) { // sync activity info will only be sent from active side, when - // 1. activity has retry policy and activity got started - // 2. activity heart beat + // 1. activity retry + // 2. activity start + // 3. activity heart beat // no sync activity task will be sent when active side fail / timeout activity, - // since standby side does not have activity retry timer namespaceID := namespace.ID(request.GetNamespaceId()) execution := commonpb.WorkflowExecution{ WorkflowId: request.WorkflowId, @@ -154,6 +157,22 @@ func (r *nDCActivityReplicatorImpl) SyncActivity( return nil } + // sync activity with empty started ID means activity retry + eventTime := timestamp.TimeValue(request.GetScheduledTime()) + if request.StartedId == common.EmptyEventID && request.Attempt > activityInfo.GetAttempt() { + mutableState.AddTasks(&tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.WorkflowKey{ + NamespaceID: request.GetNamespaceId(), + WorkflowID: request.GetWorkflowId(), + RunID: request.GetRunId(), + }, + VisibilityTimestamp: eventTime, + EventID: request.GetScheduledId(), + Version: request.GetVersion(), + Attempt: request.GetAttempt(), + }) + } + refreshTask := r.testRefreshActivityTimerTaskMask( request.GetVersion(), request.GetAttempt(), @@ -165,7 +184,6 @@ func (r *nDCActivityReplicatorImpl) SyncActivity( } // see whether we need to refresh the activity timer - eventTime := timestamp.TimeValue(request.GetScheduledTime()) startedTime := timestamp.TimeValue(request.GetStartedTime()) lastHeartbeatTime := timestamp.TimeValue(request.GetLastHeartbeatTime()) if eventTime.Before(startedTime) { diff --git a/service/history/nDCActivityReplicator_test.go b/service/history/nDCActivityReplicator_test.go index 38babdfa3ba..7d89fda0064 100644 --- a/service/history/nDCActivityReplicator_test.go +++ b/service/history/nDCActivityReplicator_test.go @@ -820,15 +820,18 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() { weContext.EXPECT().LoadWorkflowExecution(gomock.Any()).Return(s.mockMutableState, nil) weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil) weContext.EXPECT().Unlock(workflow.CallerTypeAPI) + _, err := s.historyCache.PutIfNotExist(key, weContext) s.NoError(err) + now := time.Now() request := &historyservice.SyncActivityRequest{ NamespaceId: namespaceID.String(), WorkflowId: workflowID, RunId: runID, Version: version, ScheduledId: scheduleID, + ScheduledTime: &now, VersionHistory: incomingVersionHistory, } @@ -843,7 +846,6 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() { }, true) s.mockMutableState.EXPECT().ReplicateActivityInfo(request, false).Return(nil) s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{}) - s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(version, version).Return(true) weContext.EXPECT().UpdateWorkflowExecutionWithNew( @@ -914,12 +916,14 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_NonZombie() { _, err := s.historyCache.PutIfNotExist(key, weContext) s.NoError(err) + now := time.Now() request := &historyservice.SyncActivityRequest{ NamespaceId: namespaceID.String(), WorkflowId: workflowID, RunId: runID, Version: version, ScheduledId: scheduleID, + ScheduledTime: &now, VersionHistory: incomingVersionHistory, } diff --git a/service/history/nDCStandbyTaskUtil.go b/service/history/nDCStandbyTaskUtil.go index c0bd1b7c9a1..d18b1cc4c2c 100644 --- a/service/history/nDCStandbyTaskUtil.go +++ b/service/history/nDCStandbyTaskUtil.go @@ -99,6 +99,8 @@ type ( } pushActivityTaskToMatchingInfo struct { + taskQueue string + namespaceID string activityTaskScheduleToStartTimeout time.Duration } @@ -127,6 +129,19 @@ func newPushActivityToMatchingInfo( } } +func newActivityRetryTimerToMatchingInfo( + taskQueue string, + namespaceID string, + activityScheduleToStartTimeout time.Duration, +) *pushActivityTaskToMatchingInfo { + + return &pushActivityTaskToMatchingInfo{ + taskQueue: taskQueue, + namespaceID: namespaceID, + activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout, + } +} + func newPushWorkflowTaskToMatchingInfo( workflowTaskScheduleToStartTimeout int64, taskqueue taskqueuepb.TaskQueue, diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index adf31ec7bf8..0ad79171522 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -378,6 +378,7 @@ func (t *timerQueueProcessorImpl) handleClusterMetadataUpdate( t.shard, t.workflowCache, t.workflowDeleteManager, + t.matchingClient, clusterName, t.taskAllocator, nDCHistoryResender, diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index 9eda52e2d93..09a11df6292 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -28,6 +28,8 @@ import ( "context" "time" + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -59,6 +61,7 @@ func newTimerQueueStandbyProcessor( shard shard.Context, workflowCache workflow.Cache, workflowDeleteManager workflow.DeleteManager, + matchingClient matchingservice.MatchingServiceClient, clusterName string, taskAllocator taskAllocator, nDCHistoryResender xdc.NDCHistoryResender, @@ -104,6 +107,7 @@ func newTimerQueueStandbyProcessor( workflowCache, workflowDeleteManager, nDCHistoryResender, + matchingClient, logger, clusterName, shard.GetConfig(), diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go index 695cfd531f5..5a2ee98de26 100644 --- a/service/history/timerQueueStandbyTaskExecutor.go +++ b/service/history/timerQueueStandbyTaskExecutor.go @@ -29,10 +29,12 @@ import ( "fmt" "time" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" @@ -53,6 +55,7 @@ type ( clusterName string adminClient adminservice.AdminServiceClient + matchingClient matchingservice.MatchingServiceClient nDCHistoryResender xdc.NDCHistoryResender } ) @@ -62,6 +65,7 @@ func newTimerQueueStandbyTaskExecutor( workflowCache workflow.Cache, workflowDeleteManager workflow.DeleteManager, nDCHistoryResender xdc.NDCHistoryResender, + matchingClient matchingservice.MatchingServiceClient, logger log.Logger, clusterName string, config *configs.Config, @@ -77,6 +81,7 @@ func newTimerQueueStandbyTaskExecutor( clusterName: clusterName, adminClient: shard.GetRemoteAdminClient(clusterName), nDCHistoryResender: nDCHistoryResender, + matchingClient: matchingClient, } } @@ -108,9 +113,10 @@ func (t *timerQueueStandbyTaskExecutor) execute( } return t.executeWorkflowBackoffTimerTask(ctx, task) case *tasks.ActivityRetryTimerTask: - // retry backoff timer should not get created on passive cluster - // TODO: add error logs - return nil + if !shouldProcessTask { + return nil + } + return t.executeActivityRetryTimerTask(ctx, task) case *tasks.WorkflowTimeoutTask: return t.executeWorkflowTimeoutTask(ctx, task) case *tasks.DeleteHistoryEventTask: @@ -205,7 +211,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( return getHistoryResendInfo(mutableState) } // since the activity timer are already sorted, so if there is one timer which will not expired - // all activity timer after this timer will not expired + // all activity timer after this timer will not expire break Loop //nolint:staticcheck } @@ -272,6 +278,49 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( ) } +func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask( + ctx context.Context, + task *tasks.ActivityRetryTimerTask, +) (retError error) { + + actionFn := func(context workflow.Context, mutableState workflow.MutableState) (interface{}, error) { + + activityInfo, ok := mutableState.GetActivityInfo(task.EventID) //activity schedule ID + if !ok { + return nil, nil + } + + ok = VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), activityInfo.Version, task.Version, task) + if !ok { + return nil, nil + } + + if activityInfo.Attempt > task.Attempt { + return nil, nil + } + + if activityInfo.StartedId != common.EmptyEventID { + return nil, nil + } + + return newActivityRetryTimerToMatchingInfo(activityInfo.TaskQueue, activityInfo.NamespaceId, *activityInfo.ScheduleToStartTimeout), nil + } + + return t.processTimer( + ctx, + task, + actionFn, + getStandbyPostActionFn( + task, + t.getCurrentTime, + t.config.StandbyTaskMissingEventsResendDelay(), + t.config.StandbyTaskMissingEventsDiscardDelay(), + t.pushActivity, + t.pushActivity, + ), + ) +} + func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask( ctx context.Context, timerTask *tasks.WorkflowTaskTimeoutTask, @@ -518,6 +567,39 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote( return consts.ErrTaskRetry } +func (t *timerQueueStandbyTaskExecutor) pushActivity( + task tasks.Task, + postActionInfo interface{}, + logger log.Logger, +) error { + + if postActionInfo == nil { + return nil + } + + pushActivityInfo := postActionInfo.(*pushActivityTaskToMatchingInfo) + activityScheduleToStartTimeout := &pushActivityInfo.activityTaskScheduleToStartTimeout + activityTask := task.(*tasks.ActivityRetryTimerTask) + ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout) + defer cancel() + + _, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ + NamespaceId: pushActivityInfo.namespaceID, + SourceNamespaceId: activityTask.NamespaceID, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: activityTask.WorkflowID, + RunId: activityTask.RunID, + }, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: pushActivityInfo.taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + ScheduleId: activityTask.EventID, + ScheduleToStartTimeout: activityScheduleToStartTimeout, + }) + return err +} + func (t *timerQueueStandbyTaskExecutor) getCurrentTime() time.Time { return t.shard.GetCurrentTime(t.clusterName) } diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index 2c9a95f5255..171bc740ecd 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -37,11 +37,12 @@ import ( enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/adminservicemock/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/api/matchingservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" @@ -78,6 +79,7 @@ type ( mockAdminClient *adminservicemock.MockAdminServiceClient mockNDCHistoryResender *xdc.MockNDCHistoryResender mockDeleteManager *workflow.MockDeleteManager + mockMatchingClient *matchingservicemock.MockMatchingServiceClient logger log.Logger namespaceID namespace.ID @@ -152,6 +154,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata s.mockAdminClient = s.mockShard.Resource.RemoteAdminClient + s.mockMatchingClient = s.mockShard.Resource.MatchingClient s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() @@ -184,6 +187,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { h.historyCache, s.mockDeleteManager, s.mockNDCHistoryResender, + s.mockMatchingClient, s.logger, s.clusterName, config, @@ -1201,7 +1205,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessRetryTimeout() { taskQueueName := "some random task queue" mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) - _, err := mutableState.AddWorkflowExecutionStartedEvent( + startEvent, err := mutableState.AddWorkflowExecutionStartedEvent( execution, &historyservice.StartWorkflowExecutionRequest{ Attempt: 1, @@ -1215,7 +1219,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessRetryTimeout() { }, ) s.Nil(err) - + persistenceMutableState := s.createPersistenceMutableState(mutableState, startEvent.GetEventId(), startEvent.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil).AnyTimes() timerTask := &tasks.ActivityRetryTimerTask{ WorkflowKey: definition.NewWorkflowKey( s.namespaceID.String(), @@ -1234,6 +1239,256 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessRetryTimeout() { s.Nil(err) } +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Noop() { + + execution := commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.New(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: timestamp.DurationPtr(200 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + }, + }, + ) + s.Nil(err) + + di := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, di.ScheduleID, taskQueueName, uuid.New()) + di.StartedID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, "some random identity") + + identity := "identity" + taskqueue := "taskqueue" + activityID := "activity" + activityType := "activity type" + timerTimeout := 2 * time.Second + scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskqueue, nil, + timerTimeout, timerTimeout, timerTimeout, timerTimeout) + startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity) + + timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) + mutableState.InsertTasks[tasks.CategoryTimer] = nil + modified, err := timerSequence.CreateNextActivityTimer() + s.NoError(err) + s.True(modified) + task := mutableState.InsertTasks[tasks.CategoryTimer][0] + + // Flush buffered events so real IDs get assigned + mutableState.FlushBufferedEvents() + + persistenceMutableState := s.createPersistenceMutableState(mutableState, startedEvent.GetEventId(), startedEvent.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockShard.SetCurrentTime(s.clusterName, s.now) + + timerTask := &tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 2, + Version: s.version, + TaskID: int64(100), + VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp, + EventID: scheduledEvent.GetEventId(), + } + err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true) + s.Nil(err) + + timerTask = &tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 2, + Version: s.version - 1, + TaskID: int64(100), + VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp, + EventID: scheduledEvent.GetEventId(), + } + err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true) + s.Nil(err) + + timerTask = &tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 0, + Version: s.version, + TaskID: int64(100), + VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp, + EventID: scheduledEvent.GetEventId(), + } + err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true) + s.Nil(err) +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_ActivityCompleted() { + + execution := commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.New(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: timestamp.DurationPtr(200 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + }, + }, + ) + s.Nil(err) + + di := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, di.ScheduleID, taskQueueName, uuid.New()) + di.StartedID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, "some random identity") + + identity := "identity" + taskqueue := "taskqueue" + activityID := "activity" + activityType := "activity type" + timerTimeout := 2 * time.Second + scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskqueue, nil, + timerTimeout, timerTimeout, timerTimeout, timerTimeout) + startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), identity) + + timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) + mutableState.InsertTasks[tasks.CategoryTimer] = nil + modified, err := timerSequence.CreateNextActivityTimer() + s.NoError(err) + s.True(modified) + task := mutableState.InsertTasks[tasks.CategoryTimer][0] + + completeEvent := addActivityTaskCompletedEvent(mutableState, scheduledEvent.GetEventId(), startedEvent.GetEventId(), nil, identity) + // Flush buffered events so real IDs get assigned + mutableState.FlushBufferedEvents() + + persistenceMutableState := s.createPersistenceMutableState(mutableState, completeEvent.GetEventId(), completeEvent.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + + s.mockShard.SetCurrentTime(s.clusterName, s.now) + timerTask := &tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 2, + Version: s.version, + TaskID: int64(100), + VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp, + EventID: scheduledEvent.GetEventId(), + } + err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true) + s.Nil(err) +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_PushToMatching() { + + execution := commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.New(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: timestamp.DurationPtr(200 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + }, + }, + ) + s.Nil(err) + + di := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, di.ScheduleID, taskQueueName, uuid.New()) + di.StartedID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(mutableState, di.ScheduleID, di.StartedID, "some random identity") + + taskqueue := "taskqueue" + activityID := "activity" + activityType := "activity type" + timerTimeout := 2 * time.Second + scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskqueue, nil, + timerTimeout, timerTimeout, timerTimeout, timerTimeout) + + timerSequence := workflow.NewTimerSequence(s.timeSource, mutableState) + mutableState.InsertTasks[tasks.CategoryTimer] = nil + modified, err := timerSequence.CreateNextActivityTimer() + s.NoError(err) + s.True(modified) + task := mutableState.InsertTasks[tasks.CategoryTimer][0] + + // Flush buffered events so real IDs get assigned + mutableState.FlushBufferedEvents() + + persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) + s.mockMatchingClient.EXPECT().AddActivityTask( + gomock.Any(), + &matchingservice.AddActivityTaskRequest{ + NamespaceId: s.namespaceID.String(), + SourceNamespaceId: s.namespaceID.String(), + Execution: &execution, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskqueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + ScheduleId: scheduledEvent.EventId, + ScheduleToStartTimeout: &timerTimeout, + }, + gomock.Any(), + ).Return(&matchingservice.AddActivityTaskResponse{}, nil) + timerTask := &tasks.ActivityRetryTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 2, + Version: s.version, + TaskID: int64(100), + VisibilityTimestamp: task.(*tasks.ActivityTimeoutTask).VisibilityTimestamp, + EventID: scheduledEvent.GetEventId(), + } + err = s.timerQueueStandbyTaskExecutor.execute(context.Background(), timerTask, true) + s.Nil(err) +} + func (s *timerQueueStandbyTaskExecutorSuite) createPersistenceMutableState( ms workflow.MutableState, lastEventID int64, diff --git a/service/history/transferQueueTaskExecutorBase.go b/service/history/transferQueueTaskExecutorBase.go index 5f40aca3847..9d1e89f0ae4 100644 --- a/service/history/transferQueueTaskExecutorBase.go +++ b/service/history/transferQueueTaskExecutorBase.go @@ -33,7 +33,6 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/matchingservice/v1" - m "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" @@ -108,7 +107,7 @@ func (t *transferQueueTaskExecutorBase) pushActivity( ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout) defer cancel() - _, err := t.matchingClient.AddActivityTask(ctx, &m.AddActivityTaskRequest{ + _, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ NamespaceId: task.TargetNamespaceID, SourceNamespaceId: task.NamespaceID, Execution: &commonpb.WorkflowExecution{ @@ -135,7 +134,7 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask( ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout) defer cancel() - _, err := t.matchingClient.AddWorkflowTask(ctx, &m.AddWorkflowTaskRequest{ + _, err := t.matchingClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{ NamespaceId: task.NamespaceID, Execution: &commonpb.WorkflowExecution{ WorkflowId: task.WorkflowID,