Skip to content

Commit

Permalink
sink(ticdc): fix Unknown system variable 'transaction_isolation' fo…
Browse files Browse the repository at this point in the history
…r some mysql versions (#4569)

close #4504
  • Loading branch information
maxshuang authored Feb 14, 2022
1 parent 1fb41f9 commit 67a3029
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
14 changes: 12 additions & 2 deletions cdc/sink/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ func generateDSNByParams(
dsnCfg.Params["readTimeout"] = params.readTimeout
dsnCfg.Params["writeTimeout"] = params.writeTimeout
dsnCfg.Params["timeout"] = params.dialTimeout
// Since we don't need select, just set default isolation level to read-committed
dsnCfg.Params["transaction_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC)

autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1")
if err != nil {
Expand All @@ -287,6 +285,18 @@ func generateDSNByParams(
dsnCfg.Params["tidb_txn_mode"] = txnMode
}

// Since we don't need select, just set default isolation level to read-committed
// transaction_isolation is mysql newly introduced variable and will vary from MySQL5.7/MySQL8.0/Mariadb
isolation, err := checkTiDBVariable(ctx, testDB, "transaction_isolation", defaultTxnIsolationRC)
if err != nil {
return "", err
}
if isolation != "" {
dsnCfg.Params["transaction_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC)
} else {
dsnCfg.Params["tx_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC)
}

dsnClone := dsnCfg.Clone()
dsnClone.Passwd = "******"
log.Info("sink uri is configured", zap.String("dsn", dsnClone.FormatDSN()))
Expand Down
57 changes: 57 additions & 0 deletions cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,66 @@ func TestGenerateDSNByParams(t *testing.T) {
}
}

testIsolationParams := func() {
db, mock, err := sqlmock.New()
require.Nil(t, err)
defer db.Close() // nolint:errcheck
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
)
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
// simulate error
dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/")
require.Nil(t, err)
params := defaultParams.Clone()
var dsnStr string
_, err = generateDSNByParams(context.TODO(), dsn, params, db)
require.Error(t, err)

// simulate no transaction_isolation
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
)
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnError(sql.ErrNoRows)
dsnStr, err = generateDSNByParams(context.TODO(), dsn, params, db)
require.Nil(t, err)
expectedParams := []string{
"tx_isolation=%22READ-COMMITTED%22",
}
for _, param := range expectedParams {
require.True(t, strings.Contains(dsnStr, param))
}

// simulate transaction_isolation
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
)
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"),
)
dsnStr, err = generateDSNByParams(context.TODO(), dsn, params, db)
require.Nil(t, err)
expectedParams = []string{
"transaction_isolation=%22READ-COMMITTED%22",
}
for _, param := range expectedParams {
require.True(t, strings.Contains(dsnStr, param))
}
}

testDefaultParams()
testTimezoneParam()
testTimeoutParams()
testIsolationParams()
}

func TestParseSinkURIToParams(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,9 @@ func mockTestDB(adjustSQLMode bool) (*sql.DB, error) {
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"),
)
mock.ExpectClose()
return db, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func TestApplyDMLs(t *testing.T) {
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"),
)
mock.ExpectClose()
return db, nil
}
Expand Down

0 comments on commit 67a3029

Please sign in to comment.