Skip to content

Commit

Permalink
onlineddl_vrepl_stress: fix flakiness caused by timeouts (#14295)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Oct 17, 2023
1 parent 9a8e347 commit e7aaa5b
Showing 1 changed file with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -138,6 +137,7 @@ var (

const (
maxTableRows = 4096
workloadDuration = 5 * time.Second
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
Expand Down Expand Up @@ -227,6 +227,8 @@ func TestMain(m *testing.M) {
func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)

ctx := context.Background()

shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))

Expand All @@ -251,16 +253,17 @@ func TestSchemaChange(t *testing.T) {
// that our testing/metrics logic is sound in the first place.
testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations)
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
initTable(t)

ctx, cancel := context.WithTimeout(ctx, workloadDuration)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runMultipleConnections(ctx, t)
}()
time.Sleep(5 * time.Second)
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
testSelectTableMetrics(t)
})
Expand All @@ -285,14 +288,17 @@ func TestSchemaChange(t *testing.T) {
// the vreplication/ALTER TABLE did not corrupt our data and we are happy.
testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations)
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
t.Run("create schema", func(t *testing.T) {
testWithInitialSchema(t)
})
t.Run("init table", func(t *testing.T) {
initTable(t)
})
t.Run("migrate", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -302,7 +308,7 @@ func TestSchemaChange(t *testing.T) {
hint := fmt.Sprintf("hint-alter-with-workload-%d", i)
uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
cancel() // will cause runMultipleConnections() to terminate
cancel() // Now that the migration is complete, we can stop the workload.
wg.Wait()
})
t.Run("validate metrics", func(t *testing.T) {
Expand Down Expand Up @@ -485,7 +491,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
func runSingleConnection(ctx context.Context, t *testing.T) {
log.Infof("Running single connection")
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -497,10 +503,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
require.Nil(t, err)

for {
if atomic.LoadInt64(done) == 1 {
log.Infof("Terminating single connection")
return
}
switch rand.Int31n(3) {
case 0:
err = generateInsert(t, conn)
Expand All @@ -509,27 +511,28 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
case 2:
err = generateDelete(t, conn)
}
select {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(singleConnectionSleepInterval):
}
assert.Nil(t, err)
time.Sleep(singleConnectionSleepInterval)
}
}

func runMultipleConnections(ctx context.Context, t *testing.T) {
log.Infof("Running multiple connections")
var done int64
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, &done)
runSingleConnection(ctx, t)
}()
}
<-ctx.Done()
atomic.StoreInt64(&done, 1)
log.Infof("Running multiple connections: done")
wg.Wait()
log.Infof("All connections cancelled")
log.Infof("Running multiple connections: done")
}

func initTable(t *testing.T) {
Expand Down

0 comments on commit e7aaa5b

Please sign in to comment.