Skip to content

Commit

Permalink
Cherry-pick a3d4001 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Dec 12, 2024
1 parent e340e78 commit 826f9e9
Showing 1 changed file with 333 additions and 8 deletions.
341 changes: 333 additions & 8 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,19 +1028,22 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
// cleanup all the old data.
conn, closer := start(t)
defer closer()
<<<<<<< HEAD

out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
=======
defer func() {
reparentAllShards(t, clusterInstance, 0)
}()
>>>>>>> a3d400177f (Ensure PRS runs for all the shards in `TestSemiSyncRequiredWithTwoPC` (#17384))

// After changing the durability policy for the given keyspace to none, we run PRS.
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspaceName, shard.Name),
"--new-primary", newPrimary.Alias)
require.NoError(t, err)
reparentAllShards(t, clusterInstance, 0)
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
// After changing the durability policy for the given keyspace to none, we run PRS to ensure the changes have taken effect.
reparentAllShards(t, clusterInstance, 1)

// A new distributed transaction should fail.
utils.Exec(t, conn, "begin")
Expand All @@ -1050,4 +1053,326 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
_, err = utils.ExecAllowError(t, conn, "commit")
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
<<<<<<< HEAD
=======

_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err)
reparentAllShards(t, clusterInstance, 0)

// Transaction should now succeed.
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
_, err = utils.ExecAllowError(t, conn, "commit")
require.NoError(t, err)
}

// reparentAllShards promotes, for every shard of the first keyspace, the
// tablet at position idx within that shard via PlannedReparentShard.
// It fails the test immediately if any reparent returns an error.
func reparentAllShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, idx int) {
	shards := clusterInstance.Keyspaces[0].Shards
	for i := range shards {
		target := shards[i].Vttablets[idx].Alias
		require.NoError(t, clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shards[i].Name, target))
	}
}

// TestReadTransactionStatus tests that the read-transaction-state RPC works as
// expected: it starts a multi-shard 2PC commit, delays the commit on one shard
// so the transaction stays unresolved, and then reads its state through the
// tablet manager client, vtctld, and the vtadmin API, verifying all three agree.
func TestReadTransactionStatus(t *testing.T) {
	conn, closer := start(t)
	defer closer()
	defer conn.Close()
	defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitShard)
	defer twopcutil.DeleteFile(twopcutil.DebugDelayCommitTime)

	// We create a multi shard commit and delay its commit on one of the shards.
	// This allows us to query the transaction status and actually get some data back.
	var wg sync.WaitGroup
	twopcutil.RunMultiShardCommitWithDelay(t, conn, "10", &wg, []string{
		"begin",
		"insert into twopc_t1(id, col) values(4, 4)",
		"insert into twopc_t1(id, col) values(6, 4)",
		"insert into twopc_t1(id, col) values(9, 4)",
	})

	// Create a tablet manager client and use it to read the transaction state.
	tmc := grpctmclient.NewClient()
	defer tmc.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	primaryTablet := getTablet(clusterInstance.Keyspaces[0].Shards[2].FindPrimaryTablet().GrpcPort)
	// Poll all shard primaries until the transaction shows up in the
	// unresolved list, or give up after the timeout below.
	var unresTransaction *querypb.TransactionMetadata
	timeout := time.After(10 * time.Second)
	for {
		for _, shard := range clusterInstance.Keyspaces[0].Shards {
			urtRes, err := tmc.GetUnresolvedTransactions(ctx, getTablet(shard.FindPrimaryTablet().GrpcPort), 1)
			require.NoError(t, err)
			if len(urtRes) > 0 {
				unresTransaction = urtRes[0]
			}
		}
		if unresTransaction != nil {
			break
		}
		select {
		case <-timeout:
			require.Fail(t, "timed out waiting for unresolved transaction")
		default:
			// Back off briefly between polling rounds so we don't
			// hammer the tablets with RPCs in a tight loop.
			time.Sleep(100 * time.Millisecond)
		}
	}
	require.NotNil(t, unresTransaction)
	res, err := tmc.GetTransactionInfo(ctx, primaryTablet, unresTransaction.Dtid)
	require.NoError(t, err)
	assert.Equal(t, "PREPARED", res.State)
	assert.Equal(t, "", res.Message)
	assert.Equal(t, []string{"insert into twopc_t1(id, col) values (9, 4)"}, res.Statements)

	// Also try running the RPC from vtctld and verify we see the same values.
	out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction",
		"Read",
		fmt.Sprintf(`--dtid=%s`, unresTransaction.Dtid),
	)
	require.NoError(t, err)
	require.Contains(t, out, "insert into twopc_t1(id, col) values (9, 4)")
	require.Contains(t, out, unresTransaction.Dtid)

	// Read the data from vtadmin API, and verify that too has the same information.
	apiRes := clusterInstance.VtadminProcess.MakeAPICallRetry(t, fmt.Sprintf("/api/transaction/local/%v/info", unresTransaction.Dtid))
	require.Contains(t, apiRes, "insert into twopc_t1(id, col) values (9, 4)")
	require.Contains(t, apiRes, unresTransaction.Dtid)
	require.Contains(t, apiRes, strconv.FormatInt(res.TimeCreated, 10))

	// Wait for the delayed commit to have returned before finishing the test.
	wg.Wait()
}

