Skip to content

Commit

Permalink
Handle activity retry timer in passive (#2640)
Browse files Browse the repository at this point in the history
* Handle activity retry timer in passive
  • Loading branch information
yux0 authored Mar 25, 2022
1 parent 82ab54f commit 12f4731
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 17 deletions.
28 changes: 23 additions & 5 deletions service/history/nDCActivityReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
"context"
"time"

"go.temporal.io/server/common/definition"
"go.temporal.io/server/service/history/tasks"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -90,10 +93,10 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
) (retError error) {

// sync activity info will only be sent from active side, when
// 1. activity has retry policy and activity got started
// 2. activity heart beat
// 1. activity retry
// 2. activity start
// 3. activity heart beat
// no sync activity task will be sent when active side fail / timeout activity,
// since standby side does not have activity retry timer
namespaceID := namespace.ID(request.GetNamespaceId())
execution := commonpb.WorkflowExecution{
WorkflowId: request.WorkflowId,
Expand Down Expand Up @@ -154,6 +157,22 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
return nil
}

// sync activity with empty started ID means activity retry
eventTime := timestamp.TimeValue(request.GetScheduledTime())
if request.StartedId == common.EmptyEventID && request.Attempt > activityInfo.GetAttempt() {
mutableState.AddTasks(&tasks.ActivityRetryTimerTask{
WorkflowKey: definition.WorkflowKey{
NamespaceID: request.GetNamespaceId(),
WorkflowID: request.GetWorkflowId(),
RunID: request.GetRunId(),
},
VisibilityTimestamp: eventTime,
EventID: request.GetScheduledId(),
Version: request.GetVersion(),
Attempt: request.GetAttempt(),
})
}

refreshTask := r.testRefreshActivityTimerTaskMask(
request.GetVersion(),
request.GetAttempt(),
Expand All @@ -165,7 +184,6 @@ func (r *nDCActivityReplicatorImpl) SyncActivity(
}

// see whether we need to refresh the activity timer
eventTime := timestamp.TimeValue(request.GetScheduledTime())
startedTime := timestamp.TimeValue(request.GetStartedTime())
lastHeartbeatTime := timestamp.TimeValue(request.GetLastHeartbeatTime())
if eventTime.Before(startedTime) {
Expand Down
6 changes: 5 additions & 1 deletion service/history/nDCActivityReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,15 +820,18 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() {
weContext.EXPECT().LoadWorkflowExecution(gomock.Any()).Return(s.mockMutableState, nil)
weContext.EXPECT().Lock(gomock.Any(), workflow.CallerTypeAPI).Return(nil)
weContext.EXPECT().Unlock(workflow.CallerTypeAPI)

_, err := s.historyCache.PutIfNotExist(key, weContext)
s.NoError(err)

now := time.Now()
request := &historyservice.SyncActivityRequest{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
RunId: runID,
Version: version,
ScheduledId: scheduleID,
ScheduledTime: &now,
VersionHistory: incomingVersionHistory,
}

Expand All @@ -843,7 +846,6 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_Zombie() {
}, true)
s.mockMutableState.EXPECT().ReplicateActivityInfo(request, false).Return(nil)
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{})

s.mockClusterMetadata.EXPECT().IsVersionFromSameCluster(version, version).Return(true)

weContext.EXPECT().UpdateWorkflowExecutionWithNew(
Expand Down Expand Up @@ -914,12 +916,14 @@ func (s *activityReplicatorSuite) TestSyncActivity_ActivityFound_NonZombie() {
_, err := s.historyCache.PutIfNotExist(key, weContext)
s.NoError(err)

now := time.Now()
request := &historyservice.SyncActivityRequest{
NamespaceId: namespaceID.String(),
WorkflowId: workflowID,
RunId: runID,
Version: version,
ScheduledId: scheduleID,
ScheduledTime: &now,
VersionHistory: incomingVersionHistory,
}

Expand Down
15 changes: 15 additions & 0 deletions service/history/nDCStandbyTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type (
}

pushActivityTaskToMatchingInfo struct {
taskQueue string
namespaceID string
activityTaskScheduleToStartTimeout time.Duration
}

Expand Down Expand Up @@ -127,6 +129,19 @@ func newPushActivityToMatchingInfo(
}
}

func newActivityRetryTimerToMatchingInfo(
taskQueue string,
namespaceID string,
activityScheduleToStartTimeout time.Duration,
) *pushActivityTaskToMatchingInfo {

return &pushActivityTaskToMatchingInfo{
taskQueue: taskQueue,
namespaceID: namespaceID,
activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout,
}
}

func newPushWorkflowTaskToMatchingInfo(
workflowTaskScheduleToStartTimeout int64,
taskqueue taskqueuepb.TaskQueue,
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (t *timerQueueProcessorImpl) handleClusterMetadataUpdate(
t.shard,
t.workflowCache,
t.workflowDeleteManager,
t.matchingClient,
clusterName,
t.taskAllocator,
nDCHistoryResender,
Expand Down
4 changes: 4 additions & 0 deletions service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"context"
"time"

"go.temporal.io/server/api/matchingservice/v1"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -59,6 +61,7 @@ func newTimerQueueStandbyProcessor(
shard shard.Context,
workflowCache workflow.Cache,
workflowDeleteManager workflow.DeleteManager,
matchingClient matchingservice.MatchingServiceClient,
clusterName string,
taskAllocator taskAllocator,
nDCHistoryResender xdc.NDCHistoryResender,
Expand Down Expand Up @@ -104,6 +107,7 @@ func newTimerQueueStandbyProcessor(
workflowCache,
workflowDeleteManager,
nDCHistoryResender,
matchingClient,
logger,
clusterName,
shard.GetConfig(),
Expand Down
92 changes: 87 additions & 5 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
Expand All @@ -53,6 +55,7 @@ type (

clusterName string
adminClient adminservice.AdminServiceClient
matchingClient matchingservice.MatchingServiceClient
nDCHistoryResender xdc.NDCHistoryResender
}
)
Expand All @@ -62,6 +65,7 @@ func newTimerQueueStandbyTaskExecutor(
workflowCache workflow.Cache,
workflowDeleteManager workflow.DeleteManager,
nDCHistoryResender xdc.NDCHistoryResender,
matchingClient matchingservice.MatchingServiceClient,
logger log.Logger,
clusterName string,
config *configs.Config,
Expand All @@ -77,6 +81,7 @@ func newTimerQueueStandbyTaskExecutor(
clusterName: clusterName,
adminClient: shard.GetRemoteAdminClient(clusterName),
nDCHistoryResender: nDCHistoryResender,
matchingClient: matchingClient,
}
}

Expand Down Expand Up @@ -108,9 +113,10 @@ func (t *timerQueueStandbyTaskExecutor) execute(
}
return t.executeWorkflowBackoffTimerTask(ctx, task)
case *tasks.ActivityRetryTimerTask:
// retry backoff timer should not get created on passive cluster
// TODO: add error logs
return nil
if !shouldProcessTask {
return nil
}
return t.executeActivityRetryTimerTask(ctx, task)
case *tasks.WorkflowTimeoutTask:
return t.executeWorkflowTimeoutTask(ctx, task)
case *tasks.DeleteHistoryEventTask:
Expand Down Expand Up @@ -205,7 +211,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
return getHistoryResendInfo(mutableState)
}
// since the activity timer are already sorted, so if there is one timer which will not expired
// all activity timer after this timer will not expired
// all activity timer after this timer will not expire
break Loop //nolint:staticcheck
}

Expand Down Expand Up @@ -272,6 +278,49 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
)
}

func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
ctx context.Context,
task *tasks.ActivityRetryTimerTask,
) (retError error) {

actionFn := func(context workflow.Context, mutableState workflow.MutableState) (interface{}, error) {

activityInfo, ok := mutableState.GetActivityInfo(task.EventID) //activity schedule ID
if !ok {
return nil, nil
}

ok = VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), activityInfo.Version, task.Version, task)
if !ok {
return nil, nil
}

if activityInfo.Attempt > task.Attempt {
return nil, nil
}

if activityInfo.StartedId != common.EmptyEventID {
return nil, nil
}

return newActivityRetryTimerToMatchingInfo(activityInfo.TaskQueue, activityInfo.NamespaceId, *activityInfo.ScheduleToStartTimeout), nil
}

