diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index e24686a351b..e8b4227a5af 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1401,6 +1401,7 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard", "workflow type specified: %s", workflowType) } + ensureCanSwitch(t, workflowType, cells, ksWorkflow) output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica", "--dry_run", "SwitchTraffic", ksWorkflow) require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output)) @@ -1409,6 +1410,23 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry } } +func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) { + timer := time.NewTimer(defaultTimeout) + defer timer.Stop() + for { + _, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow) + if err == nil { + return + } + select { + case <-timer.C: + t.Fatalf("Did not become ready to switch traffic for %s before the timeout of %s", ksWorkflow, defaultTimeout) + default: + time.Sleep(defaultTick) + } + } +} + func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse bool) { if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() && workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() { @@ -1421,6 +1439,7 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b if reverse { command = "ReverseTraffic" } + ensureCanSwitch(t, workflowType, cells, ksWorkflow) output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly", command, ksWorkflow) require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output)) @@ -1440,6 +1459,7 @@ func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) { command = "ReverseTraffic" } const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1 + ensureCanSwitch(t, workflowType, "", ksWorkflow) // Use vtctldclient for MoveTables SwitchTraffic ~ 50% of the time. if workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables.String() && time.Now().Second()%2 == 0 { parts := strings.Split(ksWorkflow, ".") @@ -1535,6 +1555,7 @@ func generateInnoDBRowHistory(t *testing.T, sourceKS string, neededTrxHistory in // expected length. func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, expectedLength int64) { timer := time.NewTimer(defaultTimeout) + defer timer.Stop() historyLen := int64(0) for { res, err := tablet.QueryTablet(historyLenQuery, tablet.Keyspace, false)