// TestVindexes tests that different vindexes work well with two-phase commit.
// Each case seeds the tables, runs a multi-statement transaction through the
// lookup / lookup-unique vindexes, and asserts the exact redo-log and row
// events observed per shard via VStream.
func TestVindexes(t *testing.T) {
	// Each test case lists the queries to seed with (initQueries), the
	// transaction to run (testQueries), and the expected binlog transitions
	// keyed by "<keyspace>.<table>:<shard-range>" (logExpected).
	testcases := []struct {
	name string
	initQueries []string
	testQueries []string
	logExpected map[string][]string
	}{
	{
	name: "Lookup Single Update",
	initQueries: []string{
	"insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)",
	"insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)",
	"insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)",
	},
	testQueries: []string{
	"begin",
	"update twopc_lookup set col = 9 where col_unique = 9",
	"commit",
	},
	logExpected: map[string][]string{
	"ks.redo_statement:80-": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	},
	"ks.twopc_lookup:40-80": {
	"update:[INT64(6) INT64(9) INT64(9)]",
	},
	"ks.lookup:80-": {
	"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	"insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	},
	},
	{
	name: "Lookup-Unique Single Update",
	initQueries: []string{
	"insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)",
	"insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)",
	"insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)",
	},
	testQueries: []string{
	"begin",
	"update twopc_lookup set col_unique = 20 where col_unique = 9",
	"commit",
	},
	logExpected: map[string][]string{
	"ks.redo_statement:80-": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	},
	"ks.twopc_lookup:40-80": {
	"update:[INT64(6) INT64(4) INT64(20)]",
	},
	"ks.lookup_unique:80-": {
	"delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	"insert:[VARCHAR(\"20\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	},
	},
	{
	name: "Lookup And Lookup-Unique Single Delete",
	initQueries: []string{
	"insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)",
	"insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)",
	"insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)",
	},
	testQueries: []string{
	"begin",
	"delete from twopc_lookup where col_unique = 9",
	"commit",
	},
	logExpected: map[string][]string{
	"ks.redo_statement:80-": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	},
	"ks.twopc_lookup:40-80": {
	"delete:[INT64(6) INT64(4) INT64(9)]",
	},
	"ks.lookup_unique:80-": {
	"delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	"ks.lookup:80-": {
	"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	},
	},
	{
	name: "Lookup And Lookup-Unique Single Insertion",
	initQueries: []string{
	"insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)",
	"insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)",
	"insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)",
	},
	testQueries: []string{
	"begin",
	"insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)",
	"commit",
	},
	logExpected: map[string][]string{
	"ks.redo_statement:80-": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	},
	"ks.lookup:80-": {
	"insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	"ks.lookup_unique:-40": {
	"insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	"ks.twopc_lookup:-40": {
	"insert:[INT64(20) INT64(4) INT64(22)]",
	},
	},
	},
	{
	name: "Lookup And Lookup-Unique Mix",
	initQueries: []string{
	"insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)",
	"insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)",
	"insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)",
	},
	testQueries: []string{
	"begin",
	"insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)",
	"update twopc_lookup set col = 9 where col_unique = 9",
	"delete from twopc_lookup where id = 9",
	"commit",
	},
	logExpected: map[string][]string{
	"ks.redo_statement:80-": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"insert:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]",
	},
	"ks.redo_statement:40-80": {
	"insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
	"delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
	},
	"ks.twopc_lookup:-40": {
	"insert:[INT64(20) INT64(4) INT64(22)]",
	},
	"ks.twopc_lookup:40-80": {
	"update:[INT64(6) INT64(9) INT64(9)]",
	},
	"ks.twopc_lookup:80-": {
	"delete:[INT64(9) INT64(4) INT64(4)]",
	},
	"ks.lookup_unique:-40": {
	"insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	"ks.lookup_unique:80-": {
	"delete:[VARCHAR(\"4\") VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	"ks.lookup:80-": {
	"insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	"delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	"insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	"delete:[VARCHAR(\"4\") INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
	},
	},
	},
	}

	for _, tt := range testcases {
	t.Run(tt.name, func(t *testing.T) {
	defer cleanup(t)

	// Use a dedicated vtgate connection per sub-test so each case is isolated.
	vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
	require.NoError(t, err)
	defer vtgateConn.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Tail binlog events from all shards so we can assert the transitions below.
	ch := make(chan *binlogdatapb.VEvent)
	runVStream(t, ctx, ch, vtgateConn)

	conn := vtgateConn.Session("", nil)
	qCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// initial insert
	for _, query := range tt.initQueries {
	execute(qCtx, t, conn, query)
	}

	// ignore initial change
	tableMap := make(map[string][]*querypb.Field)
	dtMap := make(map[string]string)
	_ = retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second)

	// Insert into multiple shards
	for _, query := range tt.testQueries {
	execute(qCtx, t, conn, query)
	}

	// Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM.
	logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second)
	for key, val := range tt.logExpected {
	assert.EqualValues(t, val, logTable[key], key)
	}
	})
	}
}

func getTablet(tabletGrpcPort int) *tabletpb.Tablet {
portMap := make(map[string]int32)
portMap["grpc"] = int32(tabletGrpcPort)
return &tabletpb.Tablet{Hostname: hostname, PortMap: portMap}
>>>>>>> a3d400177f (Ensure PRS runs for all the shards in `TestSemiSyncRequiredWithTwoPC` (#17384))
}

0 comments on commit 826f9e9

Please sign in to comment.