From c5579dc1201dfcac7b378592856780afc800bb31 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 8 Sep 2020 18:42:47 +0800 Subject: [PATCH 1/4] sink/mysql: add db read/write timeout --- cdc/sink/mysql.go | 86 ++++++++++++++++++++++++++++++------------ cdc/sink/mysql_test.go | 29 ++++++++++++++ go.mod | 2 +- 3 files changed, 92 insertions(+), 25 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 9be821574e1..e034ea4035a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -61,6 +61,8 @@ const ( defaultFlushInterval = time.Millisecond * 50 defaultBatchReplaceEnabled = true defaultBatchReplaceSize = 20 + defaultReadTimeout = "2m" + defaultWriteTimeout = "2m" ) var ( @@ -270,6 +272,8 @@ type sinkParams struct { captureAddr string batchReplaceEnabled bool batchReplaceSize int + readTimeout string + writeTimeout string } func (s *sinkParams) Clone() *sinkParams { @@ -283,48 +287,69 @@ var defaultParams = &sinkParams{ tidbTxnMode: defaultTiDBTxnMode, batchReplaceEnabled: defaultBatchReplaceEnabled, batchReplaceSize: defaultBatchReplaceSize, + readTimeout: defaultReadTimeout, + writeTimeout: defaultWriteTimeout, } -func configureSinkURI(ctx context.Context, dsnCfg *dmysql.Config, tz *time.Location, params *sinkParams) (string, error) { - if dsnCfg.Params == nil { - dsnCfg.Params = make(map[string]string, 1) - } - dsnCfg.DBName = "" - dsnCfg.InterpolateParams = true - dsnCfg.MultiStatements = true - dsnCfg.Params["time_zone"] = fmt.Sprintf(`"%s"`, tz.String()) - - testDB, err := sql.Open("mysql", dsnCfg.FormatDSN()) - if err != nil { - return "", errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") - } - defer testDB.Close() - log.Debug("Opened connection to configure some tidb special parameters") - +func checkAutoRandom(ctx context.Context, db *sql.DB) (string, error) { var variableName string var autoRandomInsertEnabled string queryStr := "show session variables like 'allow_auto_random_explicit_insert';" - err = testDB.QueryRowContext(ctx, queryStr).Scan(&variableName, &autoRandomInsertEnabled) + err := db.QueryRowContext(ctx, queryStr).Scan(&variableName, &autoRandomInsertEnabled) if err != nil && err != sql.ErrNoRows { return "", errors.Annotate( cerror.WrapError(cerror.ErrMySQLQueryError, err), "fail to query sink for support of auto-random") } if err == nil && (autoRandomInsertEnabled == "off" || autoRandomInsertEnabled == "0") { - dsnCfg.Params["allow_auto_random_explicit_insert"] = "1" - log.Debug("Set allow_auto_random_explicit_insert to 1") + return "1", nil } + return "", nil +} +func checkTiDBTxnMode(ctx context.Context, db *sql.DB, mode string) (string, error) { + var variableName string var txnMode string - queryStr = "show session variables like 'tidb_txn_mode';" - err = testDB.QueryRowContext(ctx, queryStr).Scan(&variableName, &txnMode) + queryStr := "show session variables like 'tidb_txn_mode';" + err := db.QueryRowContext(ctx, queryStr).Scan(&variableName, &txnMode) if err != nil && err != sql.ErrNoRows { return "", errors.Annotate( cerror.WrapError(cerror.ErrMySQLQueryError, err), "fail to query sink for txn mode") } if err == nil { - dsnCfg.Params["tidb_txn_mode"] = params.tidbTxnMode + return mode, nil } + return "", nil +} + +func configureSinkURI( + ctx context.Context, + dsnCfg *dmysql.Config, + tz *time.Location, + params *sinkParams, + testDB *sql.DB, +) (string, error) { + if dsnCfg.Params == nil { + dsnCfg.Params = make(map[string]string, 1) + } + dsnCfg.DBName = "" + dsnCfg.InterpolateParams = true + dsnCfg.MultiStatements = true + dsnCfg.Params["time_zone"] = fmt.Sprintf(`"%s"`, tz.String()) + dsnCfg.Params["readTimeout"] = params.readTimeout + dsnCfg.Params["writeTimeout"] = params.writeTimeout + + autoRandom, err := checkAutoRandom(ctx, testDB) + if err != nil { + return "", err + } + dsnCfg.Params["allow_auto_random_explicit_insert"] = autoRandom + log.Debug("Set allow_auto_random_explicit_insert", zap.String("val", autoRandom)) + + txnMode, err := checkTiDBTxnMode(ctx, testDB, params.tidbTxnMode) + if err != nil { + return "", err + } + dsnCfg.Params["tidb_txn_mode"] = txnMode dsnClone := dsnCfg.Clone() dsnClone.Passwd = "******" @@ -432,7 +457,20 @@ func newMySQLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI if err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) } - dsnStr, err = configureSinkURI(ctx, dsn, tz, params) + + // create test db used for parameter detection + if dsn.Params == nil { + dsn.Params = make(map[string]string, 1) + } + dsn.Params["time_zone"] = fmt.Sprintf(`"%s"`, tz.String()) + testDB, err := sql.Open("mysql", dsn.FormatDSN()) + if err != nil { + return nil, errors.Annotate( + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") + } + defer testDB.Close() + + dsnStr, err = configureSinkURI(ctx, dsn, tz, params, testDB) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index dab6500b3fe..bd47915a886 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -17,10 +17,13 @@ import ( "context" "fmt" "sort" + "strings" "testing" "time" + "github.com/DATA-DOG/go-sqlmock" "github.com/davecgh/go-spew/spew" + dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/check" "github.com/pingcap/parser/mysql" "github.com/pingcap/ticdc/cdc/model" @@ -619,6 +622,32 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { }) } +func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + columns := []string{"Variable_name", "Value"} + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), + ) + mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), + ) + + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + c.Assert(err, check.IsNil) + dsnStr, err := configureSinkURI(context.TODO(), dsn, time.Local, defaultParams.Clone(), db) + c.Assert(err, check.IsNil) + expectedParams := []string{ + "tidb_txn_mode=optimistic", + "readTimeout=2m", + "writeTimeout=2m", + "allow_auto_random_explicit_insert=1", + } + for _, param := range expectedParams { + c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + } +} + /* import ( "context" diff --git a/go.mod b/go.mod index ab976ed7341..51981193eda 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 - github.com/DATA-DOG/go-sqlmock v1.3.3 // indirect + github.com/DATA-DOG/go-sqlmock v1.3.3 github.com/Shopify/sarama v1.26.1 github.com/apache/pulsar-client-go v0.1.1 github.com/cenkalti/backoff v2.2.1+incompatible From f83396b80f95d1b01e3870ba4cc875158ce157ab Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 8 Sep 2020 19:48:01 +0800 Subject: [PATCH 2/4] refine function and test case --- cdc/sink/mysql.go | 50 +++++++++++++++++++----------------------- cdc/sink/mysql_test.go | 23 +++++++++++++++++++ 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index e034ea4035a..5d3931a41d2 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -291,34 +291,25 @@ var defaultParams = &sinkParams{ writeTimeout: defaultWriteTimeout, } -func checkAutoRandom(ctx context.Context, db *sql.DB) (string, error) { - var variableName string - var autoRandomInsertEnabled string - queryStr := "show session variables like 'allow_auto_random_explicit_insert';" - err := db.QueryRowContext(ctx, queryStr).Scan(&variableName, &autoRandomInsertEnabled) - if err != nil && err != sql.ErrNoRows { - return "", errors.Annotate( - cerror.WrapError(cerror.ErrMySQLQueryError, err), "fail to query sink for support of auto-random") - } - if err == nil && (autoRandomInsertEnabled == "off" || autoRandomInsertEnabled == "0") { - return "1", nil - } - return "", nil -} - -func checkTiDBTxnMode(ctx context.Context, db *sql.DB, mode string) (string, error) { - var variableName string - var txnMode string - queryStr := "show session variables like 'tidb_txn_mode';" - err := db.QueryRowContext(ctx, queryStr).Scan(&variableName, &txnMode) +func checkTiDBVariable( + ctx context.Context, + db *sql.DB, + variableName string, + defaultValue string, +) (string, error) { + var name string + var value string + querySQL := fmt.Sprintf("show session variables like '%s';", variableName) + err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value) if err != nil && err != sql.ErrNoRows { - return "", errors.Annotate( - cerror.WrapError(cerror.ErrMySQLQueryError, err), "fail to query sink for txn mode") + errMsg := "fail to query session variable " + variableName + return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) } if err == nil { - return mode, nil + return defaultValue, nil } return "", nil + } func configureSinkURI( @@ -338,18 +329,21 @@ func configureSinkURI( dsnCfg.Params["readTimeout"] = params.readTimeout dsnCfg.Params["writeTimeout"] = params.writeTimeout - autoRandom, err := checkAutoRandom(ctx, testDB) + autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1") if err != nil { return "", err } - dsnCfg.Params["allow_auto_random_explicit_insert"] = autoRandom - log.Debug("Set allow_auto_random_explicit_insert", zap.String("val", autoRandom)) + if autoRandom != "" { + dsnCfg.Params["allow_auto_random_explicit_insert"] = autoRandom + } - txnMode, err := checkTiDBTxnMode(ctx, testDB, params.tidbTxnMode) + txnMode, err := checkTiDBVariable(ctx, testDB, "tidb_txn_mode", params.tidbTxnMode) if err != nil { return "", err } - dsnCfg.Params["tidb_txn_mode"] = txnMode + if txnMode != "" { + dsnCfg.Params["tidb_txn_mode"] = txnMode + } dsnClone := dsnCfg.Clone() dsnClone.Passwd = "******" diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index bd47915a886..e56622247a0 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -15,6 +15,7 @@ package sink import ( "context" + "database/sql" "fmt" "sort" "strings" @@ -648,6 +649,28 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { } } +func (s MySQLSinkSuite) TestCheckTiDBVariable(c *check.C) { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + columns := []string{"Variable_name", "Value"} + + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), + ) + val, err := checkTiDBVariable(context.TODO(), db, "allow_auto_random_explicit_insert", "1") + c.Assert(err, check.IsNil) + c.Assert(val, check.Equals, "1") + + mock.ExpectQuery("show session variables like 'no_exist_variable';").WillReturnError(sql.ErrNoRows) + val, err = checkTiDBVariable(context.TODO(), db, "no_exist_variable", "0") + c.Assert(err, check.IsNil) + c.Assert(val, check.Equals, "") + + mock.ExpectQuery("show session variables like 'version';").WillReturnError(sql.ErrConnDone) + _, err = checkTiDBVariable(context.TODO(), db, "version", "5.7.25-TiDB-v4.0.0") + c.Assert(err, check.ErrorMatches, ".*"+sql.ErrConnDone.Error()) +} + /* import ( "context" From 5f54f14096635765ae9bdcb33dddf70fc34a59db Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 8 Sep 2020 19:50:03 +0800 Subject: [PATCH 3/4] code lint --- cdc/sink/mysql.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 5d3931a41d2..de2173bc129 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -291,12 +291,7 @@ var defaultParams = &sinkParams{ writeTimeout: defaultWriteTimeout, } -func checkTiDBVariable( - ctx context.Context, - db *sql.DB, - variableName string, - defaultValue string, -) (string, error) { +func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (string, error) { var name string var value string querySQL := fmt.Sprintf("show session variables like '%s';", variableName) @@ -305,11 +300,12 @@ func checkTiDBVariable( errMsg := "fail to query session variable " + variableName return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) } + // session variable works, use given default value if err == nil { return defaultValue, nil } + // session variable not exists, return "" to ignore it return "", nil - } func configureSinkURI( From 6cf9387856da6b95887ea9ab4e26528d9f6f2178 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Tue, 8 Sep 2020 19:52:16 +0800 Subject: [PATCH 4/4] fix unit test --- cdc/sink/mysql_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index e56622247a0..363b257ce9f 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -612,6 +612,8 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { tidbTxnMode: defaultTiDBTxnMode, batchReplaceEnabled: defaultBatchReplaceEnabled, batchReplaceSize: defaultBatchReplaceSize, + readTimeout: defaultReadTimeout, + writeTimeout: defaultWriteTimeout, }) c.Assert(param2, check.DeepEquals, &sinkParams{ changefeedID: "123", @@ -620,6 +622,8 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { tidbTxnMode: defaultTiDBTxnMode, batchReplaceEnabled: false, batchReplaceSize: defaultBatchReplaceSize, + readTimeout: defaultReadTimeout, + writeTimeout: defaultWriteTimeout, }) }