diff --git a/cdc/sink/mysql_params.go b/cdc/sink/mysql_params.go index 9f0fec92a41..320ba69bc30 100644 --- a/cdc/sink/mysql_params.go +++ b/cdc/sink/mysql_params.go @@ -268,8 +268,6 @@ func generateDSNByParams( dsnCfg.Params["readTimeout"] = params.readTimeout dsnCfg.Params["writeTimeout"] = params.writeTimeout dsnCfg.Params["timeout"] = params.dialTimeout - // Since we don't need select, just set default isolation level to read-committed - dsnCfg.Params["transaction_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC) autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1") if err != nil { @@ -287,6 +285,18 @@ func generateDSNByParams( dsnCfg.Params["tidb_txn_mode"] = txnMode } + // Since we don't need select, just set default isolation level to read-committed + // transaction_isolation is mysql newly introduced variable and will vary from MySQL5.7/MySQL8.0/Mariadb + isolation, err := checkTiDBVariable(ctx, testDB, "transaction_isolation", defaultTxnIsolationRC) + if err != nil { + return "", err + } + if isolation != "" { + dsnCfg.Params["transaction_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC) + } else { + dsnCfg.Params["tx_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC) + } + dsnClone := dsnCfg.Clone() dsnClone.Passwd = "******" log.Info("sink uri is configured", zap.String("dsn", dsnClone.FormatDSN())) diff --git a/cdc/sink/mysql_params_test.go b/cdc/sink/mysql_params_test.go index 075e5d41d26..80a104ab244 100644 --- a/cdc/sink/mysql_params_test.go +++ b/cdc/sink/mysql_params_test.go @@ -118,9 +118,66 @@ func TestGenerateDSNByParams(t *testing.T) { } } + testIsolationParams := func() { + db, mock, err := sqlmock.New() + require.Nil(t, err) + defer db.Close() // nolint:errcheck + columns := []string{"Variable_name", "Value"} + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), + ) + mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), + ) + // simulate error + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + require.Nil(t, err) + params := defaultParams.Clone() + var dsnStr string + _, err = generateDSNByParams(context.TODO(), dsn, params, db) + require.Error(t, err) + + // simulate no transaction_isolation + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), + ) + mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), + ) + mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnError(sql.ErrNoRows) + dsnStr, err = generateDSNByParams(context.TODO(), dsn, params, db) + require.Nil(t, err) + expectedParams := []string{ + "tx_isolation=%22READ-COMMITTED%22", + } + for _, param := range expectedParams { + require.True(t, strings.Contains(dsnStr, param)) + } + + // simulate transaction_isolation + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), + ) + mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), + ) + mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"), + ) + dsnStr, err = generateDSNByParams(context.TODO(), dsn, params, db) + require.Nil(t, err) + expectedParams = []string{ + "transaction_isolation=%22READ-COMMITTED%22", + } + for _, param := range expectedParams { + require.True(t, strings.Contains(dsnStr, param)) + } + } + testDefaultParams() testTimezoneParam() testTimeoutParams() + testIsolationParams() } func TestParseSinkURIToParams(t *testing.T) { diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index cc0b119350c..aa69c25da23 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -472,6 +472,9 @@ func mockTestDB(adjustSQLMode bool) (*sql.DB, error) { mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), ) + mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"), + ) mock.ExpectClose() return db, nil } diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 5d2466bd27e..8c33f61de86 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -137,6 +137,9 @@ func TestApplyDMLs(t *testing.T) { mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), ) + mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"), + ) mock.ExpectClose() return db, nil }