return t.processTimer(
ctx,
task,
actionFn,
getStandbyPostActionFn(
task,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.pushActivity,
t.pushActivity,
),
)
}

func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask(
ctx context.Context,
timerTask *tasks.WorkflowTaskTimeoutTask,
Expand Down Expand Up @@ -518,6 +567,39 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
return consts.ErrTaskRetry
}

func (t *timerQueueStandbyTaskExecutor) pushActivity(
task tasks.Task,
postActionInfo interface{},
logger log.Logger,
) error {

if postActionInfo == nil {
return nil
}

pushActivityInfo := postActionInfo.(*pushActivityTaskToMatchingInfo)
activityScheduleToStartTimeout := &pushActivityInfo.activityTaskScheduleToStartTimeout
activityTask := task.(*tasks.ActivityRetryTimerTask)
ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout)
defer cancel()

_, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
NamespaceId: pushActivityInfo.namespaceID,
SourceNamespaceId: activityTask.NamespaceID,
Execution: &commonpb.WorkflowExecution{
WorkflowId: activityTask.WorkflowID,
RunId: activityTask.RunID,
},
TaskQueue: &taskqueuepb.TaskQueue{
Name: pushActivityInfo.taskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
ScheduleId: activityTask.EventID,
ScheduleToStartTimeout: activityScheduleToStartTimeout,
})
return err
}

func (t *timerQueueStandbyTaskExecutor) getCurrentTime() time.Time {
return t.shard.GetCurrentTime(t.clusterName)
}
Loading

0 comments on commit 12f4731

Please sign in to comment.