Skip to content

Commit

Permalink
VTGate FK stress tests suite: improvements (#14098)
Browse files Browse the repository at this point in the history
  • Loading branch information
shlomi-noach authored Sep 29, 2023
1 parent d7b7166 commit 172637d
Showing 1 changed file with 57 additions and 23 deletions.
80 changes: 57 additions & 23 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -34,6 +33,7 @@ import (
"golang.org/x/exp/slices"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -308,6 +308,7 @@ func TestMain(m *testing.M) {
"--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
"--vreplication_tablet_type", "primary",
}
Expand Down Expand Up @@ -376,8 +377,39 @@ func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string {
return ""
}

// getTabletPosition queries the given tablet for @@gtid_executed and returns
// the value decoded as a replication.Position, passing
// replication.Mysql56FlavorID to DecodePositionDefaultFlavor.
//
// The test is aborted (via require) if the query yields no row, if the GTID
// string is empty, or if the string cannot be decoded, so callers never
// receive a zero-value position.
func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position {
	rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "")
	row := rs.Named().Row()
	require.NotNil(t, row)
	gtidExecuted := row.AsString("gtid_executed", "")
	require.NotEmpty(t, gtidExecuted)
	pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID)
	// require rather than assert: with assert the function would carry on and
	// return a zero-value Position, and callers that dereference pos.GTIDSet
	// (e.g. GTIDSet.Contains) would then operate on an unset GTID set.
	require.NoError(t, err)
	return pos
}

// waitForReplicaCatchup blocks until the replica's executed GTID set contains
// the primary's executed GTID set, failing the test if replication is reported
// unhealthy or if one minute elapses first.
//
// The primary position is sampled once, before the loop; the replica position
// is re-read on every iteration, roughly once per second.
//
// NOTE(review): the leading cluster.WaitForReplicationPos call looks like a
// removed diff line shown alongside the added polling loop — confirm against
// the actual file whether both are meant to be present.
func waitForReplicaCatchup(t *testing.T) {
cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute)
// Overall deadline for the polling loop below.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
primaryPos := getTabletPosition(t, primary)
for {
replicaPos := getTabletPosition(t, replica)
if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) {
// success: the replica has applied everything the primary had executed
// at the time primaryPos was sampled.
return
}
// Check replication health before sleeping, so a broken replica fails
// immediately instead of waiting for the timeout.
if !cluster.ValidateReplicationIsHealthy(t, replica) {
assert.FailNow(t, "replication is broken; not waiting for catchup")
return
}
select {
case <-ctx.Done():
assert.FailNow(t, "timeout waiting for replica to catch up")
return
case <-time.After(time.Second):
// Pause for one second, then poll the replica position again.
}
}
}

func validateMetrics(t *testing.T, tcase *testCase) {
Expand Down Expand Up @@ -421,8 +453,8 @@ func TestInitialSetup(t *testing.T) {

if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" {
// This is the place to fine tune the stress parameters if GitHub actions are too slow
maxConcurrency = maxConcurrency * 1
singleConnectionSleepInterval = singleConnectionSleepInterval * 1
maxConcurrency = maxConcurrency / 2
singleConnectionSleepInterval = singleConnectionSleepInterval * 2
}
t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval)
}
Expand All @@ -440,6 +472,7 @@ type testCase struct {
// - Either one of ON UPDATE actions
// - Potentially running an Online DDL on an indicated table (this will not work in Vanilla MySQL, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/)
func ExecuteFKTest(t *testing.T, tcase *testCase) {
t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval)
workloadName := "static data"
if tcase.workload {
workloadName = "workload"
Expand All @@ -449,8 +482,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
testName = fmt.Sprintf("%s/ddl=%s", testName, tcase.onlineDDLTable)
}
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()

t.Run("create schema", func(t *testing.T) {
createInitialSchema(t, tcase)
Expand All @@ -460,6 +492,9 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
})
if tcase.workload {
t.Run("workload", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, workloadDuration)
defer cancel()

var wg sync.WaitGroup
for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} {
wg.Add(1)
Expand All @@ -468,7 +503,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
runMultipleConnections(ctx, t, tbl)
}(workloadTable)
}
timer := time.NewTimer(workloadDuration)

if tcase.onlineDDLTable != "" {
t.Run("migrating", func(t *testing.T) {
Expand All @@ -494,9 +528,6 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
})
})
}

<-timer.C
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
})
}
Expand Down Expand Up @@ -524,7 +555,9 @@ func TestStressFK(t *testing.T) {
})

runOnlineDDL := false

if val, present := os.LookupEnv("FK_STRESS_ONLINE_DDL"); present && val != "" {
runOnlineDDL = true
}
// Without workload ; with workload
for _, workload := range []bool{false, true} {
// For any type of ON DELETE action
Expand All @@ -542,6 +575,10 @@ func TestStressFK(t *testing.T) {
}

if runOnlineDDL {
// Foreign keys introduce some overhead. We reduce concurrency so that GitHub CI can accommodate.
maxConcurrency = maxConcurrency * 4 / 5
singleConnectionSleepInterval = singleConnectionSleepInterval * 2

// Running Online DDL on all test tables. We don't use all of the combinations
// presented above; we run with workload only, and make do with identical ON DELETE and ON UPDATE actions.
for _, action := range referenceActions {
Expand Down Expand Up @@ -877,7 +914,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, tableName string, done *int64) {
func runSingleConnection(ctx context.Context, t *testing.T, tableName string) {
log.Infof("Running single connection on %s", tableName)
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -889,10 +926,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do
require.Nil(t, err)

for {
if atomic.LoadInt64(done) == 1 {
log.Infof("Terminating single connection")
return
}
switch rand.Int31n(3) {
case 0:
_ = generateInsert(t, tableName, conn)
Expand All @@ -901,26 +934,27 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, do
case 2:
_ = generateDelete(t, tableName, conn)
}
time.Sleep(singleConnectionSleepInterval)
select {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(singleConnectionSleepInterval):
}
}
}

// runMultipleConnections starts maxConcurrency goroutines, each invoking
// runSingleConnection for tableName with the supplied ctx, then blocks until
// ctx is done and every goroutine has returned (wg.Wait).
//
// NOTE(review): this span appears to interleave removed and added diff lines
// (the `done` flag with its atomic store, two runSingleConnection calls of
// different arity, and a duplicated "done" log line) — confirm against the
// actual file which statements are current before relying on their order.
func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) {
log.Infof("Running multiple connections")
var done int64
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, tableName, &done)
runSingleConnection(ctx, t, tableName)
}()
}
// Block here until the caller's context is cancelled or times out.
<-ctx.Done()
atomic.StoreInt64(&done, 1)
log.Infof("Running multiple connections: done")
// Wait for every goroutine started above to return.
wg.Wait()
log.Infof("All connections cancelled")
log.Infof("Running multiple connections: done")
}

func wrapWithNoFKChecks(sql string) string {
Expand Down

0 comments on commit 172637d

Please sign in to comment.