From 41bad99497731821ae64f7e02849afd5eafe7b26 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Fri, 1 Apr 2022 16:02:57 -0700 Subject: [PATCH] buffer event as local namespace after promotion (#2699) * Pretend local namespace after ns promotion if there is inflight workflow task --- host/xdc/integration_failover_test.go | 127 ++++++++++++++---- .../history/workflow/mutable_state_impl.go | 4 +- 2 files changed, 106 insertions(+), 25 deletions(-) diff --git a/host/xdc/integration_failover_test.go b/host/xdc/integration_failover_test.go index da634041234..2b92f542223 100644 --- a/host/xdc/integration_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -1984,6 +1984,9 @@ func (s *integrationClustersTestSuite) printHistory(frontendClient workflowservi } func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { + testCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + namespace := "local-ns-to-be-promote-" + common.GenerateRandomString(5) s.registerNamespace(namespace, false) @@ -2000,7 +2003,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { // Start wf1 (in local ns) workflowID := "local-ns-wf-1" - run1, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + run1, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{ ID: workflowID, TaskQueue: taskqueue, WorkflowRunTimeout: time.Second * 30, @@ -2010,12 +2013,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.NotEmpty(run1.GetRunID()) s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID())) // wait until wf1 complete - err = run1.Get(context.Background(), nil) + err = run1.Get(testCtx, nil) s.NoError(err) // Start wf2 (start in local ns, and then promote to global ns, wf2 close in global ns) workflowID2 := "local-ns-wf-2" - run2, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + run2, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{ ID: workflowID2, TaskQueue: taskqueue, WorkflowRunTimeout: time.Second * 30, @@ -2047,7 +2050,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { var receivedSig string wfWithBufferedEvents := func(ctx workflow.Context) error { ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - StartToCloseTimeout: 30 * time.Second, + StartToCloseTimeout: 40 * time.Second, RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, }) f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn) @@ -2071,29 +2074,92 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { } worker1.RegisterWorkflow(wfWithBufferedEvents) + // Start wf7 (start in local ns, then ns promote and buffer events, close in global ns) + workflowID7 := "local-ns-promoted-buffered-events-wf7" + sigReadyToSendChan2 := make(chan struct{}, 1) + sigSendDoneChan2 := make(chan struct{}) + localActivityFn2 := func(ctx context.Context) error { + // to unblock signal sending, so signal is send after first workflow task started. + select { + case sigReadyToSendChan2 <- struct{}{}: + default: + } + + // this will block workflow task and cause the signal to become buffered event + select { + case <-sigSendDoneChan2: + case <-ctx.Done(): + } + + return nil + } + + var receivedSig2 string + wfWithBufferedEvents2 := func(ctx workflow.Context) error { + ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: 40 * time.Second, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, + }) + f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn2) + err1 := f1.Get(ctx1, nil) + if err1 != nil { + return err1 + } + + sigCh := workflow.GetSignalChannel(ctx, "signal-name") + + for { + var sigVal string + ok := sigCh.ReceiveAsync(&sigVal) + if !ok { + break + } + receivedSig2 = sigVal + } + + return nil + } + worker1.RegisterWorkflow(wfWithBufferedEvents2) + workflowOptions := sdkclient.StartWorkflowOptions{ ID: workflowID6, TaskQueue: taskqueue, // Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not // correctly dispatched, it would time out which would fail the workflow and cause test to fail. - WorkflowTaskTimeout: 30 * time.Second, - WorkflowRunTimeout: 30 * time.Second, + WorkflowTaskTimeout: 40 * time.Second, + WorkflowRunTimeout: 40 * time.Second, } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - run6, err := client1.ExecuteWorkflow(ctx, workflowOptions, wfWithBufferedEvents) + run6, err := client1.ExecuteWorkflow(testCtx, workflowOptions, wfWithBufferedEvents) s.NoError(err) s.NotNil(run6) s.True(run6.GetRunID() != "") + workflowOptions2 := sdkclient.StartWorkflowOptions{ + ID: workflowID7, + TaskQueue: taskqueue, + // Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not + // correctly dispatched, it would time out which would fail the workflow and cause test to fail. + WorkflowTaskTimeout: 40 * time.Second, + WorkflowRunTimeout: 40 * time.Second, + } + run7, err := client1.ExecuteWorkflow(testCtx, workflowOptions2, wfWithBufferedEvents2) + s.NoError(err) + s.NotNil(run7) + s.True(run7.GetRunID() != "") + // block until first workflow task started select { case <-sigReadyToSendChan: - case <-ctx.Done(): + case <-testCtx.Done(): + } + + select { + case <-sigReadyToSendChan2: + case <-testCtx.Done(): } // this signal will become buffered event - err = client1.SignalWorkflow(ctx, workflowID6, run6.GetRunID(), "signal-name", "signal-value") + err = client1.SignalWorkflow(testCtx, workflowID6, run6.GetRunID(), "signal-name", "signal-value") s.NoError(err) // promote ns @@ -2109,8 +2175,17 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.NoError(err) s.True(nsResp.IsGlobalNamespace) s.Equal(1, len(nsResp.ReplicationConfig.Clusters)) + time.Sleep(cacheRefreshInterval) + + // this will buffer after ns promotion + err = client1.SignalWorkflow(testCtx, workflowID7, run7.GetRunID(), "signal-name", "signal-value") + s.NoError(err) + // send 2 signals to wf7, both would be buffered. + err = client1.SignalWorkflow(testCtx, workflowID7, run7.GetRunID(), "signal-name", "signal-value2") + s.NoError(err) + // update ns to have 2 clusters - _, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{ + _, err = frontendClient1.UpdateNamespace(testCtx, &workflowservice.UpdateNamespaceRequest{ Namespace: namespace, ReplicationConfig: &replicationpb.NamespaceReplicationConfig{ Clusters: clusterReplicationConfig, @@ -2121,7 +2196,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { // wait for ns cache to pick up the change time.Sleep(cacheRefreshInterval) - nsResp, err = frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ + nsResp, err = frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{ Namespace: namespace, }) s.NoError(err) @@ -2130,19 +2205,24 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { // namespace update completed, now resume wf6 (bufferedEvent workflow) close(sigSendDoneChan) + close(sigSendDoneChan2) // wait until wf2 complete - err = run2.Get(context.Background(), nil) + err = run2.Get(testCtx, nil) s.NoError(err) // wait until wf6 complete - err = run6.Get(context.Background(), nil) + err = run6.Get(testCtx, nil) s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here s.Equal("signal-value", receivedSig) + err = run7.Get(testCtx, nil) + s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here + s.Equal("signal-value2", receivedSig2) + // start wf3 (start in global ns) workflowID3 := "local-ns-wf-3" - run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + run3, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{ ID: workflowID3, TaskQueue: taskqueue, WorkflowRunTimeout: time.Second * 30, @@ -2151,7 +2231,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.NotEmpty(run3.GetRunID()) s.logger.Info("start wf3", tag.WorkflowRunID(run3.GetRunID())) // wait until wf3 complete - err = run3.Get(context.Background(), nil) + err = run3.Get(testCtx, nil) s.NoError(err) // start force-replicate wf @@ -2160,7 +2240,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { Namespace: "temporal-system", }) workflowID4 := "force-replication-wf-4" - run4, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + run4, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{ ID: workflowID4, TaskQueue: sw.DefaultWorkerTaskQueue, WorkflowRunTimeout: time.Second * 30, @@ -2170,12 +2250,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { }) s.NoError(err) - err = run4.Get(context.Background(), nil) + err = run4.Get(testCtx, nil) s.NoError(err) // start namespace-handover wf workflowID5 := "namespace-handover-wf-5" - run5, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ + run5, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{ ID: workflowID5, TaskQueue: sw.DefaultWorkerTaskQueue, WorkflowRunTimeout: time.Second * 30, @@ -2186,12 +2266,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { HandoverTimeoutSeconds: 30, }) s.NoError(err) - err = run5.Get(context.Background(), nil) + err = run5.Get(testCtx, nil) s.NoError(err) // at this point ns migration is done. // verify namespace is now active in cluster2 - nsResp2, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ + nsResp2, err := frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{ Namespace: namespace, }) s.NoError(err) @@ -2206,7 +2286,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { }) s.NoError(err) verify := func(wfID string, expectedRunID string) { - desc1, err := client2.DescribeWorkflowExecution(host.NewContext(), wfID, "") + desc1, err := client2.DescribeWorkflowExecution(testCtx, wfID, "") s.NoError(err) s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId) s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status) @@ -2215,6 +2295,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { verify(workflowID2, run2.GetRunID()) verify(workflowID3, run3.GetRunID()) verify(workflowID6, run6.GetRunID()) + verify(workflowID7, run7.GetRunID()) } func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent { diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index d05ee5e1cb8..9fbd2a61143 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4198,8 +4198,8 @@ func (e *MutableStateImpl) startTransactionHandleNamespaceMigration( return nil, err } - // local namespace -> global namespace && with buffered events - if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && e.HasBufferedEvents() { + // local namespace -> global namespace && with inflight workflow task + if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && e.HasInFlightWorkflowTask() { localNamespaceMutation := namespace.NewPretendAsLocalNamespace( e.clusterMetadata.GetCurrentClusterName(), )