Skip to content

Commit

Permalink
buffer event as local namespace after promotion (#2699)
Browse files Browse the repository at this point in the history
* Pretend local namespace after ns promotion if there is inflight workflow task
  • Loading branch information
yiminc authored and mastermanu committed Apr 4, 2022
1 parent 9111f47 commit 41bad99
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 25 deletions.
127 changes: 104 additions & 23 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,9 @@ func (s *integrationClustersTestSuite) printHistory(frontendClient workflowservi
}

func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
testCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

namespace := "local-ns-to-be-promote-" + common.GenerateRandomString(5)
s.registerNamespace(namespace, false)

Expand All @@ -2000,7 +2003,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {

// Start wf1 (in local ns)
workflowID := "local-ns-wf-1"
run1, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
run1, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
Expand All @@ -2010,12 +2013,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 complete
err = run1.Get(context.Background(), nil)
err = run1.Get(testCtx, nil)
s.NoError(err)

// Start wf2 (start in local ns, and then promote to global ns, wf2 close in global ns)
workflowID2 := "local-ns-wf-2"
run2, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
run2, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID2,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
Expand Down Expand Up @@ -2047,7 +2050,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
var receivedSig string
wfWithBufferedEvents := func(ctx workflow.Context) error {
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 30 * time.Second,
StartToCloseTimeout: 40 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
Expand All @@ -2071,29 +2074,92 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
}
worker1.RegisterWorkflow(wfWithBufferedEvents)

// Start wf7 (start in local ns, then ns promote and buffer events, close in global ns)
workflowID7 := "local-ns-promoted-buffered-events-wf7"
sigReadyToSendChan2 := make(chan struct{}, 1)
sigSendDoneChan2 := make(chan struct{})
localActivityFn2 := func(ctx context.Context) error {
// unblock signal sending, so the signal is sent after the first workflow task has started.
select {
case sigReadyToSendChan2 <- struct{}{}:
default:
}

// this blocks the workflow task and causes the signal to become a buffered event
select {
case <-sigSendDoneChan2:
case <-ctx.Done():
}

return nil
}

var receivedSig2 string
wfWithBufferedEvents2 := func(ctx workflow.Context) error {
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 40 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn2)
err1 := f1.Get(ctx1, nil)
if err1 != nil {
return err1
}

sigCh := workflow.GetSignalChannel(ctx, "signal-name")

for {
var sigVal string
ok := sigCh.ReceiveAsync(&sigVal)
if !ok {
break
}
receivedSig2 = sigVal
}

return nil
}
worker1.RegisterWorkflow(wfWithBufferedEvents2)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: workflowID6,
TaskQueue: taskqueue,
// Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not
// correctly dispatched, it would time out which would fail the workflow and cause test to fail.
WorkflowTaskTimeout: 30 * time.Second,
WorkflowRunTimeout: 30 * time.Second,
WorkflowTaskTimeout: 40 * time.Second,
WorkflowRunTimeout: 40 * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
run6, err := client1.ExecuteWorkflow(ctx, workflowOptions, wfWithBufferedEvents)
run6, err := client1.ExecuteWorkflow(testCtx, workflowOptions, wfWithBufferedEvents)
s.NoError(err)
s.NotNil(run6)
s.True(run6.GetRunID() != "")

workflowOptions2 := sdkclient.StartWorkflowOptions{
ID: workflowID7,
TaskQueue: taskqueue,
// Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not
// correctly dispatched, it would time out which would fail the workflow and cause test to fail.
WorkflowTaskTimeout: 40 * time.Second,
WorkflowRunTimeout: 40 * time.Second,
}
run7, err := client1.ExecuteWorkflow(testCtx, workflowOptions2, wfWithBufferedEvents2)
s.NoError(err)
s.NotNil(run7)
s.True(run7.GetRunID() != "")

// block until first workflow task started
select {
case <-sigReadyToSendChan:
case <-ctx.Done():
case <-testCtx.Done():
}

select {
case <-sigReadyToSendChan2:
case <-testCtx.Done():
}

// this signal will become buffered event
err = client1.SignalWorkflow(ctx, workflowID6, run6.GetRunID(), "signal-name", "signal-value")
err = client1.SignalWorkflow(testCtx, workflowID6, run6.GetRunID(), "signal-name", "signal-value")
s.NoError(err)

// promote ns
Expand All @@ -2109,8 +2175,17 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(1, len(nsResp.ReplicationConfig.Clusters))
time.Sleep(cacheRefreshInterval)

// this signal will be buffered after ns promotion
err = client1.SignalWorkflow(testCtx, workflowID7, run7.GetRunID(), "signal-name", "signal-value")
s.NoError(err)
// send 2 signals to wf7, both would be buffered.
err = client1.SignalWorkflow(testCtx, workflowID7, run7.GetRunID(), "signal-name", "signal-value2")
s.NoError(err)

// update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
_, err = frontendClient1.UpdateNamespace(testCtx, &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
Expand All @@ -2121,7 +2196,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
// wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err = frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
nsResp, err = frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
Expand All @@ -2130,19 +2205,24 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {

// namespace update completed, now resume wf6 and wf7 (bufferedEvent workflows)
close(sigSendDoneChan)
close(sigSendDoneChan2)

// wait until wf2 complete
err = run2.Get(context.Background(), nil)
err = run2.Get(testCtx, nil)
s.NoError(err)

// wait until wf6 complete
err = run6.Get(context.Background(), nil)
err = run6.Get(testCtx, nil)
s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here
s.Equal("signal-value", receivedSig)

err = run7.Get(testCtx, nil)
s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here
s.Equal("signal-value2", receivedSig2)

// start wf3 (start in global ns)
workflowID3 := "local-ns-wf-3"
run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
run3, err := client1.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID3,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
Expand All @@ -2151,7 +2231,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.NotEmpty(run3.GetRunID())
s.logger.Info("start wf3", tag.WorkflowRunID(run3.GetRunID()))
// wait until wf3 complete
err = run3.Get(context.Background(), nil)
err = run3.Get(testCtx, nil)
s.NoError(err)

// start force-replicate wf
Expand All @@ -2160,7 +2240,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
Namespace: "temporal-system",
})
workflowID4 := "force-replication-wf-4"
run4, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
run4, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID4,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
Expand All @@ -2170,12 +2250,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
})

