Skip to content

Commit

Permalink
feat: add remaining tests
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Dec 16, 2024
1 parent 9c750d0 commit b780dd6
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
41 changes: 39 additions & 2 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,11 @@ func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp
// TestDTResolveAfterMMCommit tests that transaction is committed on recovery
// failure after MM commit.
func TestDTResolveAfterMMCommit(t *testing.T) {
defer cleanup(t)
initconn, closer := start(t)
defer closer()

// Do an insertion into a table that has a consistent lookup vindex.
utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)")

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
Expand All @@ -589,6 +593,10 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil)
require.NoError(t, err)
// Also do an insertion into a table that has a consistent lookup vindex.
// We expect to see only the pre-session changes in the logs.
_, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil)
require.NoError(t, err)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil)
Expand Down Expand Up @@ -625,7 +633,9 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
Expand All @@ -641,6 +651,12 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
`insert:[INT64(7) VARCHAR("foo")]`,
`insert:[INT64(9) VARCHAR("baz")]`,
},
"ks.consistent_lookup:-40": {
"insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.twopc_consistent_lookup:-40": {
"update:[INT64(4) INT64(22) INT64(6)]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
Expand All @@ -649,7 +665,11 @@ func TestDTResolveAfterMMCommit(t *testing.T) {
// TestDTResolveAfterRMPrepare tests that transaction is rolled back on recovery
// failure after RM prepare and before MM commit.
func TestDTResolveAfterRMPrepare(t *testing.T) {
defer cleanup(t)
initconn, closer := start(t)
defer closer()

// Do an insertion into a table that has a consistent lookup vindex.
utils.Exec(t, initconn, "insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)")

vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "")
require.NoError(t, err)
Expand All @@ -671,6 +691,10 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
require.NoError(t, err)
_, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil)
require.NoError(t, err)
// Also do an insertion into a table that has a consistent lookup vindex.
// We expect to see only the pre-session changes in the logs.
_, err = conn.Execute(qCtx, "update twopc_consistent_lookup set col = 22 where id = 4", nil)
require.NoError(t, err)

// The caller ID is used to simulate the failure at the desired point.
newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil)
Expand All @@ -693,16 +717,29 @@ func TestDTResolveAfterRMPrepare(t *testing.T) {
},
"ks.dt_participant:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]",
"insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]",
},
"ks.redo_state:40-80": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_state:-40": {
"insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
"delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]",
},
"ks.redo_statement:-40": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 22 where id = 4 limit 10001 /* INT64 */\")]",
},
"ks.consistent_lookup:-40": {
"insert:[INT64(22) INT64(4) VARBINARY(\" \\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
}
assert.Equal(t, expectations, logTable,
"mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations))
Expand Down
20 changes: 11 additions & 9 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,25 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er

defer recordCommitTime(session, twopc, time.Now())

if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil {
err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard)
if err != nil {
_ = txc.Release(ctx, session)
return err
}

if twopc {
if err := txc.commit2PC(ctx, session); err != nil {
return err
}
err = txc.commit2PC(ctx, session)
} else {
if err := txc.commitNormal(ctx, session); err != nil {
return err
}
err = txc.commitNormal(ctx, session)
}

if err != nil {
_ = txc.Release(ctx, session)
return err
}

if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil {
err = txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard)
if err != nil {
// If last commit fails, there will be nothing to rollback.
session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
// With reserved connection we should release them.
Expand Down Expand Up @@ -209,7 +212,6 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
})
warnings.Add("NonAtomicCommit", 1)
}
_ = txc.Release(ctx, session)
return err
}
}
Expand Down

0 comments on commit b780dd6

Please sign in to comment.