Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix distributed transactions disruptions test with move table #16765

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stress
package fuzz

import (
"context"
Expand All @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestTwoPCFuzzTest(t *testing.T) {
fz.stop()

// Wait for all transactions to be resolved.
waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
twopcutil.WaitForResults(t, &vtParams, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
// Verify that all the transactions run were actually atomic and no data issues have occurred.
fz.verifyTransactionsWereAtomic(t)

Expand Down Expand Up @@ -428,12 +429,16 @@ func vttabletRestarts(t *testing.T) {
}
}

var orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}
var (
count = 0

orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}
)

// onlineDDLFuzzer runs an online DDL statement while ignoring any errors for the fuzzer.
func onlineDDLFuzzer(t *testing.T) {
Expand All @@ -445,7 +450,8 @@ func onlineDDLFuzzer(t *testing.T) {
return
}
fmt.Println("Running online DDL with uuid: ", output)
waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards,
strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

var moveTablesCount int
Expand Down
129 changes: 129 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fuzz

import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
unshardedKeyspaceName = "uks"
cell = "zone1"
hostname = "localhost"

//go:embed schema.sql
SchemaSQL string

//go:embed vschema.json
VSchema string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Reserve vtGate port in order to pass it to vtTablet
clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort()

// Set extra args for twopc
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--transaction_mode", "TWOPC",
"--grpc_use_effective_callerid",
)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

// Start an unsharded keyspace
unshardedKeyspace := &cluster.Keyspace{
Name: unshardedKeyspaceName,
SchemaSQL: "",
VSchema: "{}",
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
return 1
}

// Start Vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = clusterInstance.GetVTParams("")
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

return m.Run()
}()
os.Exit(exitcode)
}

func start(t *testing.T) (*mysql.Conn, func()) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
cleanup(t)

return conn, func() {
conn.Close()
cleanup(t)
}
}

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}
20 changes: 20 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
create table twopc_fuzzer_update (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;

create table twopc_fuzzer_insert (
id bigint,
updateSet bigint,
threadId bigint,
col bigint auto_increment,
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;
34 changes: 34 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/vschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"sharded":true,
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
"twopc_fuzzer_update": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_fuzzer_insert": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}
82 changes: 3 additions & 79 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -146,7 +145,7 @@ func TestDisruptions(t *testing.T) {
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
wg.Wait()
// Check the data in the table.
waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
writeCancel()
writerWg.Wait()

Expand Down Expand Up @@ -184,32 +183,6 @@ func reparentToFirstTablet(t *testing.T) {
}
}

// waitForResults waits for the results of the query to be as expected.
func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) {
timeout := time.After(waitTime)
var prevRes []sqltypes.Row
for {
select {
case <-timeout:
t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes)
default:
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
}
}
}

/*
Cluster Level Disruptions for the fuzzer
*/
Expand Down Expand Up @@ -328,58 +301,9 @@ func onlineDDL(t *testing.T) error {
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards,
strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for {
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
continue
}
r, err := utils.ExecAllowError(t, conn, query)
conn.Close()
if err != nil {
continue
}
for _, row := range r.Named().Rows {
shardName := row["shard"].ToString()
if !shardNames[shardName] {
// irrelevant shard
continue
}
lastKnownStatus = row["migration_status"].ToString()
if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
}
}
Loading
Loading