s.NoError(err)
err = run4.Get(context.Background(), nil)
err = run4.Get(testCtx, nil)
s.NoError(err)

// start namespace-handover wf
workflowID5 := "namespace-handover-wf-5"
run5, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
run5, err := sysClient.ExecuteWorkflow(testCtx, sdkclient.StartWorkflowOptions{
ID: workflowID5,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
Expand All @@ -2186,12 +2266,12 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
HandoverTimeoutSeconds: 30,
})
s.NoError(err)
err = run5.Get(context.Background(), nil)
err = run5.Get(testCtx, nil)
s.NoError(err)

// at this point ns migration is done.
// verify namespace is now active in cluster2
nsResp2, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
nsResp2, err := frontendClient1.DescribeNamespace(testCtx, &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
Expand All @@ -2206,7 +2286,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
})
s.NoError(err)
verify := func(wfID string, expectedRunID string) {
desc1, err := client2.DescribeWorkflowExecution(host.NewContext(), wfID, "")
desc1, err := client2.DescribeWorkflowExecution(testCtx, wfID, "")
s.NoError(err)
s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status)
Expand All @@ -2215,6 +2295,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
verify(workflowID2, run2.GetRunID())
verify(workflowID3, run3.GetRunID())
verify(workflowID6, run6.GetRunID())
verify(workflowID7, run7.GetRunID())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4198,8 +4198,8 @@ func (e *MutableStateImpl) startTransactionHandleNamespaceMigration(
return nil, err
}

// local namespace -> global namespace && with buffered events
if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && e.HasBufferedEvents() {
// local namespace -> global namespace && with inflight workflow task
if lastWriteVersion == common.EmptyVersion && namespaceEntry.FailoverVersion() > common.EmptyVersion && e.HasInFlightWorkflowTask() {
localNamespaceMutation := namespace.NewPretendAsLocalNamespace(
e.clusterMetadata.GetCurrentClusterName(),
)
Expand Down

0 comments on commit 41bad99

Please sign in to comment.