From 172637d64cf7c77b462900a2cf4e7848c3944c41 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Fri, 29 Sep 2023 17:11:22 +0300 Subject: [PATCH] VTGate FK stress tests suite: improvements (#14098) --- .../foreignkey/stress/fk_stress_test.go | 80 +++++++++++++------ 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 600961e6f0c..c55ce9eef79 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -25,7 +25,6 @@ import ( "path" "strings" "sync" - "sync/atomic" "testing" "time" @@ -34,6 +33,7 @@ import ( "golang.org/x/exp/slices" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" @@ -308,6 +308,7 @@ func TestMain(m *testing.M) { "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", + "--migration_check_interval", "5s", "--watch_replication_stream", "--vreplication_tablet_type", "primary", } @@ -376,8 +377,39 @@ func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string { return "" } +func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position { + rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "") + row := rs.Named().Row() + require.NotNil(t, row) + gtidExecuted := row.AsString("gtid_executed", "") + require.NotEmpty(t, gtidExecuted) + pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID) + assert.NoError(t, err) + return pos +} + func waitForReplicaCatchup(t *testing.T) { - cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + primaryPos := getTabletPosition(t, primary) + for { + replicaPos := getTabletPosition(t, replica) + if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) { + // success + return + } + if !cluster.ValidateReplicationIsHealthy(t, replica) { + assert.FailNow(t, "replication is broken; not waiting for catchup") + return + } + select { + case <-ctx.Done(): + assert.FailNow(t, "timeout waiting for replica to catch up") + return + case <-time.After(time.Second): + // + } + } } func validateMetrics(t *testing.T, tcase *testCase) { @@ -421,8 +453,8 @@ func TestInitialSetup(t *testing.T) { if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { // This is the place to fine tune the stress parameters if GitHub actions are too slow - maxConcurrency = maxConcurrency * 1 - singleConnectionSleepInterval = singleConnectionSleepInterval * 1 + maxConcurrency = maxConcurrency / 2 + singleConnectionSleepInterval = singleConnectionSleepInterval * 2 } t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) } @@ -440,6 +472,7 @@ type testCase struct { // - Either one of ON UPDATE actions // - Potentially running an Online DDL on an indicated table (this will not work in Vanilla MySQL, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/) func ExecuteFKTest(t *testing.T, tcase *testCase) { + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) workloadName := "static data" if tcase.workload { workloadName = "workload" @@ -449,8 +482,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { testName = fmt.Sprintf("%s/ddl=%s", testName, tcase.onlineDDLTable) } t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() t.Run("create schema", func(t *testing.T) { createInitialSchema(t, tcase) @@ -460,6 +492,9 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { }) if tcase.workload { t.Run("workload", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { wg.Add(1) @@ -468,7 +503,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { runMultipleConnections(ctx, t, tbl) }(workloadTable) } - timer := time.NewTimer(workloadDuration) if tcase.onlineDDLTable != "" { t.Run("migrating", func(t *testing.T) { @@ -494,9 +528,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { }) }) } - - <-timer.C - cancel() // will cause runMultipleConnections() to terminate wg.Wait() }) } @@ -524,7 +555,9 @@ func TestStressFK(t *testing.T) { }) runOnlineDDL := false - + if val, present := os.LookupEnv("FK_STRESS_ONLINE_DDL"); present && val != "" { + runOnlineDDL = true + } // Without workload ; with workload for _, workload := range []bool{false, true} { // For any type of ON DELETE action @@ -542,6 +575,10 @@ func TestStressFK(t *testing.T) { } if runOnlineDDL { + // Foreign keys introduce some overhead. We reduce concurrency so that GitHub CI can accommodate. + maxConcurrency = maxConcurrency * 4 / 5 + singleConnectionSleepInterval = singleConnectionSleepInterval * 2 + // Running Online DDL on all test tables. We don't use all of the combinations // presented above; we will run with workload, and suffice with same ON DELETE - ON UPDATE actions. for _, action := range referenceActions { @@ -877,7 +914,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T, tableName string, done *int64) { +func runSingleConnection(ctx context.Context, t *testing.T, tableName string) { log.Infof("Running single connection on %s", tableName) conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -889,10 +926,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do require.Nil(t, err) for { - if atomic.LoadInt64(done) == 1 { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: _ = generateInsert(t, tableName, conn) @@ -901,26 +934,27 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do case 2: _ = generateDelete(t, tableName, conn) } - time.Sleep(singleConnectionSleepInterval) + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-time.After(singleConnectionSleepInterval): + } } } func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) { log.Infof("Running multiple connections") - var done int64 var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t, tableName, &done) + runSingleConnection(ctx, t, tableName) }() } - <-ctx.Done() - atomic.StoreInt64(&done, 1) - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func wrapWithNoFKChecks(sql string) string {